1 import RPi.GPIO as GPIO
2 from collections import namedtuple
3 from libtuer import ThreadRepeater, logger
4 from statemachine import StateMachine
10 def __init__(self, pin, histlen):
# Configure one GPIO channel as an input and initialise the bookkeeping
# used for state change detection.
#   pin:     channel handed to GPIO.setup() with direction GPIO.IN.
#   histlen: threshold stored in self._histlen; must be > 1 (asserted below).
# NOTE(review): the embedded line numbers jump (12 -> 14 and 15 -> 17), so
# some statements of this method are not visible in this excerpt. Other code
# in this file reads self.pin and self.state; presumably they are assigned
# in those missing lines -- confirm against the full file.
11 GPIO.setup(pin, GPIO.IN)
# NOTE(review): this check is an assert, so it is skipped when Python runs
# with -O.
12 assert histlen > 1 # otherwise our logic goes nuts...
14 self._histlen = histlen
15 # state change detection
17 self._newstate = None # != None iff we are currently seeing a state change
18 self._newstatelen = 0 # only valid if newstate != None
# NOTE(review): the `def` line of this method is not visible in this excerpt,
# indentation has been lost, and the embedded line numbers show further gaps
# (29 -> 34, 35 -> 38). The statements that commit a new state, the branch
# structure (else clauses) and any return value are therefore not visible;
# the comments below describe only the visible statements.
# Take one sample of the pin; it must be 0 or 1.
21 curstate = GPIO.input(self.pin)
22 assert curstate in (0, 1)
# The sample differs from the currently accepted state.
23 if curstate != self.state:
24 # the state is about to change
# The sample matches the candidate state recorded earlier.
25 if curstate == self._newstate:
26 # we already saw this new state
# Count one more sighting of the candidate state.
27 self._newstatelen += 1
# The candidate has been seen at least self._histlen times.
28 if self._newstatelen >= self._histlen:
29 # we saw it often enough to declare it the new state
34 # now check for how long we see this new state
# Record the sampled value as the candidate new state.
35 self._newstate = curstate
38 # old state is preserved
43 def __init__(self, state_machine):
# Create one PinWatcher per named input, keep a reference to the state
# machine, and create the ThreadRepeater that runs self._read.
#   state_machine: stored as self._sm; its callback() method is invoked with
#                  StateMachine.CMD_PINS elsewhere in this file.
# NOTE(review): the embedded line numbers jump (43 -> 45, 48 -> 50), so the
# statement that opens and closes the mapping below is not visible here.
# Other code reads it as self._pins -- presumably it is a dict assigned to
# that attribute; confirm against the full file.
# Each entry maps an input name to PinWatcher(<first arg>, <second arg>);
# presumably (pin, histlen) -- the PinWatcher class header is not visible.
# 'bell_ringing' uses 2 as the second argument, the other inputs use 4.
45 'bell_ringing': PinWatcher(18, 2),
46 'door_closed': PinWatcher(8, 4),
47 'door_locked': PinWatcher(10, 4),
48 'space_active': PinWatcher(24, 4),
50 self._sm = state_machine
52 # start a thread doing the work
# NOTE(review): 0.02 is presumably the repeat interval in seconds; the
# ThreadRepeater signature lives in libtuer -- confirm there.
53 self._t = ThreadRepeater(self._read, 0.02, name="PinsWatcher")
# NOTE(review): the `def` line of this method is not visible in this excerpt;
# presumably this is the body of self._read, the callable handed to
# ThreadRepeater. The embedded line numbers also jump (58 -> 61, 61 -> 64), so
# the condition guarding the debug log and whatever happens between the loop
# and the construction of the result object are not visible.
# Walk over every named pin watcher.
57 for name in self._pins.keys():
58 pin = self._pins[name]
# Log the pin's name and its current state.
61 logger.debug("Pin %s changed to %d" % (name, pin.state))
64 # create return object
# PinsState is defined outside this excerpt.
65 pinsState = PinsState()
# Copy each watcher's current state onto an attribute named after the pin.
66 for name in self._pins.keys():
67 setattr(pinsState, name, self._pins[name].state)
68 # send it to state machine
69 self._sm.callback(StateMachine.CMD_PINS, pinsState)