Skip to content

Commit e992ad3

Browse files
committed
fix sync update flood by ignoring changes to state key
1 parent abea3e6 commit e992ad3

File tree

1 file changed

+47
-18
lines changed

1 file changed

+47
-18
lines changed

jupyter_rtc_core/rooms/yroom.py

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from typing import Literal, Tuple, Any
1616
from jupyter_server_fileid.manager import BaseFileIdManager
1717
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
18+
from pycrdt import TransactionEvent
1819

1920
class YRoom:
2021
"""A Room to manage all client connection to one notebook file"""
@@ -69,12 +70,12 @@ def __init__(
6970
type[YBaseDoc],
7071
jupyter_ydoc_classes.get(file_type, jupyter_ydoc_classes["file"])
7172
)
72-
self.jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness)
73+
self._jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness)
7374

7475
# Initialize YRoomFileAPI and begin loading content
7576
self.file_api = YRoomFileAPI(
7677
room_id=self.room_id,
77-
jupyter_ydoc=self.jupyter_ydoc,
78+
jupyter_ydoc=self._jupyter_ydoc,
7879
log=self.log,
7980
loop=self._loop,
8081
fileid_manager=fileid_manager,
@@ -88,8 +89,9 @@ def __init__(
8889
self._on_awareness_update
8990
)
9091
self._ydoc_subscription = self._ydoc.observe(
91-
lambda event: self._on_ydoc_update(event.update)
92+
self._on_ydoc_update
9293
)
94+
self._jupyter_ydoc.observe(self._on_jupyter_ydoc_update)
9395

9496
# Initialize message queue and start background task that routes new
9597
# messages in the message queue to the appropriate handler method.
@@ -117,7 +119,7 @@ async def get_jupyter_ydoc(self):
117119
loaded from the ContentsManager.
118120
"""
119121
await self.file_api.ydoc_content_loaded
120-
return self.jupyter_ydoc
122+
return self._jupyter_ydoc
121123

122124

123125
async def get_ydoc(self):
@@ -297,10 +299,11 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
297299
clients after this method is called via the `self._on_ydoc_update()`
298300
observer.
299301
"""
300-
# Remove client and kill websocket if received SyncUpdate when client is desynced
302+
# If client is desynced and sends a SyncUpdate, that results in a
303+
# protocol error. Close the WebSocket and return early in this case.
301304
if self._should_ignore_update(client_id, "SyncUpdate"):
302-
self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self.room_id}'")
303-
self._client_group.remove(client_id)
305+
self.clients.remove(client_id)
306+
return
304307

305308
# Apply the SyncUpdate to the YDoc
306309
try:
@@ -315,23 +318,33 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
315318
return
316319

317320

318-
def _on_ydoc_update(self, message_payload: bytes, client_id: str | None = None) -> None:
321+
def _on_ydoc_update(self, event: TransactionEvent) -> None:
319322
"""
320-
This method is an observer on `self.ydoc` which:
323+
This method is an observer on `self._ydoc` which broadcasts a
324+
`SyncUpdate` message to all synced clients whenever the YDoc changes.
321325
322-
- Broadcasts a SyncUpdate message payload to all connected clients by
323-
writing to their respective WebSockets,
326+
The YDoc is saved in the `self._on_jupyter_ydoc_update()` observer.
327+
"""
328+
update: bytes = event.update
329+
message = pycrdt.create_update_message(update)
330+
self._broadcast_message(message, message_type="SyncUpdate")
324331

325-
- Persists the contents of the updated YDoc by writing to disk.
326332

327-
This method can also be called manually.
333+
def _on_jupyter_ydoc_update(self, updated_key: str, *_) -> None:
328334
"""
329-
# Broadcast the message
330-
message = pycrdt.create_update_message(message_payload)
331-
self._broadcast_message(message, message_type="SyncUpdate")
335+
This method is an observer on `self._jupyter_ydoc` which saves the file
336+
whenever the YDoc changes, unless `updated_key == "state"`.
337+
338+
The `state` key is used by `jupyter_ydoc` to store temporary data like
339+
whether a file is 'dirty' (has unsaved changes). This data is not
340+
persisted, so changes to the `state` key should be ignored. Otherwise,
341+
an infinite loop of saves will result, as saving sets `dirty = False`.
332342
333-
# Save the file to disk
334-
self.file_api.schedule_save()
343+
This observer is separate because because `pycrdt.Doc.observe()` does
344+
not pass `updated_key` to `self._on_ydoc_update()`.
345+
"""
346+
if updated_key != "state":
347+
self.file_api.schedule_save()
335348

336349

337350
def handle_awareness_update(self, client_id: str, message: bytes) -> None:
@@ -377,6 +390,15 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
377390
fails. `message_type` is used to produce more readable warnings.
378391
"""
379392
clients = self.clients.get_all()
393+
client_count = len(clients)
394+
if not client_count:
395+
return
396+
397+
if message_type == "SyncUpdate":
398+
self.log.info(
399+
f"Broadcasting SyncUpdate to all {client_count} synced clients."
400+
)
401+
380402
for client in clients:
381403
try:
382404
# TODO: remove this assertion once websocket is made required
@@ -389,6 +411,12 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
389411
f"to client '{client.id}:'"
390412
)
391413
self.log.exception(e)
414+
continue
415+
416+
if message_type == "SyncUpdate":
417+
self.log.info(
418+
f"Broadcast of SyncUpdate complete."
419+
)
392420

393421
def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
394422
"""
@@ -417,6 +445,7 @@ async def stop(self) -> None:
417445
# Remove all observers, as updates no longer need to be broadcast
418446
self._ydoc.unobserve(self._ydoc_subscription)
419447
self._awareness.unobserve(self._awareness_subscription)
448+
self._jupyter_ydoc.unobserve()
420449

421450
# Finish processing all messages, then stop the queue to end the
422451
# `_process_message_queue()` background task.

0 commit comments

Comments
 (0)