Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions jupyter_server_documents/rooms/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ def _init_awareness(self, ydoc: pycrdt.Doc) -> pycrdt.Awareness:
self._awareness_subscription = self._awareness.observe(
self._on_awareness_update
)
asyncio.create_task(self._awareness.start())
return self._awareness


Expand Down Expand Up @@ -444,22 +445,22 @@ def handle_message(self, client_id: str, message: bytes) -> None:
)
# Handle Awareness messages
elif message_type == YMessageType.AWARENESS:
self.log.debug(f"Received AwarenessUpdate from '{client_id}'.")
self.log.debug(f"Received AwarenessUpdate from '{client_id}' for room '{self.room_id}'.")
self.handle_awareness_update(client_id, message)
self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.")
self.log.debug(f"Handled AwarenessUpdate from '{client_id}' for room '{self.room_id}'.")
# Handle Sync messages
elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP1:
self.log.info(f"Received SS1 from '{client_id}'.")
self.log.info(f"Received SS1 from '{client_id}' for room '{self.room_id}'.")
self.handle_sync_step1(client_id, message)
self.log.info(f"Handled SS1 from '{client_id}'.")
self.log.info(f"Handled SS1 from '{client_id}' for room '{self.room_id}'.")
elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP2:
self.log.info(f"Received SS2 from '{client_id}'.")
self.log.info(f"Received SS2 from '{client_id}' for room '{self.room_id}'.")
self.handle_sync_step2(client_id, message)
self.log.info(f"Handled SS2 from '{client_id}'.")
self.log.info(f"Handled SS2 from '{client_id}' for room '{self.room_id}'.")
elif sync_message_subtype == YSyncMessageSubtype.SYNC_UPDATE:
self.log.info(f"Received SyncUpdate from '{client_id}'.")
self.log.info(f"Received SyncUpdate from '{client_id} for room '{self.room_id}''.")
self.handle_sync_update(client_id, message)
self.log.info(f"Handled SyncUpdate from '{client_id}'.")
self.log.info(f"Handled SyncUpdate from '{client_id} for room '{self.room_id}''.")


def handle_sync_step1(self, client_id: str, message: bytes) -> None:
Expand Down Expand Up @@ -668,9 +669,6 @@ def handle_awareness_update(self, client_id: str, message: bytes) -> None:
self.log.exception(e)
return

# Broadcast AwarenessUpdate message to all other synced clients
self._broadcast_message(message, message_type="AwarenessUpdate")

Comment on lines -671 to -673
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch Jialin, thank you for removing this & the condition in _on_awareness_update().

I had to check the pycrdt source code (Awareness._emit()) to figure out why your PR works. The two deleted portions of code were used together to handle broadcasting awareness updates in two areas; this was done following the original implementation in jupyter_collaboration. But by removing those 2 portions of code, all of the broadcasting is now done in _on_awareness_update(). Fantastic! 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key fix is to turn on server awareness message asyncio.create_task(self._awareness.start()), previously we did not kick start this task so a server generated awareness message (like a ping message to keep client side aware of its aliveness) are not sent every 15 seconds. And yes, you are right, removing the condition in _on_awareness_update() will allow those server awareness message to be broadcasted. Last, removing the broadcast call in handle_awareness_update method is to prevent a client awareness message for broadcasted twice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for reviewing, @dlqqq !

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server side awareness messages will keep the ydoc websocket alive even if client side stops sending awareness messages due to browser background tab throttling.


def _should_ignore_update(self, client_id: str, message_type: Literal['AwarenessUpdate', 'SyncUpdate']) -> bool:
"""
Expand Down Expand Up @@ -704,7 +702,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd

if message_type == "SyncUpdate":
self.log.info(
f"Broadcasting SyncUpdate to all {client_count} synced clients."
f"Broadcasting {message_type} to all {client_count} synced clients."
)

for client in clients:
Expand All @@ -723,7 +721,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd

if message_type == "SyncUpdate":
self.log.info(
f"Broadcast of SyncUpdate complete."
f"Broadcast of {message_type} complete for room {self.room_id}."
)

def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
Expand All @@ -735,12 +733,13 @@ def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -
type: The change type.
changes: The awareness changes.
"""
if type != "update" or changes[1] != "local":
return

self.log.debug(f"awareness update, type={type}, changes={changes}, changes[1]={changes[1]}, meta={self._awareness.meta}, ydoc.clientid={self._ydoc.client_id}, roomId={self.room_id}")
updated_clients = [v for value in changes[0].values() for v in value]
self.log.debug(f"awareness update, updated_clients={updated_clients}")
state = self._awareness.encode_awareness_update(updated_clients)
message = pycrdt.create_awareness_message(state)
self.log.debug(f"awareness update, message={message}")
self._broadcast_message(message, "AwarenessUpdate")


Expand Down
3 changes: 2 additions & 1 deletion jupyter_server_documents/websockets/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def remove(self, client_id: str) -> None:
return

try:
self.log.debug(f"client {client_id} is closed in remove method.")
client.websocket.close()
except Exception as e:
self.log.exception(f"An exception occurred when remove client '{client_id}' for room '{self.room_id}': {e}")
Expand Down Expand Up @@ -169,7 +170,7 @@ async def _clean_desynced(self) -> None:
self.remove(client_id)
for (client_id, client) in list(self.synced.items()):
if client.websocket is None or client.websocket.ws_connection is None:
self.log.warning(f"Remove client '{client_id}' for room '{self.room_id}' since client does not become synced after {self.desynced_timeout_seconds} seconds.")
self.log.warning(f"Remove client '{client_id}' for room '{self.room_id}' since client websocket is closed")
self.remove(client_id)
except asyncio.CancelledError:
break
Expand Down
Loading