# pins.py -- reconstructed from a gitweb blobdiff of saartuer.git
# (blob 30f9666, shown there as a deleted file).
import RPi.GPIO as GPIO
from collections import namedtuple
from libtuer import ThreadRepeater, logger
from statemachine import StateMachine


class PinsState():
    """Plain attribute bag; PinsWatcher sets one attribute per watched pin."""
    pass


class PinWatcher():
    """Debounced reader for a single GPIO input pin.

    A level that differs from the accepted ``state`` must be read on
    ``histlen`` consecutive calls to ``read()`` before it is accepted.
    """

    def __init__(self, pin, histlen):
        """Configure ``pin`` as an input and reset the debounce bookkeeping.

        pin     -- pin number handed to GPIO.setup / GPIO.input
                   (NOTE(review): the numbering mode is not set in this
                   file; presumably GPIO.setmode is called elsewhere)
        histlen -- number of consecutive identical readings required to
                   accept a new level; must be greater than 1
        """
        GPIO.setup(pin, GPIO.IN)
        assert histlen > 1  # otherwise our logic goes nuts...
        self.pin = pin
        self._histlen = histlen
        # state change detection
        self.state = None
        self._newstate = None  # != None iff we are currently seeing a state change
        self._newstatelen = 0  # only valid if newstate != None

    def read(self):
        """Sample the pin once.

        Returns True exactly when this sample completes a run of
        ``histlen`` identical readings that differ from ``state`` (which is
        then updated); returns False otherwise.
        """
        curstate = GPIO.input(self.pin)
        assert curstate in (0, 1)
        # Compare against the accepted state. The original read
        # ``self._state``, an attribute that is never assigned.
        if curstate != self.state:
            # the state is about to change
            if curstate == self._newstate:
                # we already saw this new state
                self._newstatelen += 1
                if self._newstatelen >= self._histlen:
                    # we saw it often enough to declare it the new state
                    self.state = curstate
                    self._newstate = None
                    return True
            else:
                # now check for how long we see this new state
                self._newstate = curstate
                self._newstatelen = 1
        else:
            # old state is preserved
            self._newstate = None
        return False


class PinsWatcher():
    """Polls a fixed set of named pins and reports changes to a state machine."""

    def __init__(self, state_machine):
        """Create the per-pin watchers and start the polling thread.

        state_machine -- object whose ``callback(cmd, data)`` is invoked with
                         ``StateMachine.CMD_PINS`` and a PinsState whenever
                         at least one pin changed
        """
        self.pins = {
            'bell_ringing': PinWatcher(18, 2),
            'door_closed': PinWatcher(8, 5),
            'door_locked': PinWatcher(10, 5),
            'space_active': PinWatcher(24, 5),
        }
        self._sm = state_machine

        # start a thread doing the work
        # NOTE(review): 0.02 is presumably the repeat interval in seconds;
        # confirm against libtuer.ThreadRepeater.
        self._t = ThreadRepeater(self._read, 0.02)

    def _read(self):
        """Sample every pin once; on any change, send a full snapshot."""
        saw_change = False
        for name, pin in self.pins.items():
            # Every pin must be sampled on each tick to keep its debounce
            # history advancing, so do not stop at the first change.
            if pin.read():
                saw_change = True
                logger.debug("Pin %s changed to %d" % (name, pin.state))
        if not saw_change:
            return
        # create return object
        pinsState = PinsState()
        for name, pin in self.pins.items():
            setattr(pinsState, name, pin.state)
        # send it to state machine
        self._sm.callback(StateMachine.CMD_PINS, pinsState)

    def stop(self):
        """Stop the polling thread."""
        self._t.stop()