From 1d265c1d24a8ddd2bfe4abb96806376b9f63a2f2 Mon Sep 17 00:00:00 2001 From: Eduardo Aguilar Date: Wed, 1 Apr 2020 12:06:38 +0200 Subject: [PATCH 1/2] websocket: add timeout, reconnect on exceptions Handle exceptions on 'recv' websocket method to reconnect. Adding the timeout based on ping interval also detects a missing ping. Signed-off-by: Eduardo Aguilar --- ventilator_websocket.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/ventilator_websocket.py b/ventilator_websocket.py index 70b1233..287223b 100755 --- a/ventilator_websocket.py +++ b/ventilator_websocket.py @@ -39,6 +39,12 @@ def do_handshake(self): self.send_msg(hello_msg) reply = self.ws.recv() print(reply) + try: + msg = json.loads(reply) + self.interval = msg['heartbeat']['interval'] + self.ws.settimeout(self.interval/1000) + except: + print("Could not parse handshake response") def run(self, name): print("Starting {}".format(name)) @@ -50,7 +56,16 @@ def run(self, name): self.subscribe('settings') while True: - json_msg = self.ws.recv() + try: + json_msg = self.ws.recv() + except: + print("Timeout or socket closed: reconnecting") + self.ws.close() + self.attempt_reconnect() + self.do_handshake() + self.subscribe('settings') + continue + try: msg = json.loads(json_msg) if msg['type'] == "ping": From 86026f293fd9d8f309a59c40e53e61114bdc16be Mon Sep 17 00:00:00 2001 From: Eduardo Aguilar Date: Wed, 1 Apr 2020 13:04:42 +0200 Subject: [PATCH 2/2] websocket: raise alarm on connectivity loss Signed-off-by: Eduardo Aguilar --- daemon.py | 2 +- ventilator_alarm.py | 2 +- ventilator_websocket.py | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/daemon.py b/daemon.py index 0b8ef89..c7795a7 100755 --- a/daemon.py +++ b/daemon.py @@ -44,7 +44,7 @@ def run(): else: ser_handler = SerialHandler(db_queue, request_queue, serial_output_queue, alarm_input_queue) db_handler = DbClient(db_queue) - websocket_handler = WebsocketHandler(serial_output_queue) + websocket_handler = WebsocketHandler(serial_output_queue, alarm_input_queue) alarm_handler = AlarmHandler(alarm_input_queue,serial_output_queue, request_queue) request_handler = RequestHandler(api_request, request_queue) diff --git a/ventilator_alarm.py b/ventilator_alarm.py index 1e70222..4e78fee 100644 --- a/ventilator_alarm.py +++ b/ventilator_alarm.py @@ -42,7 +42,7 @@ def run(self, name): msg = None if msg != None: - if msg['type'] == "ALARM": + if msg['type'] == proto.alarm: self.time_last_kick_received == cur_time if msg['val'] != 0: self.request_queue.put({'type': 'error', 'value': msg['val']}) diff --git a/ventilator_websocket.py b/ventilator_websocket.py index 287223b..41489ab 100755 --- a/ventilator_websocket.py +++ b/ventilator_websocket.py @@ -23,6 +23,9 @@ def handle_settings(self, settings): print("send setting {}".format(key)) self.serial_queue.put({'type': key, 'val': settings[key]}) + def raise_alarm(self): + self.alarm_queue.put({'type': proto.alarm, 'val': 1}) + def subscribe(self, path): """ Subscribe to updates @@ -60,6 +63,7 @@ def run(self, name): json_msg = self.ws.recv() except: print("Timeout or socket closed: reconnecting") + self.raise_alarm() self.ws.close() self.attempt_reconnect() self.do_handshake() @@ -88,10 +92,11 @@ def attempt_reconnect(self): except: continue - def __init__(self, serial_queue, addr='localhost', port=3001): + def __init__(self, serial_queue, alarm_queue, addr='localhost', port=3001): self.url = "ws://" + addr + ":" + str(port) + "/" self.id = 1 self.serial_queue = serial_queue + self.alarm_queue = alarm_queue if __name__ == "__main__":