1 import RPi.GPIO as GPIO
2 from collections import namedtuple
3 from libtuer import ThreadRepeater, logger
4 from statemachine import StateMachine
10 def __init__(self, pin, histlen):
11 GPIO.setup(pin, GPIO.IN)
12 assert histlen > 1 # otherwise our logic goes nuts...
14 self._histlen = histlen
15 # state change detection
17 self._newstate = None # != None iff we are currently seeing a state change
18 self._newstatelen = 0 # only valid if newstate != None
21 curstate = GPIO.input(self.pin)
22 assert curstate in (0, 1)
23 if curstate != self._state:
24 # the state is about to change
25 if curstate == self._newstate:
26 # we already saw this new state
27 self._newstatelen += 1
28 if self._newstatelen >= self._histlen:
29 # we saw it often enough to declare it the new state
34 # now check for how long we see this new state
35 self._newstate = curstate
38 # old state is preserved
43 def __init__(self, state_machine):
45 'bell_ringing': PinWatcher(18, 2),
46 'door_closed': PinWatcher(8, 5),
47 'door_locked': PinWatcher(9, 5),
48 'space_active': PinWatcher(10, 5),
50 self._sm = state_machine
52 # start a thread doing the work
53 self._t = ThreadRepeater(self._read, 0.02)
57 for name in self.pins.keys():
61 logger.debug("Pin %s changed to %d" % (name, pin.state)
62 if not saw_change: return
63 # create return object
64 pinsState = PinsState()
65 for name in self.pins.keys():
66 setattr(pinsState, name, self.pins[name].state)
67 # send it to state machine
68 self._sm.callback(StateMachine.CMD_PINS, pinsState)