diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 62d9ed3..df4c903 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -291,6 +291,7 @@ def _init_awareness(self, ydoc: pycrdt.Doc) -> pycrdt.Awareness: self._awareness_subscription = self._awareness.observe( self._on_awareness_update ) + asyncio.create_task(self._awareness.start()) return self._awareness @@ -444,22 +445,22 @@ def handle_message(self, client_id: str, message: bytes) -> None: ) # Handle Awareness messages elif message_type == YMessageType.AWARENESS: - self.log.debug(f"Received AwarenessUpdate from '{client_id}'.") + self.log.debug(f"Received AwarenessUpdate from '{client_id}' for room '{self.room_id}'.") self.handle_awareness_update(client_id, message) - self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.") + self.log.debug(f"Handled AwarenessUpdate from '{client_id}' for room '{self.room_id}'.") # Handle Sync messages elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP1: - self.log.info(f"Received SS1 from '{client_id}'.") + self.log.info(f"Received SS1 from '{client_id}' for room '{self.room_id}'.") self.handle_sync_step1(client_id, message) - self.log.info(f"Handled SS1 from '{client_id}'.") + self.log.info(f"Handled SS1 from '{client_id}' for room '{self.room_id}'.") elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP2: - self.log.info(f"Received SS2 from '{client_id}'.") + self.log.info(f"Received SS2 from '{client_id}' for room '{self.room_id}'.") self.handle_sync_step2(client_id, message) - self.log.info(f"Handled SS2 from '{client_id}'.") + self.log.info(f"Handled SS2 from '{client_id}' for room '{self.room_id}'.") elif sync_message_subtype == YSyncMessageSubtype.SYNC_UPDATE: - self.log.info(f"Received SyncUpdate from '{client_id}'.") + self.log.info(f"Received SyncUpdate from '{client_id} for room '{self.room_id}''.") self.handle_sync_update(client_id, message) - self.log.info(f"Handled SyncUpdate from '{client_id}'.") + self.log.info(f"Handled SyncUpdate from '{client_id} for room '{self.room_id}''.") def handle_sync_step1(self, client_id: str, message: bytes) -> None: @@ -668,9 +669,6 @@ def handle_awareness_update(self, client_id: str, message: bytes) -> None: self.log.exception(e) return - # Broadcast AwarenessUpdate message to all other synced clients - self._broadcast_message(message, message_type="AwarenessUpdate") - def _should_ignore_update(self, client_id: str, message_type: Literal['AwarenessUpdate', 'SyncUpdate']) -> bool: """ @@ -704,7 +702,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd if message_type == "SyncUpdate": self.log.info( - f"Broadcasting SyncUpdate to all {client_count} synced clients." + f"Broadcasting {message_type} to all {client_count} synced clients." ) for client in clients: @@ -723,7 +721,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd if message_type == "SyncUpdate": self.log.info( - f"Broadcast of SyncUpdate complete." + f"Broadcast of {message_type} complete for room {self.room_id}." ) def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: @@ -735,12 +733,13 @@ def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) - type: The change type. changes: The awareness changes. """ - if type != "update" or changes[1] != "local": - return + self.log.debug(f"awareness update, type={type}, changes={changes}, changes[1]={changes[1]}, meta={self._awareness.meta}, ydoc.clientid={self._ydoc.client_id}, roomId={self.room_id}") updated_clients = [v for value in changes[0].values() for v in value] + self.log.debug(f"awareness update, updated_clients={updated_clients}") state = self._awareness.encode_awareness_update(updated_clients) message = pycrdt.create_awareness_message(state) + self.log.debug(f"awareness update, message={message}") self._broadcast_message(message, "AwarenessUpdate") diff --git a/jupyter_server_documents/websockets/clients.py b/jupyter_server_documents/websockets/clients.py index ac4dbe4..3d15191 100644 --- a/jupyter_server_documents/websockets/clients.py +++ b/jupyter_server_documents/websockets/clients.py @@ -125,6 +125,7 @@ def remove(self, client_id: str) -> None: return try: + self.log.debug(f"client {client_id} is closed in remove method.") client.websocket.close() except Exception as e: self.log.exception(f"An exception occurred when remove client '{client_id}' for room '{self.room_id}': {e}") @@ -169,7 +170,7 @@ async def _clean_desynced(self) -> None: self.remove(client_id) for (client_id, client) in list(self.synced.items()): if client.websocket is None or client.websocket.ws_connection is None: - self.log.warning(f"Remove client '{client_id}' for room '{self.room_id}' since client does not become synced after {self.desynced_timeout_seconds} seconds.") + self.log.warning(f"Remove client '{client_id}' for room '{self.room_id}' since client websocket is closed") self.remove(client_id) except asyncio.CancelledError: break