X-Git-Url: https://git.ralfj.de/saartuer.git/blobdiff_plain/df7d5268c3bee2e973ca285d97d1a3a77de24743..8b136f42bf0c83f8aee52c315f7dbebc371cce1b:/ringd diff --git a/ringd b/ringd deleted file mode 100755 index b6a27f9..0000000 --- a/ringd +++ /dev/null @@ -1,87 +0,0 @@ -#!/usr/bin/python3 -import time, socket, atexit -import queue, threading, select -from libtuer import log, ThreadFunction -import RPi.GPIO as GPIO -GPIO.setmode(GPIO.BOARD) -atexit.register(GPIO.cleanup) - -tuerSock = "/run/tuer.sock" -ringPin = 18 - - -# Main classes -class PinWatcher(): - def __init__(self, pin, histlen): - GPIO.setup(pin, GPIO.IN) - assert histlen > 1 # otherwise our logic goes nuts... - self._pin = pin - self._histlen = histlen - # state change detection - self._state = None - self._newstate = None # != None iff we are currently seeing a state change - self._newstatelen = 0 # only valid if newstate != None - # start state change handler thread - self._callback = ThreadFunction(self.callback) - self.stop = self._callback.stop - - def read(self): - curstate = GPIO.input(self._pin) - assert curstate in (0, 1) - if curstate != self._state: - # the state is about to change - if curstate == self._newstate: - # we already saw this new state - self._newstatelen += 1 - if self._newstatelen >= self._histlen: - self._callback(self._state, curstate) # send stuff to the other thread - self._state = curstate - self._newstate = None - else: - # now check for how long we see this new state - self._newstate = curstate - self._newstatelen = 1 - else: - # old state is preserved - self._newstate = None - -class RingWatcher(PinWatcher): - def __init__(self): - super().__init__(ringPin, 2) - self.last1Event = None - - def callback(self, oldstate, newstate): - if oldstate is None: - return # ignore the very first state change - # now (oldstate, newstate) is either (0, 1) or (1, 0) - if newstate: - self.last1Event = time.time() - elif self.last1Event is not None: - # how long was this pressed? - timePressed = time.time() - self.last1Event - log("Ring button pressed for",timePressed) - if timePressed >= 1.5 and timePressed <= 3: - self.buzz() - - def buzz(self): - log("Opening door") - # talk with tuerd - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.connect(tuerSock) - s.send(b'buzz') - s.close() - -# MAIN PROGRAM -pins = [ - RingWatcher(), -] - -try: - log("entering loop") - while True: - for pin in pins: - pin.read() - time.sleep(0.02) -except KeyboardInterrupt: - for pin in pins: - pin.stop()