From f67596482d434c6be802c5152464b3e0263d6976 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 14 Mar 2024 08:02:27 -0700 Subject: [PATCH 1/9] prefer if/else over if/if not --- jupyter_collaboration/rooms.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index e4e83264..1125c51b 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -120,7 +120,16 @@ async def initialize(self) -> None: # YDoc not found in the YStore, create the document from the source file (no change history) pass - if not read_from_source: + if read_from_source: + self._emit(LogLevel.INFO, "load", "Content loaded from disk.") + self.log.info( + "Content in room %s loaded from file %s", self._room_id, self._file.path + ) + self._document.source = model["content"] + + if self.ystore: + await self.ystore.encode_state_as_update(self.ydoc) + else: # if YStore updates and source file are out-of-sync, resync updates with source if self._document.source != model["content"]: # TODO: Delete document from the store. @@ -134,15 +143,6 @@ async def initialize(self) -> None: ) read_from_source = True - if read_from_source: - self._emit(LogLevel.INFO, "load", "Content loaded from disk.") - self.log.info( - "Content in room %s loaded from file %s", self._room_id, self._file.path - ) - self._document.source = model["content"] - - if self.ystore: - await self.ystore.encode_state_as_update(self.ydoc) self._document.dirty = False self.ready = True From 0fd9b4c16908cd1bb0ae336d309f358cfe014d8f Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 14 Mar 2024 15:29:37 -0700 Subject: [PATCH 2/9] refactor DocumentRoom to not rely on asyncio locks --- jupyter_collaboration/rooms.py | 195 ++++++++++++++++++--------------- 1 file changed, 107 insertions(+), 88 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 1125c51b..68a8483c 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -43,11 +43,24 @@ def __init__( self._logger = logger self._save_delay = save_delay - self._update_lock = asyncio.Lock() - self._initialization_lock = asyncio.Lock() self._cleaner: asyncio.Task | None = None - self._saving_document: asyncio.Task | None = None - self._messages: dict[str, asyncio.Lock] = {} + + # the task currently saving to disk via FileLoader, if any. + self._save_task: asyncio.Task | None = None + + # flag that indicates whether a previous call to + # `self._maybe_save_document()` is waiting to save. + # + # if `self._maybe_save_document()` is called while this flag is `True`, + # then the call does nothing, as a previous task will save the Ydoc + # within `self._save_delay` seconds. + self._waiting_to_save = False + + # flag that indicates whether a previous call to + # `self._maybe_save_document()` should call itself again after + # `self._save_task` finishes. this is set to `True` when a document + # update occurs while `self._save_task` is running. + self._should_resave = False # Listen for document changes self._document.observe(self._on_document_change) @@ -78,7 +91,8 @@ async def initialize(self) -> None: """ Initializes the room. - This method is thread safe so only one client can initialize the room. + This method is not coroutine-safe, i.e. consumers must await this method + before calling any other methods. To initialize the room, we check if the content was already in the store as a Y updates and if it is up to date with the content on disk. In this @@ -89,64 +103,61 @@ async def initialize(self) -> None: It is important to set the ready property in the parent class (`self.ready = True`), this setter will subscribe for updates on the shared document. """ - async with self._initialization_lock: - if self.ready: # type: ignore[has-type] - return + if self.ready: # type: ignore[has-type] + return - self.log.info("Initializing room %s", self._room_id) + self.log.info("Initializing room %s", self._room_id) + model = await self._file.load_content(self._file_format, self._file_type) - model = await self._file.load_content(self._file_format, self._file_type) + # try to apply Y updates from the YStore for this document + read_from_source = True + if self.ystore is not None: + try: + await self.ystore.apply_updates(self.ydoc) + self._emit( + LogLevel.INFO, + "load", + "Content loaded from the store {}".format( + self.ystore.__class__.__qualname__ + ), + ) + self.log.info( + "Content in room %s loaded from the ystore %s", + self._room_id, + self.ystore.__class__.__name__, + ) + read_from_source = False + except YDocNotFound: + # YDoc not found in the YStore, create the document from the source file (no change history) + pass + + if read_from_source: + self._emit(LogLevel.INFO, "load", "Content loaded from disk.") + self.log.info( + "Content in room %s loaded from file %s", self._room_id, self._file.path + ) + self._document.source = model["content"] - async with self._update_lock: - # try to apply Y updates from the YStore for this document + if self.ystore: + await self.ystore.encode_state_as_update(self.ydoc) + else: + # if YStore updates and source file are out-of-sync, resync updates with source + if self._document.source != model["content"]: + # TODO: Delete document from the store. + self._emit( + LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." + ) + self.log.info( + "Content in file %s is out-of-sync with the ystore %s", + self._file.path, + self.ystore.__class__.__name__, + ) read_from_source = True - if self.ystore is not None: - try: - await self.ystore.apply_updates(self.ydoc) - self._emit( - LogLevel.INFO, - "load", - "Content loaded from the store {}".format( - self.ystore.__class__.__qualname__ - ), - ) - self.log.info( - "Content in room %s loaded from the ystore %s", - self._room_id, - self.ystore.__class__.__name__, - ) - read_from_source = False - except YDocNotFound: - # YDoc not found in the YStore, create the document from the source file (no change history) - pass - - if read_from_source: - self._emit(LogLevel.INFO, "load", "Content loaded from disk.") - self.log.info( - "Content in room %s loaded from file %s", self._room_id, self._file.path - ) - self._document.source = model["content"] - - if self.ystore: - await self.ystore.encode_state_as_update(self.ydoc) - else: - # if YStore updates and source file are out-of-sync, resync updates with source - if self._document.source != model["content"]: - # TODO: Delete document from the store. - self._emit( - LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." - ) - self.log.info( - "Content in file %s is out-of-sync with the ystore %s", - self._file.path, - self.ystore.__class__.__name__, - ) - read_from_source = True - - - self._document.dirty = False - self.ready = True - self._emit(LogLevel.INFO, "initialize", "Room initialized") + + + self._document.dirty = False + self.ready = True + self._emit(LogLevel.INFO, "initialize", "Room initialized") def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} @@ -165,8 +176,8 @@ def stop(self) -> None: """ super().stop() # TODO: Should we cancel or wait ? - if self._saving_document: - self._saving_document.cancel() + if self._save_task: + self._save_task.cancel() self._document.unobserve() self._file.unobserve(self.room_id) @@ -193,9 +204,8 @@ async def _on_outofband_change(self) -> None: self._emit(LogLevel.ERROR, None, msg) return - async with self._update_lock: - self._document.source = model["content"] - self._document.dirty = False + self._document.source = model["content"] + self._document.dirty = False def _on_document_change(self, target: str, event: Any) -> None: """ @@ -212,49 +222,60 @@ def _on_document_change(self, target: str, event: Any) -> None: document. This tasks are debounced (60 seconds by default) so we need to cancel previous tasks before creating a new one. """ - if self._update_lock.locked(): - return - - self._saving_document = asyncio.create_task( - self._maybe_save_document(self._saving_document) - ) + asyncio.create_task(self._maybe_save_document()) - async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> None: + async def _maybe_save_document(self) -> None: """ Saves the content of the document to disk. ### Note: - There is a save delay to debounce the save since we could receive a high + There is a save delay to throttle the save since we could receive a high amount of changes in a short period of time. This way we can cancel the previous save. """ if self._save_delay is None: + # TODO: fix this; if _save_delay is unset, then this never writes to disk return - if saving_document is not None and not saving_document.done(): - # the document is being saved, cancel that - saving_document.cancel() + if self._waiting_to_save: + # if a previously spawned `self._maybe_save_document()` task is + # waiting to save, then that task will save the Ydoc within + # `self._save_delay` seconds. therefore we can return early. + return + + if self._save_task and not self._save_task.done(): + # if we are currently saving, then set the `_should_resave` + # flag. this indicates to the currently running `self._save_task` + # that it should re-call this method after it completes. + self._should_resave = True + return + + # save after `self._save_delay` seconds of inactivity + self._waiting_to_save = True + await asyncio.sleep(self._save_delay) + self._waiting_to_save = False # all async code (i.e. await statements) must be part of this try/except block # because this coroutine is run in a cancellable task and cancellation is handled here - try: - # save after X seconds of inactivity - await asyncio.sleep(self._save_delay) - + # do not write to `self._document` in this `try` block, as that will + # trigger the observer and result in a save loop. self.log.info("Saving the content from room %s", self._room_id) - await self._file.maybe_save_content( + loop = asyncio.get_running_loop() + self._save_task = loop.create_task(self._file.maybe_save_content( { "format": self._file_format, "type": self._file_type, "content": self._document.source, } - ) - async with self._update_lock: - self._document.dirty = False - + )) + await self._save_task self._emit(LogLevel.INFO, "save", "Content saved.") + if self._should_resave: + self._should_resave = False + asyncio.create_task(self._maybe_save_document()) + except asyncio.CancelledError: return @@ -268,10 +289,8 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No self._emit(LogLevel.ERROR, None, msg) return None - async with self._update_lock: - self._document.source = model["content"] - self._document.dirty = False - + self._document.source = model["content"] + self._document.dirty = False self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.") except Exception as e: From d1e28efba3feb4918b31c33b1cb3396228ae022a Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 14 Mar 2024 15:31:07 -0700 Subject: [PATCH 3/9] remove references to unused 'dirty' attribute set on pycrdt.Doc --- jupyter_collaboration/rooms.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 68a8483c..db394b6a 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -155,7 +155,6 @@ async def initialize(self) -> None: read_from_source = True - self._document.dirty = False self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") @@ -205,7 +204,6 @@ async def _on_outofband_change(self) -> None: return self._document.source = model["content"] - self._document.dirty = False def _on_document_change(self, target: str, event: Any) -> None: """ @@ -290,7 +288,6 @@ async def _maybe_save_document(self) -> None: return None self._document.source = model["content"] - self._document.dirty = False self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.") except Exception as e: From 32082cfc29a0c295930d8a70e237ec4ae9ced76c Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 14 Mar 2024 15:48:56 -0700 Subject: [PATCH 4/9] correct first commit 'f67596', allowing unit tests to pass --- jupyter_collaboration/rooms.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index db394b6a..476e6381 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -131,6 +131,19 @@ async def initialize(self) -> None: # YDoc not found in the YStore, create the document from the source file (no change history) pass + if not read_from_source and self._document.source != model["content"]: + # TODO: Delete document from the store. + self._emit( + LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." + ) + self.log.info( + "Content in file %s is out-of-sync with the ystore %s", + self._file.path, + self.ystore.__class__.__name__, + ) + read_from_source = True + + # if YStore updates and source file are out-of-sync, resync updates with source if read_from_source: self._emit(LogLevel.INFO, "load", "Content loaded from disk.") self.log.info( @@ -140,20 +153,6 @@ async def initialize(self) -> None: if self.ystore: await self.ystore.encode_state_as_update(self.ydoc) - else: - # if YStore updates and source file are out-of-sync, resync updates with source - if self._document.source != model["content"]: - # TODO: Delete document from the store. - self._emit( - LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." - ) - self.log.info( - "Content in file %s is out-of-sync with the ystore %s", - self._file.path, - self.ystore.__class__.__name__, - ) - read_from_source = True - self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") From 51604bf456119277c8dc33519cadc6f575048b74 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 19 Mar 2024 17:02:16 -0700 Subject: [PATCH 5/9] save references to scheduled tasks --- jupyter_collaboration/rooms.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 476e6381..d500c96c 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -45,6 +45,9 @@ def __init__( self._cleaner: asyncio.Task | None = None + # the current `self._maybe_save_document()` task. + self._maybe_save_task: asyncio.Task | None = None + # the task currently saving to disk via FileLoader, if any. self._save_task: asyncio.Task | None = None @@ -219,7 +222,17 @@ def _on_document_change(self, target: str, event: Any) -> None: document. This tasks are debounced (60 seconds by default) so we need to cancel previous tasks before creating a new one. """ - asyncio.create_task(self._maybe_save_document()) + if self._maybe_save_task and not self._maybe_save_task.done(): + # only one `self._maybe_save_task` needs to be running. + # if this method is called after the save delay, then we need to set + # `self._should_resave` to `True` to reschedule + # `self._maybe_save_document()` on the event loop after the current + # `self._maybe_save_task` completes. + if not self._waiting_to_save: + self._should_resave = True + return + + self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) async def _maybe_save_document(self) -> None: """ @@ -234,19 +247,6 @@ async def _maybe_save_document(self) -> None: # TODO: fix this; if _save_delay is unset, then this never writes to disk return - if self._waiting_to_save: - # if a previously spawned `self._maybe_save_document()` task is - # waiting to save, then that task will save the Ydoc within - # `self._save_delay` seconds. therefore we can return early. - return - - if self._save_task and not self._save_task.done(): - # if we are currently saving, then set the `_should_resave` - # flag. this indicates to the currently running `self._save_task` - # that it should re-call this method after it completes. - self._should_resave = True - return - # save after `self._save_delay` seconds of inactivity self._waiting_to_save = True await asyncio.sleep(self._save_delay) @@ -271,7 +271,7 @@ async def _maybe_save_document(self) -> None: if self._should_resave: self._should_resave = False - asyncio.create_task(self._maybe_save_document()) + self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) except asyncio.CancelledError: return From 0a345132080b9544715b52dd2f3838d0c3318069 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 19 Mar 2024 17:14:26 -0700 Subject: [PATCH 6/9] prefer asyncio.create_task() --- jupyter_collaboration/rooms.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index d500c96c..93bc9d0b 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -258,8 +258,7 @@ async def _maybe_save_document(self) -> None: # do not write to `self._document` in this `try` block, as that will # trigger the observer and result in a save loop. self.log.info("Saving the content from room %s", self._room_id) - loop = asyncio.get_running_loop() - self._save_task = loop.create_task(self._file.maybe_save_content( + self._save_task = asyncio.create_task(self._file.maybe_save_content( { "format": self._file_format, "type": self._file_type, From 43df0347231a042a597811d2791cab83ccb3b2b7 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 19 Mar 2024 17:39:56 -0700 Subject: [PATCH 7/9] correct commit 'd1e28ef', set 'dirty' attribute while preventing save loop --- jupyter_collaboration/rooms.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 93bc9d0b..01a37a8d 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -11,6 +11,7 @@ from jupyter_ydoc import ydocs as YDOCS from pycrdt_websocket.websocket_server import YRoom from pycrdt_websocket.ystore import BaseYStore, YDocNotFound +from pycrdt import MapEvent from .loaders import FileLoader from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges @@ -157,6 +158,7 @@ async def initialize(self) -> None: if self.ystore: await self.ystore.encode_state_as_update(self.ydoc) + self._document.dirty = False self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") @@ -206,6 +208,7 @@ async def _on_outofband_change(self) -> None: return self._document.source = model["content"] + self._document.dirty = False def _on_document_change(self, target: str, event: Any) -> None: """ @@ -222,6 +225,12 @@ def _on_document_change(self, target: str, event: Any) -> None: document. This tasks are debounced (60 seconds by default) so we need to cancel previous tasks before creating a new one. """ + if target == "state" and isinstance(event, MapEvent) and list(event.keys.keys()) == ["dirty"]: + # do not write when we are just setting the `dirty` attribute to + # `False` for the JupyterLab UI. this prevents a save loop, as this + # is set when the Ydoc is saved. + return + if self._maybe_save_task and not self._maybe_save_task.done(): # only one `self._maybe_save_task` needs to be running. # if this method is called after the save delay, then we need to set @@ -266,6 +275,7 @@ async def _maybe_save_document(self) -> None: } )) await self._save_task + self._document.dirty = False self._emit(LogLevel.INFO, "save", "Content saved.") if self._should_resave: @@ -286,6 +296,7 @@ async def _maybe_save_document(self) -> None: return None self._document.source = model["content"] + self._document.dirty = False self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.") except Exception as e: From 9f5540063fd8509bc86ab3013c3ecb9e02b8680a Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Wed, 20 Mar 2024 10:48:08 -0700 Subject: [PATCH 8/9] prefer add_done_callback() over recursive coroutine --- jupyter_collaboration/rooms.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 01a37a8d..2a357587 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -242,6 +242,15 @@ def _on_document_change(self, target: str, event: Any) -> None: return self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) + self._maybe_save_task.add_done_callback(self._maybe_save_done_callback) + + def _maybe_save_done_callback(self, _future): + if not self._should_resave: + return + + self._should_resave = False + self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) + self._maybe_save_task.add_done_callback(self._maybe_save_done_callback) async def _maybe_save_document(self) -> None: """ @@ -278,10 +287,6 @@ async def _maybe_save_document(self) -> None: self._document.dirty = False self._emit(LogLevel.INFO, "save", "Content saved.") - if self._should_resave: - self._should_resave = False - self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) - except asyncio.CancelledError: return From 52b1b12640164c05a0c5a37e21ce8fcb7771b318 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Wed, 20 Mar 2024 13:33:08 -0700 Subject: [PATCH 9/9] pre-commit --- jupyter_collaboration/rooms.py | 38 +++++++++++++++++----------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 2a357587..b2dc3bc3 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -9,9 +9,9 @@ from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS +from pycrdt import MapEvent from pycrdt_websocket.websocket_server import YRoom from pycrdt_websocket.ystore import BaseYStore, YDocNotFound -from pycrdt import MapEvent from .loaders import FileLoader from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges @@ -121,9 +121,7 @@ async def initialize(self) -> None: self._emit( LogLevel.INFO, "load", - "Content loaded from the store {}".format( - self.ystore.__class__.__qualname__ - ), + f"Content loaded from the store {self.ystore.__class__.__qualname__}", ) self.log.info( "Content in room %s loaded from the ystore %s", @@ -137,9 +135,7 @@ async def initialize(self) -> None: if not read_from_source and self._document.source != model["content"]: # TODO: Delete document from the store. - self._emit( - LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." - ) + self._emit(LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore.") self.log.info( "Content in file %s is out-of-sync with the ystore %s", self._file.path, @@ -150,9 +146,7 @@ async def initialize(self) -> None: # if YStore updates and source file are out-of-sync, resync updates with source if read_from_source: self._emit(LogLevel.INFO, "load", "Content loaded from disk.") - self.log.info( - "Content in room %s loaded from file %s", self._room_id, self._file.path - ) + self.log.info("Content in room %s loaded from file %s", self._room_id, self._file.path) self._document.source = model["content"] if self.ystore: @@ -225,7 +219,11 @@ def _on_document_change(self, target: str, event: Any) -> None: document. This tasks are debounced (60 seconds by default) so we need to cancel previous tasks before creating a new one. """ - if target == "state" and isinstance(event, MapEvent) and list(event.keys.keys()) == ["dirty"]: + if ( + target == "state" + and isinstance(event, MapEvent) + and list(event.keys.keys()) == ["dirty"] + ): # do not write when we are just setting the `dirty` attribute to # `False` for the JupyterLab UI. this prevents a save loop, as this # is set when the Ydoc is saved. @@ -243,7 +241,7 @@ def _on_document_change(self, target: str, event: Any) -> None: self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) self._maybe_save_task.add_done_callback(self._maybe_save_done_callback) - + def _maybe_save_done_callback(self, _future): if not self._should_resave: return @@ -276,13 +274,15 @@ async def _maybe_save_document(self) -> None: # do not write to `self._document` in this `try` block, as that will # trigger the observer and result in a save loop. self.log.info("Saving the content from room %s", self._room_id) - self._save_task = asyncio.create_task(self._file.maybe_save_content( - { - "format": self._file_format, - "type": self._file_type, - "content": self._document.source, - } - )) + self._save_task = asyncio.create_task( + self._file.maybe_save_content( + { + "format": self._file_format, + "type": self._file_type, + "content": self._document.source, + } + ) + ) await self._save_task self._document.dirty = False self._emit(LogLevel.INFO, "save", "Content saved.")