host_access: error if the path does not exist
[bubblebox.git] / bubblebox.py
1 import sys, os, glob, random, string, subprocess
2 from pprint import pprint
3
4 HOME = os.environ["HOME"]
5 XDG_RUNTIME_DIR = os.environ["XDG_RUNTIME_DIR"]
6 BUBBLEBOX_DIR = XDG_RUNTIME_DIR + "/bubblebox"
7 os.makedirs(BUBBLEBOX_DIR, exist_ok=True)
8
9 def randname():
10     # choose from all lowercase letter
11     letters = string.ascii_lowercase
12     return ''.join(random.choice(letters) for i in range(8))
13
14 class BwrapInvocation:
15     """Gathered information for a bwrap invocation.
16     This will be created empty, and then each directive's `setup` function is called
17     with this object, so they can accumulate the bwrap flags and any other relevant state."""
18     def __init__(self):
19         # The flags to pass to bwrap.
20         self.flags = []
21         # Functions to call at the end of the setup process.
22         # They will receive this object as argument, so they can add further flags.
23         self.finalizers = []
24         # If this is `None` it means so far no d-bus proxy has been set up.
25         self.dbus_proxy_flags = None
26
27 class BwrapDirective:
28     """Directive that just passes flags to bwrap."""
29     def __init__(self, bwrap_flags):
30         self.bwrap_flags = bwrap_flags
31     def setup(self, bwrap):
32         bwrap.flags.extend(self.bwrap_flags)
33
34 class GroupDirective:
35     """Directive that groups a bunch of directives to be treated as one."""
36     def __init__(self, directives):
37         self.directives = directives
38     def setup(self, bwrap):
39         for directive in self.directives:
40             directive.setup(bwrap)
41
42 class DbusProxyDirective:
43     """Directive that sets up a d-bus proxy and adds flags to it.
44     If the directive is used multiple times, the flags accumulate."""
45     def __init__(self, dbus_proxy_flags):
46         self.dbus_proxy_flags = dbus_proxy_flags
47     def setup(self, bwrap):
48         if bwrap.dbus_proxy_flags is None:
49             # We are the first d-bus proxy directive. Set up the flags and the finalizer.
50             bwrap.dbus_proxy_flags = []
51             bwrap.finalizers.append(DbusProxyDirective.launch_dbus_proxy)
52         # Always add the flags.
53         bwrap.dbus_proxy_flags.extend(self.dbus_proxy_flags)
54     def launch_dbus_proxy(bwrap):
55         """Finalizer that launches a d-bus proxy with the flags accumulated in `bwrap`."""
56         # For the system bus, we assume it to be at a fixed location and provide it to the sandbox at that same location.
57         # For the session bus, we tell the proxy to talk to DBUS_SESSION_BUS_ADDRESS on the host, but we always put it
58         # at `$XDG_RUNTIME_DIR/bus` in the sandbox.
59         session_bus = XDG_RUNTIME_DIR + "/bus" # how the sandbox will see the bus
60         system_bus = "/run/dbus/system_bus_socket"
61         session_bus_proxy = BUBBLEBOX_DIR + "/bus-" + randname()
62         system_bus_proxy = BUBBLEBOX_DIR + "/bus-system-" + randname()
63         # Prepare a pipe to coordinate shutdown of bwrap and the proxy
64         bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now
65         # Invoke the debus-proxy
66         args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)]
67         args += ["unix:path="+system_bus, system_bus_proxy, "--filter"] # just block everything for the system bus
68         args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], session_bus_proxy, "--filter"] + bwrap.dbus_proxy_flags
69         #pprint(args)
70         subprocess.Popen(
71             args,
72             pass_fds = [other_end], # default is to pass only the std FDs!
73         )
74         # Wait until the proxy is ready
75         os.read(bwrap_end, 1)
76         assert os.path.exists(session_bus_proxy)
77         # Make sure bwrap can access the other end of the pipe
78         os.set_inheritable(bwrap_end, True)
79         # Put this at the usual location for the bus insode the sandbox.
80         # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else?
81         bwrap.flags.extend((
82             "--setenv", "DBUS_SESSION_BUS_ADDRESS", "unix:path="+session_bus,
83             "--bind", session_bus_proxy, session_bus,
84             "--bind", system_bus_proxy, system_bus,
85             "--sync-fd", str(bwrap_end),
86         ))
87
88 # Constructors that should be used instead of directly mentioning the class above.
89 def bwrap_flags(*flags):
90     return BwrapDirective(flags)
91 def dbus_proxy_flags(*flags):
92     return DbusProxyDirective(flags)
93 def group(*directives):
94     return GroupDirective(directives)
95
96 # Run the application in the bubblebox with the given flags.
97 def bubblebox(*directives):
98     if len(sys.argv) <= 1:
99         print(f"USAGE: {sys.argv[0]} <program name> <program arguments>")
100         sys.exit(1)
101     # Make sure `--die-with-parent` is always set.
102     directives = group(bwrap_flags("--die-with-parent"), *directives)
103     # Compute the bwrap invocation by running all the directives.
104     bwrap = BwrapInvocation()
105     directives.setup(bwrap)
106     for finalizer in bwrap.finalizers:
107         finalizer(bwrap)
108     # Run bwrap
109     args = ["/usr/bin/bwrap"] + bwrap.flags + ["--"] + sys.argv[1:]
110     #pprint(args)
111     os.execvp(args[0], args)
112
113 # Give all instances of the same box a shared XDG_RUNTIME_DIR
114 def shared_runtime_dir(boxname):
115     dirname = BUBBLEBOX_DIR + "/" + boxname
116     os.makedirs(dirname, exist_ok=True)
117     return bwrap_flags("--bind", dirname, XDG_RUNTIME_DIR)
118
119 # Convenient way to declare host access
120 class Access:
121     Read = 0
122     Write = 1
123     Device = 2
124
125     def flag(val):
126         if val == Access.Read:
127             return "--ro-bind"
128         elif val == Access.Write:
129             return "--bind"
130         elif val == Access.Device:
131             return "--dev-bind"
132         else:
133             raise Exception(f"invalid access value: {val}")
134 def host_access(dirs):
135     def expand(root, names):
136         """`names` is one or more strings that can contain globs. Expand them all relative to `root`."""
137         if isinstance(names, str):
138             names = (names,)
139         assert isinstance(names, tuple)
140         for name in names:
141             assert not (name.startswith("../") or name.__contains__("/../") or name.endswith("../"))
142             path = root + "/" + name
143             # prettification
144             path = path.replace("//", "/")
145             path = path.removesuffix("/.")
146             # glob expansion
147             globbed = glob.glob(path)
148             if len(globbed) == 0:
149                 raise Exception(f"Path does not exist: {path}")
150             yield from globbed
151     def recursive_host_access(root, dirs, out):
152         for names, desc in dirs.items():
153             for path in expand(root, names):
154                 if isinstance(desc, dict):
155                     # Recurse into children
156                     recursive_host_access(path, desc, out)
157                 else:
158                     # Allow access to this path
159                     out.extend((Access.flag(desc), path, path))
160     # Start the recursive traversal
161     out = []
162     recursive_host_access("", dirs, out)
163     #pprint(out)
164     return bwrap_flags(*out)
165 def home_access(dirs):
166     return host_access({ HOME: dirs })
167
168 # Profile the profiles when importing bubblebox.
169 import profiles