X-Git-Url: https://git.ralfj.de/saartuer.git/blobdiff_plain/df7d5268c3bee2e973ca285d97d1a3a77de24743..8b136f42bf0c83f8aee52c315f7dbebc371cce1b:/pins.py?ds=sidebyside diff --git a/pins.py b/pins.py new file mode 100644 index 0000000..8b699be --- /dev/null +++ b/pins.py @@ -0,0 +1,71 @@ +import RPi.GPIO as GPIO +from collections import namedtuple +from libtuer import ThreadRepeater, logger +from statemachine import StateMachine + +class PinsState(): + pass + +class PinWatcher(): + def __init__(self, pin, histlen): + GPIO.setup(pin, GPIO.IN) + assert histlen > 1 # otherwise our logic goes nuts... + self.pin = pin + self._histlen = histlen + # state change detection + self.state = None + self._newstate = None # != None iff we are currently seeing a state change + self._newstatelen = 0 # only valid if newstate != None + + def read(self): + curstate = GPIO.input(self.pin) + assert curstate in (0, 1) + if curstate != self._state: + # the state is about to change + if curstate == self._newstate: + # we already saw this new state + self._newstatelen += 1 + if self._newstatelen >= self._histlen: + # we saw it often enough to declare it the new state + self.state = curstate + self._newstate = None + return True + else: + # now check for how long we see this new state + self._newstate = curstate + self._newstatelen = 1 + else: + # old state is preserved + self._newstate = None + return False + +class PinsWatcher(): + def __init__(self, state_machine): + self.pins = { + 'bell_ringing': PinWatcher(18, 2), + 'door_closed': PinWatcher(8, 5), + 'door_locked': PinWatcher(9, 5), + 'space_active': PinWatcher(10, 5), + } + self._sm = state_machine + + # start a thread doing the work + self._t = ThreadRepeater(self._read, 0.02) + + def _read(): + saw_change = False + for name in self.pins.keys(): + pin = pins[name] + if pin.read(): + saw_change = True + logger.debug("Pin %s changed to %d" % (name, pin.state) + if not saw_change: return + # create return object + pinsState = PinsState() + for name in self.pins.keys(): + setattr(pinsState, name, self.pins[name].state) + # send it to state machine + self._sm.callback(StateMachine.CMD_PINS, pinsState) + + def stop(): + self._t.stop()