X-Git-Url: https://git.ralfj.de/bubblebox.git/blobdiff_plain/9f759e738ebda4bc95c177c3d2206bb5481483bc..HEAD:/bubblebox.py diff --git a/bubblebox.py b/bubblebox.py index cb7f514..a6b9d96 100644 --- a/bubblebox.py +++ b/bubblebox.py @@ -6,71 +6,107 @@ XDG_RUNTIME_DIR = os.environ["XDG_RUNTIME_DIR"] BUBBLEBOX_DIR = XDG_RUNTIME_DIR + "/bubblebox" os.makedirs(BUBBLEBOX_DIR, exist_ok=True) -def flat_map(f, xs): - """Concatenate the result of applying `f` to each element of `xs` to a list. - `None` is treated like the empty list.""" - ys = [] - for x in xs: - x_mapped = f(x) - if x_mapped is not None: - ys.extend(x_mapped) - return ys - def randname(): # choose from all lowercase letter letters = string.ascii_lowercase return ''.join(random.choice(letters) for i in range(8)) -class BoxFlags: - """Flags that configure the bubblebox""" - def __init__(self, bwrap_flags = None, dbus_proxy_flags = None): +class BwrapInvocation: + """Gathered information for a bwrap invocation. + This will be created empty, and then each directive's `setup` function is called + with this object, so they can accumulate the bwrap flags and any other relevant state.""" + def __init__(self): + # The flags to pass to bwrap. + self.flags = [] + # Functions to call at the end of the setup process. + # They will receive this object as argument, so they can add further flags. + self.finalizers = [] + # If this is `None` it means so far no d-bus proxy has been set up. + self.dbus_proxy_flags = None + +class BwrapDirective: + """Directive that just passes flags to bwrap.""" + def __init__(self, bwrap_flags): self.bwrap_flags = bwrap_flags - self.dbus_proxy_flags = dbus_proxy_flags + def setup(self, bwrap): + bwrap.flags.extend(self.bwrap_flags) -def launch_dbus_proxy(flags): - """Launches the dbus proxy and returns the bwrap flags to be used to talk to it.""" - # Prepare a pipe to coordinate shutdown of bwrap and the proxy - bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now - # Invoke the debus-proxy - filename = BUBBLEBOX_DIR + "/bus-" + randname() - args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)] - args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], filename, "--filter"] + flags - #pprint(args) - subprocess.Popen( - args, - pass_fds = [other_end], # default is to pass only the std FDs! - ) - # Wait until the proxy is ready - os.read(bwrap_end, 1) - assert os.path.exists(filename) - # Make sure bwrap can access the other end of the pipe - os.set_inheritable(bwrap_end, True) - # Put this at the usual location for the bus insode the sandbox. - # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else? - return ["--bind", filename, XDG_RUNTIME_DIR + "/bus", "--sync-fd", str(bwrap_end)] +class GroupDirective: + """Directive that groups a bunch of directives to be treated as one.""" + def __init__(self, directives): + self.directives = directives + def setup(self, bwrap): + for directive in self.directives: + directive.setup(bwrap) + +class DbusProxyDirective: + """Directive that sets up a d-bus proxy and adds flags to it. + If the directive is used multiple times, the flags accumulate.""" + def __init__(self, dbus_proxy_flags): + self.dbus_proxy_flags = dbus_proxy_flags + def setup(self, bwrap): + if bwrap.dbus_proxy_flags is None: + # We are the first d-bus proxy directive. Set up the flags and the finalizer. + bwrap.dbus_proxy_flags = [] + bwrap.finalizers.append(DbusProxyDirective.launch_dbus_proxy) + # Always add the flags. + bwrap.dbus_proxy_flags.extend(self.dbus_proxy_flags) + def launch_dbus_proxy(bwrap): + """Finalizer that launches a d-bus proxy with the flags accumulated in `bwrap`.""" + # For the system bus, we assume it to be at a fixed location and provide it to the sandbox at that same location. + # For the session bus, we tell the proxy to talk to DBUS_SESSION_BUS_ADDRESS on the host, but we always put it + # at `$XDG_RUNTIME_DIR/bus` in the sandbox. + session_bus = XDG_RUNTIME_DIR + "/bus" # how the sandbox will see the bus + system_bus = "/run/dbus/system_bus_socket" + session_bus_proxy = BUBBLEBOX_DIR + "/bus-" + randname() + system_bus_proxy = BUBBLEBOX_DIR + "/bus-system-" + randname() + # Prepare a pipe to coordinate shutdown of bwrap and the proxy + bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now + # Invoke the debus-proxy + args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)] + args += ["unix:path="+system_bus, system_bus_proxy, "--filter"] # just block everything for the system bus + args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], session_bus_proxy, "--filter"] + bwrap.dbus_proxy_flags + #pprint(args) + subprocess.Popen( + args, + pass_fds = [other_end], # default is to pass only the std FDs! + ) + # Wait until the proxy is ready + os.read(bwrap_end, 1) + assert os.path.exists(session_bus_proxy) + # Make sure bwrap can access the other end of the pipe + os.set_inheritable(bwrap_end, True) + # Put this at the usual location for the bus insode the sandbox. + # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else? + bwrap.flags.extend(( + "--setenv", "DBUS_SESSION_BUS_ADDRESS", "unix:path="+session_bus, + "--bind", session_bus_proxy, session_bus, + "--bind", system_bus_proxy, system_bus, + "--sync-fd", str(bwrap_end), + )) # Constructors that should be used instead of directly mentioning the class above. def bwrap_flags(*flags): - return BoxFlags(bwrap_flags=flags) + return BwrapDirective(flags) def dbus_proxy_flags(*flags): - return BoxFlags(dbus_proxy_flags=flags) -def collect_flags(*flags): - bwrap_flags = flat_map(lambda x: x.bwrap_flags, flags) - dbus_proxy_flags = flat_map(lambda x: x.dbus_proxy_flags, flags) - return BoxFlags(bwrap_flags, dbus_proxy_flags) + return DbusProxyDirective(flags) +def group(*directives): + return GroupDirective(directives) # Run the application in the bubblebox with the given flags. -def bubblebox(*flags): +def bubblebox(*directives): if len(sys.argv) <= 1: print(f"USAGE: {sys.argv[0]} ") sys.exit(1) # Make sure `--die-with-parent` is always set. - flags = collect_flags(bwrap_flags("--die-with-parent"), *flags) - bwrap = "/usr/bin/bwrap" - extraflags = [] - if flags.dbus_proxy_flags: - extraflags += launch_dbus_proxy(flags.dbus_proxy_flags) - args = [bwrap] + flags.bwrap_flags + extraflags + ["--"] + sys.argv[1:] + directives = group(bwrap_flags("--die-with-parent"), *directives) + # Compute the bwrap invocation by running all the directives. + bwrap = BwrapInvocation() + directives.setup(bwrap) + for finalizer in bwrap.finalizers: + finalizer(bwrap) + # Run bwrap + args = ["/usr/bin/bwrap"] + bwrap.flags + ["--"] + sys.argv[1:] #pprint(args) os.execvp(args[0], args) @@ -108,7 +144,10 @@ def host_access(dirs): path = path.replace("//", "/") path = path.removesuffix("/.") # glob expansion - yield from glob.glob(path) + globbed = glob.glob(path) + if len(globbed) == 0: + raise Exception(f"Path does not exist: {path}") + yield from globbed def recursive_host_access(root, dirs, out): for names, desc in dirs.items(): for path in expand(root, names): @@ -117,7 +156,7 @@ def host_access(dirs): recursive_host_access(path, desc, out) else: # Allow access to this path - out.extend([Access.flag(desc), path, path]) + out.extend((Access.flag(desc), path, path)) # Start the recursive traversal out = [] recursive_host_access("", dirs, out)