cb7f5140148cfb52b091c66a2a80670e17228711
[bubblebox.git] / bubblebox.py
1 import sys, os, glob, random, string, subprocess
2 from pprint import pprint
3
4 HOME = os.environ["HOME"]
5 XDG_RUNTIME_DIR = os.environ["XDG_RUNTIME_DIR"]
6 BUBBLEBOX_DIR = XDG_RUNTIME_DIR + "/bubblebox"
7 os.makedirs(BUBBLEBOX_DIR, exist_ok=True)
8
9 def flat_map(f, xs):
10     """Concatenate the result of applying `f` to each element of `xs` to a list.
11     `None` is treated like the empty list."""
12     ys = []
13     for x in xs:
14         x_mapped = f(x)
15         if x_mapped is not None:
16             ys.extend(x_mapped)
17     return ys
18
19 def randname():
20     # choose from all lowercase letter
21     letters = string.ascii_lowercase
22     return ''.join(random.choice(letters) for i in range(8))
23
24 class BoxFlags:
25     """Flags that configure the bubblebox"""
26     def __init__(self, bwrap_flags = None, dbus_proxy_flags = None):
27         self.bwrap_flags = bwrap_flags
28         self.dbus_proxy_flags = dbus_proxy_flags
29
30 def launch_dbus_proxy(flags):
31     """Launches the dbus proxy and returns the bwrap flags to be used to talk to it."""
32     # Prepare a pipe to coordinate shutdown of bwrap and the proxy
33     bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now
34     # Invoke the debus-proxy
35     filename = BUBBLEBOX_DIR + "/bus-" + randname()
36     args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)]
37     args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], filename, "--filter"] + flags
38     #pprint(args)
39     subprocess.Popen(
40         args,
41         pass_fds = [other_end], # default is to pass only the std FDs!
42     )
43     # Wait until the proxy is ready
44     os.read(bwrap_end, 1)
45     assert os.path.exists(filename)
46     # Make sure bwrap can access the other end of the pipe
47     os.set_inheritable(bwrap_end, True)
48     # Put this at the usual location for the bus insode the sandbox.
49     # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else?
50     return ["--bind", filename, XDG_RUNTIME_DIR + "/bus", "--sync-fd", str(bwrap_end)]
51
52 # Constructors that should be used instead of directly mentioning the class above.
53 def bwrap_flags(*flags):
54     return BoxFlags(bwrap_flags=flags)
55 def dbus_proxy_flags(*flags):
56     return BoxFlags(dbus_proxy_flags=flags)
57 def collect_flags(*flags):
58     bwrap_flags = flat_map(lambda x: x.bwrap_flags, flags)
59     dbus_proxy_flags = flat_map(lambda x: x.dbus_proxy_flags, flags)
60     return BoxFlags(bwrap_flags, dbus_proxy_flags)
61
62 # Run the application in the bubblebox with the given flags.
63 def bubblebox(*flags):
64     if len(sys.argv) <= 1:
65         print(f"USAGE: {sys.argv[0]} <program name> <program arguments>")
66         sys.exit(1)
67     # Make sure `--die-with-parent` is always set.
68     flags = collect_flags(bwrap_flags("--die-with-parent"), *flags)
69     bwrap = "/usr/bin/bwrap"
70     extraflags = []
71     if flags.dbus_proxy_flags:
72         extraflags += launch_dbus_proxy(flags.dbus_proxy_flags)
73     args = [bwrap] + flags.bwrap_flags + extraflags + ["--"] + sys.argv[1:]
74     #pprint(args)
75     os.execvp(args[0], args)
76
77 # Give all instances of the same box a shared XDG_RUNTIME_DIR
78 def shared_runtime_dir(boxname):
79     dirname = BUBBLEBOX_DIR + "/" + boxname
80     os.makedirs(dirname, exist_ok=True)
81     return bwrap_flags("--bind", dirname, XDG_RUNTIME_DIR)
82
83 # Convenient way to declare host access
84 class Access:
85     Read = 0
86     Write = 1
87     Device = 2
88
89     def flag(val):
90         if val == Access.Read:
91             return "--ro-bind"
92         elif val == Access.Write:
93             return "--bind"
94         elif val == Access.Device:
95             return "--dev-bind"
96         else:
97             raise Exception(f"invalid access value: {val}")
98 def host_access(dirs):
99     def expand(root, names):
100         """`names` is one or more strings that can contain globs. Expand them all relative to `root`."""
101         if isinstance(names, str):
102             names = (names,)
103         assert isinstance(names, tuple)
104         for name in names:
105             assert not (name.startswith("../") or name.__contains__("/../") or name.endswith("../"))
106             path = root + "/" + name
107             # prettification
108             path = path.replace("//", "/")
109             path = path.removesuffix("/.")
110             # glob expansion
111             yield from glob.glob(path)
112     def recursive_host_access(root, dirs, out):
113         for names, desc in dirs.items():
114             for path in expand(root, names):
115                 if isinstance(desc, dict):
116                     # Recurse into children
117                     recursive_host_access(path, desc, out)
118                 else:
119                     # Allow access to this path
120                     out.extend([Access.flag(desc), path, path])
121     # Start the recursive traversal
122     out = []
123     recursive_host_access("", dirs, out)
124     #pprint(out)
125     return bwrap_flags(*out)
126 def home_access(dirs):
127     return host_access({ HOME: dirs })
128
129 # Profile the profiles when importing bubblebox.
130 import profiles