Skip to content
This repository was archived by the owner on May 6, 2024. It is now read-only.

Commit a2b4eec

Browse files
Added a socket keep alive thread
Work around for the hearth beat reset error
1 parent 0ae67f8 commit a2b4eec

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

vantage/node/__init__.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ def on_expired_token(self, msg):
100100
self.node_worker_ref.__sync_task_que_with_server()
101101
self.log.debug("Tasks synced again with the server...")
102102

103+
def on_pang(self, msg):
104+
self.log.debug(f"Pong received, WS still connected <{msg}>")
105+
self.node_worker_ref.socket_connected = True
106+
103107
# ------------------------------------------------------------------------------
104108
class NodeWorker:
105109
""" Node to handle incomming computation requests.
@@ -203,6 +207,9 @@ def __init__(self, ctx):
203207
t = Thread(target=self.__proxy_server_worker, daemon=True)
204208
t.start()
205209

210+
self.log.info("starting thread to check the socker connection")
211+
t = Thread(target=self.__keep_socket_alive, daemon=True)
212+
t.start()
206213
# after here, you should/could call self.run_forever(). This
207214
# could be done in a seperate Thread
208215

@@ -485,6 +492,34 @@ def __speaking_worker(self):
485492
'finished_at': datetime.datetime.now().isoformat(),
486493
}
487494
)
495+
496+
def __keep_socket_alive(self):
497+
498+
while True:
499+
time.sleep(60)
500+
501+
# send ping
502+
self.socket_connected = False
503+
self.socket_tasks.emit("ping", self.server_io.whoami.id_)
504+
505+
# wait for pong
506+
max_waiting_time = 5
507+
count = 0
508+
while (not self.socket_connected) and count < max_waiting_time:
509+
self.log.debug("Waiting for pong")
510+
time.sleep(1)
511+
count += 1
512+
513+
if not self.socket_connected:
514+
self.log.warn("WS seems disconnected, resetting")
515+
self.socketIO.disconnect()
516+
self.log.debug("Disconnecting WS")
517+
self.server_io.refresh_token()
518+
self.log.debug("Token refreshed")
519+
self.connect_to_socket()
520+
self.log.debug("Connected to socket")
521+
self.__sync_task_que_with_server()
522+
488523

489524
# ------------------------------------------------------------------------------
490525
def run(ctx):

vantage/server/websockets.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def on_connect(self):
5858

5959
if session.type == 'node':
6060
session.rooms.append('collaboration_' + str(auth.collaboration_id))
61+
session.rooms.append('node_' + str(auth.id))
6162
elif session.type == 'user':
6263
session.rooms.append('user_'+str(auth.id))
6364

@@ -103,6 +104,11 @@ def on_container_failed(self, node_id, status_code, result_id, collaboration_id)
103104
room = "collaboration_"+str(collaboration_id)
104105
emit("container_failed", run_id, room=room)
105106

107+
def on_ping(self, node_id):
108+
# self.log.debug(f"ping from id={node_id}")
109+
room = f"node_{node_id}"
110+
emit("pang","success!", room=room)
111+
106112
def __join_room_and_notify(self, room):
107113
join_room(room)
108114
msg = f'<{session.name}> joined room <{room}>'

0 commit comments

Comments
 (0)