Merge pull request #2 from darkmattercoder/darkmattercoder-patch-readmetypo
[bubblebox.git] / bubblebox.py
index 628107e589930ed1018b74aeba5afbd20143fff2..a6b9d96d9f19f9d4085ba937de63562607da722f 100644 (file)
@@ -6,67 +6,107 @@ XDG_RUNTIME_DIR = os.environ["XDG_RUNTIME_DIR"]
 BUBBLEBOX_DIR = XDG_RUNTIME_DIR + "/bubblebox"
 os.makedirs(BUBBLEBOX_DIR, exist_ok=True)
 
-def flat_map(f, xs):
-    """Concatenate the result of applying `f` to each element of `xs` to a list.
-    `None` is treated like the empty list."""
-    ys = []
-    for x in xs:
-        x_mapped = f(x)
-        if x_mapped is not None:
-            ys.extend(x_mapped)
-    return ys
-
 def randname():
     # choose from all lowercase letter
     letters = string.ascii_lowercase
     return ''.join(random.choice(letters) for i in range(8))
 
-class BoxFlags:
-    """Flags that configure the bubblebox"""
-    def __init__(self, bwrap_flags = None, dbus_proxy_flags = None):
+class BwrapInvocation:
+    """Gathered information for a bwrap invocation.
+    This will be created empty, and then each directive's `setup` function is called
+    with this object, so they can accumulate the bwrap flags and any other relevant state."""
+    def __init__(self):
+        # The flags to pass to bwrap.
+        self.flags = []
+        # Functions to call at the end of the setup process.
+        # They will receive this object as argument, so they can add further flags.
+        self.finalizers = []
+        # If this is `None` it means so far no d-bus proxy has been set up.
+        self.dbus_proxy_flags = None
+
+class BwrapDirective:
+    """Directive that just passes flags to bwrap."""
+    def __init__(self, bwrap_flags):
         self.bwrap_flags = bwrap_flags
-        self.dbus_proxy_flags = dbus_proxy_flags
+    def setup(self, bwrap):
+        bwrap.flags.extend(self.bwrap_flags)
 
-def launch_dbus_proxy(flags):
-    """Launches the dbus proxy and returns the bwrap flags to be used to talk to it."""
-    # Prepare a pipe to coordinate shutdown of bwrap and the proxy
-    bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now
-    # Invoke the debus-proxy
-    filename = BUBBLEBOX_DIR + "/bus-" + randname()
-    args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)]
-    args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], filename, "--filter"] + flags
-    #pprint(args)
-    subprocess.Popen(
-        args,
-        pass_fds = [other_end], # default is to pass only the std FDs!
-    )
-    # Wait until the proxy is ready
-    os.read(bwrap_end, 1)
-    assert os.path.exists(filename)
-    # Make sure bwrap can access the other end of the pipe
-    os.set_inheritable(bwrap_end, True)
-    # Put this at the usual location for the bus insode the sandbox.
-    # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else?
-    return ["--bind", filename, XDG_RUNTIME_DIR + "/bus", "--sync-fd", str(bwrap_end)]
+class GroupDirective:
+    """Directive that groups a bunch of directives to be treated as one."""
+    def __init__(self, directives):
+        self.directives = directives
+    def setup(self, bwrap):
+        for directive in self.directives:
+            directive.setup(bwrap)
+
+class DbusProxyDirective:
+    """Directive that sets up a d-bus proxy and adds flags to it.
+    If the directive is used multiple times, the flags accumulate."""
+    def __init__(self, dbus_proxy_flags):
+        self.dbus_proxy_flags = dbus_proxy_flags
+    def setup(self, bwrap):
+        if bwrap.dbus_proxy_flags is None:
+            # We are the first d-bus proxy directive. Set up the flags and the finalizer.
+            bwrap.dbus_proxy_flags = []
+            bwrap.finalizers.append(DbusProxyDirective.launch_dbus_proxy)
+        # Always add the flags.
+        bwrap.dbus_proxy_flags.extend(self.dbus_proxy_flags)
+    def launch_dbus_proxy(bwrap):
+        """Finalizer that launches a d-bus proxy with the flags accumulated in `bwrap`."""
+        # For the system bus, we assume it to be at a fixed location and provide it to the sandbox at that same location.
+        # For the session bus, we tell the proxy to talk to DBUS_SESSION_BUS_ADDRESS on the host, but we always put it
+        # at `$XDG_RUNTIME_DIR/bus` in the sandbox.
+        session_bus = XDG_RUNTIME_DIR + "/bus" # how the sandbox will see the bus
+        system_bus = "/run/dbus/system_bus_socket"
+        session_bus_proxy = BUBBLEBOX_DIR + "/bus-" + randname()
+        system_bus_proxy = BUBBLEBOX_DIR + "/bus-system-" + randname()
+        # Prepare a pipe to coordinate shutdown of bwrap and the proxy
+        bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now
+        # Invoke the debus-proxy
+        args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)]
+        args += ["unix:path="+system_bus, system_bus_proxy, "--filter"] # just block everything for the system bus
+        args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], session_bus_proxy, "--filter"] + bwrap.dbus_proxy_flags
+        #pprint(args)
+        subprocess.Popen(
+            args,
+            pass_fds = [other_end], # default is to pass only the std FDs!
+        )
+        # Wait until the proxy is ready
+        os.read(bwrap_end, 1)
+        assert os.path.exists(session_bus_proxy)
+        # Make sure bwrap can access the other end of the pipe
+        os.set_inheritable(bwrap_end, True)
+        # Put this at the usual location for the bus insode the sandbox.
+        # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else?
+        bwrap.flags.extend((
+            "--setenv", "DBUS_SESSION_BUS_ADDRESS", "unix:path="+session_bus,
+            "--bind", session_bus_proxy, session_bus,
+            "--bind", system_bus_proxy, system_bus,
+            "--sync-fd", str(bwrap_end),
+        ))
 
 # Constructors that should be used instead of directly mentioning the class above.
 def bwrap_flags(*flags):
