- over = time.time() - self.time_entered
- nexttry = (self.tries+1) * OPEN_REPEAT_TIMEOUT
- if over > nexttry:
- if self.tries < OPEN_REPEAT_NUMBER:
- self.actor().act(Actor.CMD_OPEN)
- self.tries += 1
- else:
- logger.critical("Couldn't open door after %d tries. Going back to StateZu." % OPEN_REPEAT_NUMBER)
- self.notify(False)
- return StateZu(self.state_machine)
-
- class AbstractStateWhereOpeningIsRedundant(State):
- def __init__ (self,sm):
- State.__init__(sm):
- def handle_open_event(self, callback):
+ # blink red LED
+ if self._red_state:
+ self.actor().act(Actor.CMD_RED_OFF)
+ self._red_state = False
+ else:
+ self.actor().act(Actor.CMD_RED_ON)
+ self._red_state = True
+ def handle_cmd_unlock_event(self,arg):
+ if arg is not None:
+ arg("200 okay: Trying to unlock the door. The System is in fallback mode, success information is not available.")
+ self.actor().act(Actor.CMD_UNLOCK)
+ def handle_cmd_lock_event(self,arg):
+ if arg is not None:
+ arg("200 okay: Trying to lock the door. The System is in fallback mode, success information is not available.")
+ self.actor().act(Actor.CMD_LOCK)
+ def handle_cmd_fallback_on_event(self,arg):
+ arg("412 Precondition Failed: Fallback mode already active.")
+ def handle_cmd_fallback_off_event(self,arg):
+ arg("200 okay: Leaving fallback mode and notifying admins.")
+ logger.critical("Leaving fallback mode. Somebody thinks, the sensors are working again.")
+ return StateMachine.StateStart(self.state_machine)
+
+ class StateZu(AbstractLockedState):
+ def handle_cmd_unlock_event(self,callback):
+ return StateMachine.StateUnlocking(self.state_machine, callback)
+ def handle_pins_event(self):
+ if not self.old_pins().space_active and self.pins().space_active: # first thing to check: edge detection
+ logger.info("Space toggled to active while it was closed - unlocking the door")
+ return StateMachine.StateUnlocking(self.state_machine)
+ return super().handle_pins_event()
+
+ class StateUnlocking(AbstractLockedState):
+ def __init__(self,sm,callback=None):
+ # construct a nervlist
+ nervlist = [(OPEN_REPEAT_TIMEOUT, lambda: self.actor().act(Actor.CMD_UNLOCK)) for t in range(OPEN_REPEAT_NUMBER)]
+ nervlist += [(OPEN_REPEAT_TIMEOUT, self.could_not_open)]
+ super().__init__(sm,nervlist)
+ self.callbacks = []
+ self.actor().act(Actor.CMD_UNLOCK)
+ # enqueue the callback
+ self.handle_cmd_unlock_event(callback)
+ def notify(self, s, lastMsg):
+ for cb in self.callbacks:
+ cb(s, lastMsg)
+ def on_leave(self):
+ s = "200 okay: door unlocked" if not self.pins().door_locked else ("500 internal server error: Couldn't unlock door with %d tries à %f seconds" % (OPEN_REPEAT_NUMBER,OPEN_REPEAT_TIMEOUT))
+ self.notify(s, lastMsg=True)
+ def handle_cmd_unlock_event(self,callback):
+ if callback is not None:
+ callback("202 processing: Trying to unlock the door", lastMsg=False)
+ self.callbacks.append(callback)
+ def could_not_open(self):
+ logger.critical("StateMachine: Couldn't open door after %d tries. Going back to StateZu." % OPEN_REPEAT_NUMBER)
+ return StateMachine.StateZu(self.state_machine)
+
+ class AbstractStateWhereUnlockingIsRedundant(AbstractUnlockedState):
+ def handle_cmd_unlock_event(self, callback):