2 import time, socket, atexit
3 import queue, threading, select
4 from libtuer import log, ThreadFunction
5 import RPi.GPIO as GPIO
# Select the BOARD pin-numbering scheme for all later GPIO calls in this module.
6 GPIO.setmode(GPIO.BOARD)
# Register GPIO.cleanup to be called when the interpreter exits.
7 atexit.register(GPIO.cleanup)
# Filesystem path of a socket.
# NOTE(review): presumably the Unix socket that the AF_UNIX socket created
# further down connects to -- the connect call is not visible in this excerpt.
9 tuerSock = "/run/tuer.sock"
# Constructor: configure `pin` as a GPIO input and initialise state-change tracking.
#   pin     -- GPIO pin to set up as an input.
#   histlen -- threshold that the count in self._newstatelen must reach before
#              a state change is reported to the callback; must be > 1.
# NOTE(review): the class header and some lines of this body (embedded lines
# 18 and 21) are missing from this excerpt. Later code reads self._pin and
# self._state, so they are presumably assigned on those lines -- confirm.
15 def __init__(self, pin, histlen):
16 GPIO.setup(pin, GPIO.IN)
17 assert histlen > 1 # otherwise our logic goes nuts...
19 self._histlen = histlen
20 # state change detection
22 self._newstate = None # != None iff we are currently seeing a state change
23 self._newstatelen = 0 # only valid if newstate != None
24 # start state change handler thread
# Wrap self.callback in a ThreadFunction (imported from libtuer).
# NOTE(review): no `callback` method is visible in this class; presumably
# subclasses provide it (RingWatcher defines one) -- confirm.
25 self._callback = ThreadFunction(self.callback)
# Expose the ThreadFunction's stop attribute as this object's `stop`.
26 self.stop = self._callback.stop
# Sample the pin once and advance the state-change detection.
# NOTE(review): the `def` line of this method (embedded line 28) and several
# body lines (39-40, 43-44, 46-47) are missing from this excerpt, so the
# else-branches and any resets of self._newstate / self._newstatelen are not
# visible here.
29 curstate = GPIO.input(self._pin)
30 assert curstate in (0, 1)
31 if curstate != self._state:
32 # the state is about to change
33 if curstate == self._newstate:
34 # we already saw this new state
35 self._newstatelen += 1
# Once the candidate value has been counted at least histlen times, hand
# (old state, new state) to the callback wrapper and adopt the new state.
36 if self._newstatelen >= self._histlen:
37 self._callback(self._state, curstate) # send stuff to the other thread
38 self._state = curstate
41 # now check for how long we see this new state
# Remember the value just read as the candidate new state.
42 self._newstate = curstate
45 # old state is preserved
# PinWatcher subclass that measures how long the ring button is held.
# NOTE(review): this excerpt omits embedded lines 49, 52, 54, 57 and 64-68:
# the `def __init__` header, two branch conditions in callback(), and the
# body of the final `if` are not visible.
48 class RingWatcher(PinWatcher):
# Watch ringPin with a history length of 2.
# NOTE(review): ringPin is not defined in the visible part of the file.
50 super().__init__(ringPin, 2)
# Timestamp from time.time() stored by callback(); None until the first one is recorded.
51 self.last1Event = None
# State-change handler; receives the previous and the new pin state.
53 def callback(self, oldstate, newstate):
55 return # ignore the very first state change
56 # now (oldstate, newstate) is either (0, 1) or (1, 0)
# NOTE(review): the condition guarding this assignment (embedded line 57) is
# missing; presumably it fires when the pin goes to 1, per the attribute name.
58 self.last1Event = time.time()
59 elif self.last1Event is not None:
60 # how long was this pressed?
61 timePressed = time.time() - self.last1Event
62 log("Ring button pressed for",timePressed)
# Only presses lasting between 1.5 and 3 seconds (inclusive) take this branch;
# what the branch does is not visible in this excerpt.
63 if timePressed >= 1.5 and timePressed <= 3:
# Create a Unix-domain stream socket.
# NOTE(review): presumably connected to tuerSock afterwards -- the enclosing
# definition and the connect/send lines are missing from this excerpt.
69 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Handler clause for Ctrl-C (KeyboardInterrupt).
# NOTE(review): the matching `try` block and the handler body are not visible here.
85 except KeyboardInterrupt: