Skip to content

Commit 21ecc8a

Browse files
committed
reconnect on receive
1 parent 44c41bc commit 21ecc8a

File tree

1 file changed

+37
-15
lines changed

1 file changed

+37
-15
lines changed

template/server/messaging.py

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -83,55 +83,63 @@ async def connect(self):
8383
async def reconnect(self, max_retries: int = 5, retry_delay: float = 0.1):
8484
"""Reconnect the WebSocket if it's disconnected with retry logic."""
8585
logger.info(f"Attempting to reconnect WebSocket {self.context_id}")
86-
86+
8787
# Close existing connection if any
8888
if self._ws is not None:
8989
try:
9090
await self._ws.close()
9191
except Exception as e:
9292
logger.warning(f"Error closing existing WebSocket: {e}")
93-
93+
9494
# Cancel existing receive task if any
9595
if self._receive_task is not None and not self._receive_task.done():
9696
self._receive_task.cancel()
9797
try:
9898
await self._receive_task
9999
except asyncio.CancelledError:
100100
pass
101-
101+
102102
# Reset WebSocket and task references
103103
self._ws = None
104104
self._receive_task = None
105-
105+
106106
# Attempt to reconnect with fixed delay
107107
for attempt in range(max_retries):
108108
try:
109109
await self.connect()
110-
logger.info(f"Successfully reconnected WebSocket {self.context_id} on attempt {attempt + 1}")
110+
logger.info(
111+
f"Successfully reconnected WebSocket {self.context_id} on attempt {attempt + 1}"
112+
)
111113
return True
112114
except Exception as e:
113115
if attempt < max_retries - 1:
114-
logger.warning(f"Reconnection attempt {attempt + 1} failed: {e}. Retrying in {retry_delay}s...")
116+
logger.warning(
117+
f"Reconnection attempt {attempt + 1} failed: {e}. Retrying in {retry_delay}s..."
118+
)
115119
await asyncio.sleep(retry_delay)
116120
else:
117-
logger.error(f"Failed to reconnect WebSocket {self.context_id} after {max_retries} attempts: {e}")
121+
logger.error(
122+
f"Failed to reconnect WebSocket {self.context_id} after {max_retries} attempts: {e}"
123+
)
118124
return False
119-
125+
120126
return False
121127

122128
def is_connected(self) -> bool:
123129
"""Check if the WebSocket is connected and healthy."""
124130
return (
125-
self._ws is not None
126-
and not self._ws.closed
127-
and self._receive_task is not None
131+
self._ws is not None
132+
and not self._ws.closed
133+
and self._receive_task is not None
128134
and not self._receive_task.done()
129135
)
130136

131137
async def ensure_connected(self):
132138
"""Ensure WebSocket is connected, reconnect if necessary."""
133139
if not self.is_connected():
134-
logger.warning(f"WebSocket {self.context_id} is not connected, attempting to reconnect")
140+
logger.warning(
141+
f"WebSocket {self.context_id} is not connected, attempting to reconnect"
142+
)
135143
success = await self.reconnect()
136144
if not success:
137145
raise Exception(f"Failed to reconnect WebSocket {self.context_id}")
@@ -302,10 +310,10 @@ async def change_current_directory(
302310
):
303311
message_id = str(uuid.uuid4())
304312
self._executions[message_id] = Execution(in_background=True)
305-
313+
306314
# Ensure WebSocket is connected before changing directory
307315
await self.ensure_connected()
308-
316+
309317
if language == "python":
310318
request = self._get_execute_request(message_id, f"%cd {path}", True)
311319
elif language == "deno":
@@ -328,7 +336,7 @@ async def change_current_directory(
328336

329337
if self._ws is None:
330338
raise Exception("WebSocket not connected")
331-
339+
332340
await self._ws.send(request)
333341

334342
async for item in self._wait_for_result(message_id):
@@ -412,6 +420,20 @@ async def _receive_message(self):
412420
await self._process_message(json.loads(message))
413421
except Exception as e:
414422
logger.error(f"WebSocket received error while receiving messages: {str(e)}")
423+
424+
# Attempt to reconnect when connection drops
425+
logger.info("Attempting to reconnect due to connection loss...")
426+
reconnect_success = await self.reconnect()
427+
428+
if reconnect_success:
429+
logger.info("Successfully reconnected after connection loss")
430+
# Continue receiving messages with the new connection
431+
try:
432+
async for message in self._ws:
433+
await self._process_message(json.loads(message))
434+
except Exception as reconnect_e:
435+
logger.error(f"Error in reconnected WebSocket: {str(reconnect_e)}")
436+
415437
# Mark all pending executions as failed due to connection loss
416438
for execution in self._executions.values():
417439
await execution.queue.put(

0 commit comments

Comments
 (0)