CMD_UNLOCK = 1
CMD_LOCK = 2
+ class CMD():
+ def __init__(self, name, pin, tid, todo):
+ self.name = name
+ self.pin = pin
+ self.tid = tid
+ self.todo = todo
+ # don't do the GPIO setup here, the main init did not yet run
+
+ def execute(self):
+ logger.info("Actor: Running command %s" % self.name)
+ for (value, delay) in self.todo:
+ if value is not None:
+ logger.debug("Actor: Setting pin %d to %d" % (self.pin, value))
+ GPIO.output(self.pin, value)
+ time.sleep(delay)
+
CMDs = {
- CMD_UNLOCK: ("unlock", 12, [(True, 0.3), (False, 0.1)]),
- CMD_LOCK: ("lock", 16, [(True, 0.3), (False, 0.1)]),
- CMD_BUZZ: ("buzz", 22, [(True, 2.0), (False, 0.1)]),
+ CMD_UNLOCK: CMD("unlock", pin=12, tid=0, todo=[(True, 0.3), (False, 0.1)]),
+ CMD_LOCK: CMD("lock", pin=16, tid=0, todo=[(True, 0.3), (False, 0.1)]),
+ CMD_BUZZ: CMD("buzz", pin=22, tid=1, todo=[(True, 2.0), (False, 0.1)]),
}
def __init__(self):
- self.act = ThreadFunction(self._act, name="Actor")
- for (name, pin, todo) in self.CMDs.values():
- GPIO.setup(pin, GPIO.OUT)
+ # launch threads, all running the "_execute" method
+ self.threads = {}
+ for cmd in Actor.CMDs.values():
+ GPIO.setup(cmd.pin, GPIO.OUT)
+ if not cmd.tid in self.threads:
+ self.threads[cmd.tid] = ThreadFunction(self._execute, "Actor TID %d" % cmd.tid)
- def _act(self, cmd):
- if cmd in self.CMDs:
- (name, pin, todo) = self.CMDs[cmd]
- logger.info("Actor: Running command %s" % name)
- for (value, delay) in todo:
- if value is not None:
- logger.debug("Actor: Setting pin %d to %d" % (pin, value))
- GPIO.output(pin, value)
- time.sleep(delay)
- else:
- logger.critical("Actor: Got unknown command %d" % cmd)
+ def _execute(self, cmd):
+ Actor.CMDs[cmd].execute()
+
+ def act(self, cmd):
+ # dispatch command to correct thread
+ self.threads[Actor.CMDs[cmd].tid](cmd)
def stop(self):
- self.act.stop()
+ for thread in self.threads.values():
+ thread.stop()
import logging, logging.handlers, os, time, queue, threading, subprocess
import traceback, smtplib
-import email.mime.text, email.util
+import email.mime.text, email.utils
# Logging configuration
syslogLevel = logging.INFO
msg['To'] = ', '.join(receivers)
if replyTo is not None:
msg['Reply-To'] = replyTo
- # FIXME set time
# put into envelope and send
s = smtplib.SMTP('ralfj.de')
s.sendmail(sender, receivers, msg.as_string())
# Timeout we wait after the switch was switched to "Closed", until we assume nobody will open the door and we just lock it
# ALso the time we wait after the door was opend, till we assume something went wrong and start nerving
-LEAVE_TIMEOUT = 20
+LEAVE_TIMEOUT = 4
# play_sound constants
SOUNDS_DIRECTORY = "/opt/tuer/sounds/"
'''A state with invariant "The space is locked", switching to StateAboutToOpen when the space becomes unlocked'''
def handle_pins_event(self):
if not self.pins().door_locked:
- return StateAboutToOpen(self.state_machine)
- return super().handle_pins_event
+ logger.info("Door unlocked, space is about to open")
+ return StateMachine.StateAboutToOpen(self.state_machine)
+ return super().handle_pins_event()
class AbstractUnlockedState(State):
'''A state with invariant "The space is unlocked", switching to StateZu when the space becomes locked'''
def handle_pins_event(self):
if self.pins().door_locked:
+ logger.info("Door locked, closing space")
if self.pins().space_active:
# FIXME the same state can be reached by first locking the door, and then activating the space. What to do then?
- logger.info("StateMachine: door manually locked, but space switch is still on - going to StateZu")
+ logger.warning("StateMachine: door manually locked, but space switch is still on - going to StateZu")
play_sound("manual_lock")
- return StateZu(self.state_machine)
- return super().handle_pins_event
+ return StateMachine.StateZu(self.state_machine)
+ return super().handle_pins_event()
class StateStart(State):
def handle_pins_event(self):
pins = self.pins()
if not (pins.door_locked is None or pins.door_closed is None or pins.space_active is None or pins.bell_ringing is None):
- # All sensors got a value, switch to a proper state
+ logger.info("All sensors got a value, switching to a proper state")
if pins.door_locked:
return StateMachine.StateZu(self.state_machine)
else:
def handle_pins_event(self):
pins = self.pins()
if pins.space_active:
+ logger.info("Space activated, opening procedure completed")
return StateMachine.StateAuf(self.state_machine)
return super().handle_pins_event()
if not self.pins().door_closed:
return StateMachine.StateLeaving(self.state_machine)
if self.pins().space_active:
+ logger.info("Space re-activated, cancelling leaving procedure")
return StateMachine.StateAuf(self.state_machine)
return super().handle_pins_event()
super().__init__(sm, nervlist)
def handle_pins_event(self):
if self.pins().door_closed:
+ logger.info("The space was left, locking the door")
return StateMachine.StateLocking(self.state_machine)
if self.pins().space_active:
+ logger.info("Space re-activated, cancelling leaving procedure")
return StateMachine.StateAuf(self.state_machine)
return super().handle_pins_event()
newstate = self.current_state.handle_event(cmd,arg) # returns None or an instance of the new state
self.old_pins = self.pins
while newstate is not None:
+ assert isinstance(newstate, StateMachine.State), "I should get a state"
self.current_state.on_leave()
logger.debug("StateMachine: Doing state transition %s -> %s" % (self.current_state.__class__.__name__, newstate.__class__.__name__))
self.current_state = newstate
-import socket, os, stat, struct, pwd
+import socket, os, stat, struct, pwd, errno
from statemachine import StateMachine
from libtuer import logger
SO_PEERCRED = 17 # DO - NOT - TOUCH