Skip to content

Commit 1148f38

Browse files
author
Jialin Zhang
committed
turn on server awareness messages to keep client websocket alive
1 parent a88717f commit 1148f38

File tree

2 files changed

+16
-16
lines changed

2 files changed

+16
-16
lines changed

jupyter_server_documents/rooms/yroom.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ def _init_awareness(self, ydoc: pycrdt.Doc) -> pycrdt.Awareness:
291291
self._awareness_subscription = self._awareness.observe(
292292
self._on_awareness_update
293293
)
294+
self._loop.create_task(self._awareness.start())
294295
return self._awareness
295296

296297

@@ -444,22 +445,22 @@ def handle_message(self, client_id: str, message: bytes) -> None:
444445
)
445446
# Handle Awareness messages
446447
elif message_type == YMessageType.AWARENESS:
447-
self.log.debug(f"Received AwarenessUpdate from '{client_id}'.")
448+
self.log.debug(f"Received AwarenessUpdate from '{client_id}' for room '{self.room_id}'.")
448449
self.handle_awareness_update(client_id, message)
449-
self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.")
450+
self.log.debug(f"Handled AwarenessUpdate from '{client_id}' for room '{self.room_id}'.")
450451
# Handle Sync messages
451452
elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP1:
452-
self.log.info(f"Received SS1 from '{client_id}'.")
453+
self.log.info(f"Received SS1 from '{client_id}' for room '{self.room_id}'.")
453454
self.handle_sync_step1(client_id, message)
454-
self.log.info(f"Handled SS1 from '{client_id}'.")
455+
self.log.info(f"Handled SS1 from '{client_id}' for room '{self.room_id}'.")
455456
elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP2:
456-
self.log.info(f"Received SS2 from '{client_id}'.")
457+
self.log.info(f"Received SS2 from '{client_id}' for room '{self.room_id}'.")
457458
self.handle_sync_step2(client_id, message)
458-
self.log.info(f"Handled SS2 from '{client_id}'.")
459+
self.log.info(f"Handled SS2 from '{client_id}' for room '{self.room_id}'.")
459460
elif sync_message_subtype == YSyncMessageSubtype.SYNC_UPDATE:
460-
self.log.info(f"Received SyncUpdate from '{client_id}'.")
461+
self.log.info(f"Received SyncUpdate from '{client_id} for room '{self.room_id}''.")
461462
self.handle_sync_update(client_id, message)
462-
self.log.info(f"Handled SyncUpdate from '{client_id}'.")
463+
self.log.info(f"Handled SyncUpdate from '{client_id} for room '{self.room_id}''.")
463464

464465

465466
def handle_sync_step1(self, client_id: str, message: bytes) -> None:
@@ -668,9 +669,6 @@ def handle_awareness_update(self, client_id: str, message: bytes) -> None:
668669
self.log.exception(e)
669670
return
670671

671-
# Broadcast AwarenessUpdate message to all other synced clients
672-
self._broadcast_message(message, message_type="AwarenessUpdate")
673-
674672

675673
def _should_ignore_update(self, client_id: str, message_type: Literal['AwarenessUpdate', 'SyncUpdate']) -> bool:
676674
"""
@@ -704,7 +702,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
704702

705703
if message_type == "SyncUpdate":
706704
self.log.info(
707-
f"Broadcasting SyncUpdate to all {client_count} synced clients."
705+
f"Broadcasting {message_type} to all {client_count} synced clients."
708706
)
709707

710708
for client in clients:
@@ -723,7 +721,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
723721

724722
if message_type == "SyncUpdate":
725723
self.log.info(
726-
f"Broadcast of SyncUpdate complete."
724+
f"Broadcast of {message_type} complete for room {self.room_id}."
727725
)
728726

729727
def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
@@ -735,12 +733,13 @@ def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -
735733
type: The change type.
736734
changes: The awareness changes.
737735
"""
738-
if type != "update" or changes[1] != "local":
739-
return
740736

737+
self.log.debug(f"awareness update, type={type}, changes={changes}, changes[1]={changes[1]}, meta={self._awareness.meta}, ydoc.clientid={self._ydoc.client_id}, roomId={self.room_id}")
741738
updated_clients = [v for value in changes[0].values() for v in value]
739+
self.log.debug(f"awareness update, updated_clients={updated_clients}")
742740
state = self._awareness.encode_awareness_update(updated_clients)
743741
message = pycrdt.create_awareness_message(state)
742+
self.log.debug(f"awareness update, message={message}")
744743
self._broadcast_message(message, "AwarenessUpdate")
745744

746745

jupyter_server_documents/websockets/clients.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def remove(self, client_id: str) -> None:
125125
return
126126

127127
try:
128+
self.log.debug(f"client {client_id} is closed in remove method.")
128129
client.websocket.close()
129130
except Exception as e:
130131
self.log.exception(f"An exception occurred when remove client '{client_id}' for room '{self.room_id}': {e}")
@@ -169,7 +170,7 @@ async def _clean_desynced(self) -> None:
169170
self.remove(client_id)
170171
for (client_id, client) in list(self.synced.items()):
171172
if client.websocket is None or client.websocket.ws_connection is None:
172-
self.log.warning(f"Remove client '{client_id}' for room '{self.room_id}' since client does not become synced after {self.desynced_timeout_seconds} seconds.")
173+
self.log.warning(f"Remove client '{client_id}' for room '{self.room_id}' since client websocket is closed")
173174
self.remove(client_id)
174175
except asyncio.CancelledError:
175176
break

0 commit comments

Comments
 (0)