add a nicer way to declare host access
[bubblebox.git] / bubblebox.py
1 import sys, os, glob, random, string, subprocess
2 from pprint import pprint
3
4 HOME = os.environ["HOME"]
5 XDG_RUNTIME_DIR = os.environ["XDG_RUNTIME_DIR"]
6 BUBBLEBOX_DIR = XDG_RUNTIME_DIR + "/bubblebox"
7 os.makedirs(BUBBLEBOX_DIR, exist_ok=True)
8
9 def flat_map(f, xs):
10     """Concatenate the result of applying `f` to each element of `xs` to a list.
11     `None` is treated like the empty list."""
12     ys = []
13     for x in xs:
14         x_mapped = f(x)
15         if x_mapped is not None:
16             ys.extend(x_mapped)
17     return ys
18
19 def randname():
20     # choose from all lowercase letter
21     letters = string.ascii_lowercase
22     return ''.join(random.choice(letters) for i in range(8))
23
24 class BoxFlags:
25     """Flags that configure the bubblebox"""
26     def __init__(self, bwrap_flags = None, dbus_proxy_flags = None):
27         self.bwrap_flags = bwrap_flags
28         self.dbus_proxy_flags = dbus_proxy_flags
29
30 def launch_dbus_proxy(flags):
31     """Launches the dbus proxy and returns the bwrap flags to be used to talk to it."""
32     # Prepare a pipe to coordinate shutdown of bwrap and the proxy
33     bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now
34     # Invoke the debus-proxy
35     filename = BUBBLEBOX_DIR + "/bus-" + randname()
36     args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)]
37     args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], filename, "--filter"] + flags
38     #pprint(args)
39     subprocess.Popen(
40         args,
41         pass_fds = [other_end], # default is to pass only the std FDs!
42     )
43     # Wait until the proxy is ready
44     os.read(bwrap_end, 1)
45     assert os.path.exists(filename)
46     # Make sure bwrap can access the other end of the pipe
47     os.set_inheritable(bwrap_end, True)
48     # Put this at the usual location for the bus insode the sandbox.
49     # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else?
50     return ["--bind", filename, XDG_RUNTIME_DIR + "/bus", "--sync-fd", str(bwrap_end)]
51
52 # Constructors that should be used instead of directly mentioning the class above.
53 def bwrap_flags(*flags):
54     return BoxFlags(bwrap_flags=flags)
55 def dbus_proxy_flags(*flags):
56     return BoxFlags(dbus_proxy_flags=flags)
57 def collect_flags(*flags):
58     bwrap_flags = flat_map(lambda x: x.bwrap_flags, flags)
59     dbus_proxy_flags = flat_map(lambda x: x.dbus_proxy_flags, flags)
60     return BoxFlags(bwrap_flags, dbus_proxy_flags)
61
62 # Run the application in the bubblebox with the given flags.
63 def bubblebox(*flags):
64     flags = collect_flags(*flags)
65     bwrap = "/usr/bin/bwrap"
66     extraflags = []
67     if flags.dbus_proxy_flags:
68         extraflags += launch_dbus_proxy(flags.dbus_proxy_flags)
69     args = [bwrap] + flags.bwrap_flags + extraflags + ["--"] + sys.argv[1:]
70     #pprint(args)
71     os.execvp(args[0], args)
72
73 # Give all instances of the same box a shared XDG_RUNTIME_DIR
74 def shared_runtime_dir(boxname):
75     dirname = BUBBLEBOX_DIR + "/" + boxname
76     os.makedirs(dirname, exist_ok=True)
77     return bwrap_flags("--bind", dirname, XDG_RUNTIME_DIR)
78
79 # Convenient way to declare host access
80 class Access:
81     Read = 0
82     Write = 1
83     Device = 2
84
85     def flag(val):
86         if val == Access.Read:
87             return "--ro-bind"
88         elif val == Access.Write:
89             return "--bind"
90         elif val == Access.Device:
91             return "--dev-bind"
92         else:
93             raise Exception(f"invalid access value: {val}")
94 def host_access(dirs):
95     def expand(root, names):
96         """`names` is one or more strings that can contain globs. Expand them all relative to `root`."""
97         if isinstance(names, str):
98             names = (names,)
99         assert isinstance(names, tuple)
100         for name in names:
101             assert not (name.startswith("../") or name.__contains__("/../") or name.endswith("../"))
102             path = root + "/" + name
103             # prettification
104             path = path.replace("//", "/")
105             path = path.removesuffix("/.")
106             # glob expansion
107             yield from glob.glob(path)
108     def recursive_host_access(root, dirs, out):
109         for names, desc in dirs.items():
110             for path in expand(root, names):
111                 if isinstance(desc, dict):
112                     # Recurse into children
113                     recursive_host_access(path, desc, out)
114                 else:
115                     # Allow access to this path
116                     out.extend([Access.flag(desc), path, path])
117     # Start the recursive traversal
118     out = []
119     recursive_host_access("", dirs, out)
120     #pprint(out)
121     return bwrap_flags(*out)
122 def home_access(dirs):
123     return host_access({ HOME: dirs })
124
125 # Profile the profiles when importing bubblebox.
126 import profiles