+# run a Python command asynchronously
+def fire_and_forget(f):
+ def _fire_and_forget():
+ try:
+ f()
+ except Exception:
+ logger.critical("fire_and_forget: Got exception out of callback:\n%s" % traceback.format_exc())
+ t = threading.Thread(target=_fire_and_forget)
+ t.start()
+
+# run a command asynchronously and log the return value if not 0
+# prefix must be a string identifying the code position where the call came from
+def fire_and_forget_cmd (cmd, log_prefix):
+ logger.debug("Firing and forgetting command from %s: %s" % (log_prefix,str(cmd)))
+ def _fire_and_forget_cmd ():
+ with open("/dev/null", "w") as fnull:
+ retcode = subprocess.call(cmd, stdout=fnull, stderr=fnull)
+ if retcode is not 0:
+ logger.error("%sReturn code %d at command: %s" % (log_prefix,retcode,str(cmd)))
+ fire_and_forget(_fire_and_forget_cmd)
+
+# Threaded callback class
+class ThreadFunction():
+ _CALL = 0
+ _TERM = 1
+
+ def __init__(self, f, name):
+ self.name = name
+ self._f = f
+ self._q = queue.Queue()
+ self._t = threading.Thread(target=self._thread_func)
+ self._t.start()
+
+ def _thread_func(self):
+ while True:
+ (cmd, data) = self._q.get()
+ # run command
+ if cmd == ThreadFunction._CALL:
+ try:
+ self._f(*data)
+ except Exception as e:
+ logger.critical("ThreadFunction: Got exception out of handler thread %s:\n%s" % (self.name, traceback.format_exc()))
+ elif cmd == ThreadFunction._TERM:
+ assert data is None
+ break
+ else:
+ logger.error("ThreadFunction: Command %d does not exist" % cmd)
+
+ def __call__(self, *arg):
+ self._q.put((ThreadFunction._CALL, arg))
+
+ def stop(self):
+ # empty the queue
+ try:
+ while True:
+ self._q.get_nowait()
+ except queue.Empty:
+ pass
+ # now wait till the job-in-progress is done
+ self._q.put((ThreadFunction._TERM, None))
+ self._t.join()
+
+# Thread timer-repeater class: Call a function every <sleep_time> seconds
+class ThreadRepeater():
+ def __init__(self, f, sleep_time, name):
+ self.name = name
+ self._f = f
+ self._stop = False
+ self._sleep_time = sleep_time
+ self._t = threading.Thread(target=self._thread_func)
+ self._t.start()
+
+ def _thread_func(self):
+ while True:
+ if self._stop:
+ break
+ try:
+ self._f()
+ except Exception as e:
+ logger.critical("ThreadRepeater: Got exception out of handler thread %s:\n%s" % (self.name, traceback.format_exc()))
+ time.sleep(self._sleep_time)
+
+ def stop(self):
+ self._stop = True
+ self._t.join()