02bcb5afd81834464c3519d0f5ca7ee7a187f680
[bubblebox.git] / bubblebox.py
1 import sys, os, glob, random, string, subprocess
2 from pprint import pprint
3
4 HOME = os.environ["HOME"]
5 XDG_RUNTIME_DIR = os.environ["XDG_RUNTIME_DIR"]
6 BUBBLEBOX_DIR = XDG_RUNTIME_DIR + "/bubblebox"
7 os.makedirs(BUBBLEBOX_DIR, exist_ok=True)
8
9 def randname():
10     # choose from all lowercase letter
11     letters = string.ascii_lowercase
12     return ''.join(random.choice(letters) for i in range(8))
13
14 class BwrapInvocation:
15     """Gathered information for a bwrap invocation.
16     This will be created empty, and then each directive's `setup` function is called
17     with this object, so they can accumulate the bwrap flags and any other relevant state."""
18     def __init__(self):
19         # The flags to pass to bwrap.
20         self.flags = []
21         # Functions to call at the end of the setup process.
22         # They will receive this object as argument, so they can add further flags.
23         self.finalizers = []
24         # If this is `None` it means so far no d-bus proxy has been set up.
25         self.dbus_proxy_flags = None
26
27 class BwrapDirective:
28     """Directive that just passes flags to bwrap."""
29     def __init__(self, bwrap_flags):
30         self.bwrap_flags = bwrap_flags
31     def setup(self, bwrap):
32         bwrap.flags.extend(self.bwrap_flags)
33
34 class GroupDirective:
35     """Directive that groups a bunch of directives to be treated as one."""
36     def __init__(self, directives):
37         self.directives = directives
38     def setup(self, bwrap):
39         for directive in self.directives:
40             directive.setup(bwrap)
41
42 class DbusProxyDirective:
43     """Directive that sets up a d-bus proxy and adds flags to it.
44     If the directive is used multiple times, the flags accumulate."""
45     def __init__(self, dbus_proxy_flags):
46         self.dbus_proxy_flags = dbus_proxy_flags
47     def setup(self, bwrap):
48         if bwrap.dbus_proxy_flags is None:
49             # We are the first d-bus proxy directive. Set up the flags and the finalizer.
50             bwrap.dbus_proxy_flags = []
51             bwrap.finalizers.append(DbusProxyDirective.launch_dbus_proxy)
52         # Always add the flags.
53         bwrap.dbus_proxy_flags.extend(self.dbus_proxy_flags)
54     def launch_dbus_proxy(bwrap):
55         """Finalizer that launches a d-bus proxy with the flags accumulated in `bwrap`."""
56         # Prepare a pipe to coordinate shutdown of bwrap and the proxy
57         bwrap_end, other_end = os.pipe() # both FDs are "non-inheritable" now
58         # Invoke the debus-proxy
59         filename = BUBBLEBOX_DIR + "/bus-" + randname()
60         args = ["/usr/bin/xdg-dbus-proxy", "--fd="+str(other_end)]
61         args += [os.environ["DBUS_SESSION_BUS_ADDRESS"], filename, "--filter"] + bwrap.dbus_proxy_flags
62         #pprint(args)
63         subprocess.Popen(
64             args,
65             pass_fds = [other_end], # default is to pass only the std FDs!
66         )
67         # Wait until the proxy is ready
68         os.read(bwrap_end, 1)
69         assert os.path.exists(filename)
70         # Make sure bwrap can access the other end of the pipe
71         os.set_inheritable(bwrap_end, True)
72         # Put this at the usual location for the bus insode the sandbox.
73         # TODO: What if DBUS_SESSION_BUS_ADDRESS says something else?
74         bwrap.flags.extend(["--bind", filename, XDG_RUNTIME_DIR + "/bus", "--sync-fd", str(bwrap_end)])
75
76 # Constructors that should be used instead of directly mentioning the class above.
77 def bwrap_flags(*flags):
78     return BwrapDirective(flags)
79 def dbus_proxy_flags(*flags):
80     return DbusProxyDirective(flags)
81 def group(*directives):
82     return GroupDirective(directives)
83
84 # Run the application in the bubblebox with the given flags.
85 def bubblebox(*directives):
86     if len(sys.argv) <= 1:
87         print(f"USAGE: {sys.argv[0]} <program name> <program arguments>")
88         sys.exit(1)
89     # Make sure `--die-with-parent` is always set.
90     directives = group(bwrap_flags("--die-with-parent"), *directives)
91     # Compute the bwrap invocation by running all the directives.
92     bwrap = BwrapInvocation()
93     directives.setup(bwrap)
94     for finalizer in bwrap.finalizers:
95         finalizer(bwrap)
96     # Run bwrap
97     args = ["/usr/bin/bwrap"] + bwrap.flags + ["--"] + sys.argv[1:]
98     #pprint(args)
99     os.execvp(args[0], args)
100
101 # Give all instances of the same box a shared XDG_RUNTIME_DIR
102 def shared_runtime_dir(boxname):
103     dirname = BUBBLEBOX_DIR + "/" + boxname
104     os.makedirs(dirname, exist_ok=True)
105     return bwrap_flags("--bind", dirname, XDG_RUNTIME_DIR)
106
107 # Convenient way to declare host access
108 class Access:
109     Read = 0
110     Write = 1
111     Device = 2
112
113     def flag(val):
114         if val == Access.Read:
115             return "--ro-bind"
116         elif val == Access.Write:
117             return "--bind"
118         elif val == Access.Device:
119             return "--dev-bind"
120         else:
121             raise Exception(f"invalid access value: {val}")
122 def host_access(dirs):
123     def expand(root, names):
124         """`names` is one or more strings that can contain globs. Expand them all relative to `root`."""
125         if isinstance(names, str):
126             names = (names,)
127         assert isinstance(names, tuple)
128         for name in names:
129             assert not (name.startswith("../") or name.__contains__("/../") or name.endswith("../"))
130             path = root + "/" + name
131             # prettification
132             path = path.replace("//", "/")
133             path = path.removesuffix("/.")
134             # glob expansion
135             yield from glob.glob(path)
136     def recursive_host_access(root, dirs, out):
137         for names, desc in dirs.items():
138             for path in expand(root, names):
139                 if isinstance(desc, dict):
140                     # Recurse into children
141                     recursive_host_access(path, desc, out)
142                 else:
143                     # Allow access to this path
144                     out.extend([Access.flag(desc), path, path])
145     # Start the recursive traversal
146     out = []
147     recursive_host_access("", dirs, out)
148     #pprint(out)
149     return bwrap_flags(*out)
150 def home_access(dirs):
151     return host_access({ HOME: dirs })
152
153 # Profile the profiles when importing bubblebox.
154 import profiles