-    return BoxFlags(bwrap_flags=flags)
+    return BwrapDirective(flags)
 def dbus_proxy_flags(*flags):
-    return BoxFlags(dbus_proxy_flags=flags)
-def collect_flags(*flags):
-    bwrap_flags = flat_map(lambda x: x.bwrap_flags, flags)
-    dbus_proxy_flags = flat_map(lambda x: x.dbus_proxy_flags, flags)
-    return BoxFlags(bwrap_flags, dbus_proxy_flags)
+    return DbusProxyDirective(flags)
+def group(*directives):
+    return GroupDirective(directives)
 
 # Run the application in the bubblebox with the given flags.
-def bubblebox(*flags):
-    flags = collect_flags(*flags)
-    bwrap = "/usr/bin/bwrap"
-    extraflags = []
-    if flags.dbus_proxy_flags:
-        extraflags += launch_dbus_proxy(flags.dbus_proxy_flags)
-    args = [bwrap] + flags.bwrap_flags + extraflags + ["--"] + sys.argv[1:]
+def bubblebox(*directives):
+    if len(sys.argv) <= 1:
+        print(f"USAGE: {sys.argv[0]} <program name> <program arguments>")
+        sys.exit(1)
+    # Make sure `--die-with-parent` is always set.
+    directives = group(bwrap_flags("--die-with-parent"), *directives)
+    # Compute the bwrap invocation by running all the directives.
+    bwrap = BwrapInvocation()
+    directives.setup(bwrap)
+    for finalizer in bwrap.finalizers:
+        finalizer(bwrap)
+    # Run bwrap
+    args = ["/usr/bin/bwrap"] + bwrap.flags + ["--"] + sys.argv[1:]
     #pprint(args)
     os.execvp(args[0], args)
 
@@ -104,7 +144,10 @@ def host_access(dirs):
             path = path.replace("//", "/")
             path = path.removesuffix("/.")
             # glob expansion
-            yield from glob.glob(path)
+            globbed = glob.glob(path)
+            if len(globbed) == 0:
+                raise Exception(f"Path does not exist: {path}")
+            yield from globbed
     def recursive_host_access(root, dirs, out):
         for names, desc in dirs.items():
             for path in expand(root, names):
@@ -113,7 +156,7 @@ def host_access(dirs):
                     recursive_host_access(path, desc, out)
                 else:
                     # Allow access to this path
-                    out.extend([Access.flag(desc), path, path])
+                    out.extend((Access.flag(desc), path, path))
     # Start the recursive traversal
     out = []
     recursive_host_access("", dirs, out)