diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index e4e83264..b2dc3bc3 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -9,6 +9,7 @@ from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS +from pycrdt import MapEvent from pycrdt_websocket.websocket_server import YRoom from pycrdt_websocket.ystore import BaseYStore, YDocNotFound @@ -43,11 +44,27 @@ def __init__( self._logger = logger self._save_delay = save_delay - self._update_lock = asyncio.Lock() - self._initialization_lock = asyncio.Lock() self._cleaner: asyncio.Task | None = None - self._saving_document: asyncio.Task | None = None - self._messages: dict[str, asyncio.Lock] = {} + + # the current `self._maybe_save_document()` task. + self._maybe_save_task: asyncio.Task | None = None + + # the task currently saving to disk via FileLoader, if any. + self._save_task: asyncio.Task | None = None + + # flag that indicates whether a previous call to + # `self._maybe_save_document()` is waiting to save. + # + # if `self._maybe_save_document()` is called while this flag is `True`, + # then the call does nothing, as a previous task will save the Ydoc + # within `self._save_delay` seconds. + self._waiting_to_save = False + + # flag that indicates whether a previous call to + # `self._maybe_save_document()` should call itself again after + # `self._save_task` finishes. this is set to `True` when a document + # update occurs while `self._save_task` is running. + self._should_resave = False # Listen for document changes self._document.observe(self._on_document_change) @@ -78,7 +95,8 @@ async def initialize(self) -> None: """ Initializes the room. - This method is thread safe so only one client can initialize the room. + This method is not coroutine-safe, i.e. consumers must await this method + before calling any other methods. To initialize the room, we check if the content was already in the store as a Y updates and if it is up to date with the content on disk. In this @@ -89,64 +107,54 @@ async def initialize(self) -> None: It is important to set the ready property in the parent class (`self.ready = True`), this setter will subscribe for updates on the shared document. """ - async with self._initialization_lock: - if self.ready: # type: ignore[has-type] - return + if self.ready: # type: ignore[has-type] + return - self.log.info("Initializing room %s", self._room_id) + self.log.info("Initializing room %s", self._room_id) + model = await self._file.load_content(self._file_format, self._file_type) - model = await self._file.load_content(self._file_format, self._file_type) + # try to apply Y updates from the YStore for this document + read_from_source = True + if self.ystore is not None: + try: + await self.ystore.apply_updates(self.ydoc) + self._emit( + LogLevel.INFO, + "load", + f"Content loaded from the store {self.ystore.__class__.__qualname__}", + ) + self.log.info( + "Content in room %s loaded from the ystore %s", + self._room_id, + self.ystore.__class__.__name__, + ) + read_from_source = False + except YDocNotFound: + # YDoc not found in the YStore, create the document from the source file (no change history) + pass + + if not read_from_source and self._document.source != model["content"]: + # TODO: Delete document from the store. + self._emit(LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore.") + self.log.info( + "Content in file %s is out-of-sync with the ystore %s", + self._file.path, + self.ystore.__class__.__name__, + ) + read_from_source = True + + # if YStore updates and source file are out-of-sync, resync updates with source + if read_from_source: + self._emit(LogLevel.INFO, "load", "Content loaded from disk.") + self.log.info("Content in room %s loaded from file %s", self._room_id, self._file.path) + self._document.source = model["content"] - async with self._update_lock: - # try to apply Y updates from the YStore for this document - read_from_source = True - if self.ystore is not None: - try: - await self.ystore.apply_updates(self.ydoc) - self._emit( - LogLevel.INFO, - "load", - "Content loaded from the store {}".format( - self.ystore.__class__.__qualname__ - ), - ) - self.log.info( - "Content in room %s loaded from the ystore %s", - self._room_id, - self.ystore.__class__.__name__, - ) - read_from_source = False - except YDocNotFound: - # YDoc not found in the YStore, create the document from the source file (no change history) - pass - - if not read_from_source: - # if YStore updates and source file are out-of-sync, resync updates with source - if self._document.source != model["content"]: - # TODO: Delete document from the store. - self._emit( - LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." - ) - self.log.info( - "Content in file %s is out-of-sync with the ystore %s", - self._file.path, - self.ystore.__class__.__name__, - ) - read_from_source = True - - if read_from_source: - self._emit(LogLevel.INFO, "load", "Content loaded from disk.") - self.log.info( - "Content in room %s loaded from file %s", self._room_id, self._file.path - ) - self._document.source = model["content"] - - if self.ystore: - await self.ystore.encode_state_as_update(self.ydoc) - - self._document.dirty = False - self.ready = True - self._emit(LogLevel.INFO, "initialize", "Room initialized") + if self.ystore: + await self.ystore.encode_state_as_update(self.ydoc) + + self._document.dirty = False + self.ready = True + self._emit(LogLevel.INFO, "initialize", "Room initialized") def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} @@ -165,8 +173,8 @@ def stop(self) -> None: """ super().stop() # TODO: Should we cancel or wait ? - if self._saving_document: - self._saving_document.cancel() + if self._save_task: + self._save_task.cancel() self._document.unobserve() self._file.unobserve(self.room_id) @@ -193,9 +201,8 @@ async def _on_outofband_change(self) -> None: self._emit(LogLevel.ERROR, None, msg) return - async with self._update_lock: - self._document.source = model["content"] - self._document.dirty = False + self._document.source = model["content"] + self._document.dirty = False def _on_document_change(self, target: str, event: Any) -> None: """ @@ -212,47 +219,72 @@ def _on_document_change(self, target: str, event: Any) -> None: document. This tasks are debounced (60 seconds by default) so we need to cancel previous tasks before creating a new one. """ - if self._update_lock.locked(): + if ( + target == "state" + and isinstance(event, MapEvent) + and list(event.keys.keys()) == ["dirty"] + ): + # do not write when we are just setting the `dirty` attribute to + # `False` for the JupyterLab UI. this prevents a save loop, as this + # is set when the Ydoc is saved. return - self._saving_document = asyncio.create_task( - self._maybe_save_document(self._saving_document) - ) + if self._maybe_save_task and not self._maybe_save_task.done(): + # only one `self._maybe_save_task` needs to be running. + # if this method is called after the save delay, then we need to set + # `self._should_resave` to `True` to reschedule + # `self._maybe_save_document()` on the event loop after the current + # `self._maybe_save_task` completes. + if not self._waiting_to_save: + self._should_resave = True + return + + self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) + self._maybe_save_task.add_done_callback(self._maybe_save_done_callback) + + def _maybe_save_done_callback(self, _future): + if not self._should_resave: + return + + self._should_resave = False + self._maybe_save_task = asyncio.create_task(self._maybe_save_document()) + self._maybe_save_task.add_done_callback(self._maybe_save_done_callback) - async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> None: + async def _maybe_save_document(self) -> None: """ Saves the content of the document to disk. ### Note: - There is a save delay to debounce the save since we could receive a high + There is a save delay to throttle the save since we could receive a high amount of changes in a short period of time. This way we can cancel the previous save. """ if self._save_delay is None: + # TODO: fix this; if _save_delay is unset, then this never writes to disk return - if saving_document is not None and not saving_document.done(): - # the document is being saved, cancel that - saving_document.cancel() + # save after `self._save_delay` seconds of inactivity + self._waiting_to_save = True + await asyncio.sleep(self._save_delay) + self._waiting_to_save = False # all async code (i.e. await statements) must be part of this try/except block # because this coroutine is run in a cancellable task and cancellation is handled here - try: - # save after X seconds of inactivity - await asyncio.sleep(self._save_delay) - + # do not write to `self._document` in this `try` block, as that will + # trigger the observer and result in a save loop. self.log.info("Saving the content from room %s", self._room_id) - await self._file.maybe_save_content( - { - "format": self._file_format, - "type": self._file_type, - "content": self._document.source, - } + self._save_task = asyncio.create_task( + self._file.maybe_save_content( + { + "format": self._file_format, + "type": self._file_type, + "content": self._document.source, + } + ) ) - async with self._update_lock: - self._document.dirty = False - + await self._save_task + self._document.dirty = False self._emit(LogLevel.INFO, "save", "Content saved.") except asyncio.CancelledError: @@ -268,10 +300,8 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No self._emit(LogLevel.ERROR, None, msg) return None - async with self._update_lock: - self._document.source = model["content"] - self._document.dirty = False - + self._document.source = model["content"] + self._document.dirty = False self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.") except Exception as e: