+++ /dev/null
-import RPi.GPIO as GPIO
-from collections import namedtuple
-from libtuer import ThreadRepeater, logger
-from statemachine import StateMachine
-
class PinsState:
    """Plain attribute container holding a snapshot of debounced pin states.

    Instances start out empty; attributes are attached dynamically (one per
    watched pin) by whoever builds the snapshot.
    """

    def __repr__(self):
        # Sorted so the output is stable regardless of attribute insertion order.
        fields = ", ".join(
            "%s=%r" % item for item in sorted(vars(self).items())
        )
        return "%s(%s)" % (type(self).__name__, fields)
-
class PinWatcher:
    """Debounced watcher for a single GPIO input pin.

    A level that differs from the current debounced ``state`` is only
    accepted after it has been observed on ``histlen`` consecutive calls to
    ``read``; shorter glitches are discarded. ``state`` is ``None`` until the
    first level has been accepted.
    """

    def __init__(self, pin, histlen):
        """Configure ``pin`` as an input and start with an unknown state.

        Args:
            pin: Channel number handed to ``GPIO.setup`` / ``GPIO.input``.
            histlen: Number of consecutive identical samples required before
                a new level is accepted (at least 1).

        Raises:
            ValueError: If ``histlen`` is smaller than 1.
        """
        # Raise instead of ``assert`` so the check survives ``python -O``,
        # and validate before the hardware is touched.
        if histlen < 1:
            raise ValueError("histlen must be at least 1, got %r" % (histlen,))
        GPIO.setup(pin, GPIO.IN)
        self.pin = pin
        self._histlen = histlen
        # state change detection
        self.state = None
        self._newstate = None  # not None iff a state change is being observed
        self._newstatelen = 0  # only meaningful while _newstate is not None

    def read(self, level=None):
        """Feed one sample into the debouncer.

        Args:
            level: Optional pre-sampled pin level (0 or 1). When omitted, the
                pin is sampled via ``GPIO.input``.

        Returns:
            True if this sample completed a change of ``state`` (including
            the very first transition away from ``None``), otherwise False.

        Raises:
            ValueError: If the sample is neither 0 nor 1.
        """
        curstate = GPIO.input(self.pin) if level is None else level
        if curstate not in (0, 1):
            raise ValueError("unexpected pin level %r" % (curstate,))

        if curstate == self.state:
            # Old state is preserved; forget any change that was in progress.
            self._newstate = None
            return False

        # The state is about to change: count consecutive sightings.
        if curstate == self._newstate:
            self._newstatelen += 1
        else:
            self._newstate = curstate
            self._newstatelen = 1

        # Checking the threshold after both branches is what makes
        # histlen == 1 work; for histlen >= 2 a first sighting never passes.
        if self._newstatelen >= self._histlen:
            # Seen often enough to declare it the new state.
            self.state = curstate
            self._newstate = None
            return True
        return False
-
class PinsWatcher:
    """Polls a fixed set of debounced pins and reports changes.

    Whenever at least one pin changes its debounced state, a ``PinsState``
    snapshot of all pins is passed to the state machine's ``callback``.
    """

    def __init__(self, state_machine):
        """Create the pin watchers and start the polling thread.

        Args:
            state_machine: Object whose ``callback(cmd, data)`` receives
                ``StateMachine.CMD_PINS`` together with the snapshot.
        """
        # name -> PinWatcher(pin, histlen)
        self._pins = {
            'bell_ringing': PinWatcher(18, 2),
            'door_closed': PinWatcher(8, 4),
            'door_locked': PinWatcher(10, 4),
            'space_active': PinWatcher(24, 4),
        }
        self._sm = state_machine

        # start a thread doing the work
        # NOTE(review): 0.02 is presumably the polling interval in seconds;
        # confirm against ThreadRepeater.
        self._t = ThreadRepeater(self._read, 0.02, name="PinsWatcher")

    def _read(self):
        """Sample every pin once and notify the state machine on any change."""
        saw_change = False
        # Every pin must be read on every pass (no short-circuit) so that
        # each debouncer sees each sample.
        for name, pin in self._pins.items():
            if pin.read():
                saw_change = True
                logger.debug("Pin %s changed to %d" % (name, pin.state))
        if not saw_change:
            return

        # Build the snapshot of all pins, not only the ones that changed.
        snapshot = PinsState()
        for name, pin in self._pins.items():
            setattr(snapshot, name, pin.state)
        # send it to state machine
        self._sm.callback(StateMachine.CMD_PINS, snapshot)

    def stop(self):
        """Stop the polling thread."""
        self._t.stop()