+++ /dev/null
-from libtuer import ThreadRepeater
-from threading import Lock
-
-SLEEP_TIME = 0.5
-
-class ToBeWoken:
- '''a simple struct storing information about a to-be-woken function'''
- def __init__(self, f, period, one_shot):
- self.f = f
- self.period = period
- self.time_since_call = 0
- self.one_shot = one_shot
-
-class Waker():
- def __init__(self):
- self._tobewokens = []
- self._tobewokens_lock = Lock()
- self._t = ThreadRepeater(self._wake, SLEEP_TIME, name="Waker")
-
- def register(self, f, time, one_shot = False):
- '''Register a function which is called approximately every <time> seconds (or just once, if one_shot is True). f should return quickly, or it will delay the waker!'''
- time = max(time//SLEEP_TIME, 1)
- with self._tobewokens_lock:
- self._tobewokens.append(ToBeWoken(f, time, one_shot))
-
- def _wake(self):
- with self._tobewokens_lock:
- delete = []
- # run the functions we ought to run
- for tobewoken, idx in zip(self._tobewokens, range(len(self._tobewokens))):
- tobewoken.time_since_call += 1
- if tobewoken.time_since_call >= tobewoken.period:
- tobewoken.f()
- tobewoken.time_since_call = 0
- if tobewoken.one_shot:
- delete.append(idx)
- # delete what we have to delete - in reverse order so the indices stay valid!
- delete.reverse()
- for idx in delete:
- del self._tobewokens[idx]
-
- def stop(self):
- self._t.stop()