2 import time, socket, atexit
3 import queue, threading, select
4 from libtuer import log
5 import RPi.GPIO as GPIO
# Select the GPIO.BOARD pin-numbering mode for the GPIO calls that follow.
6 GPIO.setmode(GPIO.BOARD)
# Register GPIO.cleanup with atexit so it is called at normal interpreter exit.
7 atexit.register(GPIO.cleanup)
# Filesystem path kept in a module-level name. NOTE(review): presumably the
# Unix socket that the AF_UNIX socket created further down connects to --
# the code using this name is not visible here, confirm before relying on it.
9 tuerSock = "/run/tuer.sock"
14 def __init__(self, pin, histlen):
15 GPIO.setup(pin, GPIO.IN)
16 assert histlen > 1 # otherwise our logic goes nuts...
18 self._histlen = histlen
19 # state change detection
21 self._newstate = None # != None iff we are currently seeing a state change
22 self._newstatelen = 0 # only valid if newstate != None
23 # start state change handler thread
24 self._q = queue.Queue()
25 self._t = threading.Thread(target=self.queue_consumer)
28 def queue_consumer(self):
31 if el is None: return # we are supposed to terminate
32 # handle the state change
33 (oldstate, newstate) = el
34 self.callback(oldstate, newstate)
37 curstate = GPIO.input(self._pin)
38 assert curstate in (0, 1)
39 if curstate != self._state:
40 # the state is about to change
41 if curstate == self._newstate:
42 # we already saw this new state
43 self._newstatelen += 1
44 if self._newstatelen >= self._histlen:
45 self._q.put((self._state, curstate)) # send stuff to the other thread
46 self._state = curstate
49 # now check for how long we see this new state
50 self._newstate = curstate
53 # old state is preserved
60 class RingWatcher(PinWatcher):
62 super().__init__(ringPin, 2)
63 self.last1Event = None
65 def callback(self, oldstate, newstate):
67 return # ignore the very first state change
68 # now (oldstate, newstate) is either (0, 1) or (1, 0)
70 self.last1Event = time.time()
71 elif self.last1Event is not None:
72 # how long was this pressed?
73 timePressed = time.time() - self.last1Event
74 log("Ring button pressed for",timePressed)
75 if timePressed >= 1.5 and timePressed <= 3:
81 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
97 except KeyboardInterrupt: