diff --git a/jupyter_rtc_core/rooms/yroom.py b/jupyter_rtc_core/rooms/yroom.py index fac8655..4c9d93d 100644 --- a/jupyter_rtc_core/rooms/yroom.py +++ b/jupyter_rtc_core/rooms/yroom.py @@ -39,8 +39,16 @@ class YRoom: """Event loop""" _client_group: YjsClientGroup """Client group to manage synced and desynced clients""" - _message_queue: asyncio.Queue[Tuple[str, bytes]] - """A message queue per room to keep websocket messages in order""" + _message_queue: asyncio.Queue[Tuple[str, bytes] | None] + """ + A per-room message queue that stores new messages from clients to process + them in order. If a tuple `(client_id, message)` is enqueued, the message is + queued for processing. If `None` is enqueued, the processing of the message + queue is halted. + + The `self._process_message_queue()` background task can be halted by running + `self._message_queue.put_nowait(None)`. + """ _awareness_subscription: pycrdt.Subscription """Subscription to awareness changes.""" _ydoc_subscription: pycrdt.Subscription @@ -168,6 +176,9 @@ async def _process_message_queue(self) -> None: queue. This method routes the message to a handler method based on the message type & subtype, which are obtained from the first 2 bytes of the message. + + This task can be halted by calling + `self._message_queue.put_nowait(None)`. """ # Wait for content to be loaded before processing any messages in the # message queue @@ -176,10 +187,15 @@ async def _process_message_queue(self) -> None: # Begin processing messages from the message queue while True: - try: - client_id, message = await self._message_queue.get() - except asyncio.QueueShutDown: + # Wait for next item in the message queue + queue_item = await self._message_queue.get() + + # If the next item is `None`, break the loop and stop this task + if queue_item is None: break + + # Otherwise, process the new message + client_id, message = queue_item # Determine message type & subtype from header message_type = message[0] @@ -471,7 +487,7 @@ async def stop(self) -> None: # Finish processing all messages, then stop the queue to end the # `_process_message_queue()` background task. await self._message_queue.join() - self._message_queue.shutdown(immediate=True) + self._message_queue.put_nowait(None) # Finally, stop FileAPI and return. This saves the final content of the # JupyterYDoc in the process. diff --git a/jupyter_rtc_core/rooms/yroom_file_api.py b/jupyter_rtc_core/rooms/yroom_file_api.py index bdd433a..9d56076 100644 --- a/jupyter_rtc_core/rooms/yroom_file_api.py +++ b/jupyter_rtc_core/rooms/yroom_file_api.py @@ -5,7 +5,7 @@ """ from __future__ import annotations -from typing import TYPE_CHECKING, Literal +from typing import TYPE_CHECKING import asyncio import pycrdt from jupyter_ydoc.ybasedoc import YBaseDoc @@ -14,7 +14,7 @@ import os if TYPE_CHECKING: - from typing import Awaitable + from typing import Awaitable, Literal from jupyter_server_fileid.manager import BaseFileIdManager from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager @@ -45,7 +45,15 @@ class YRoomFileAPI: _loop: asyncio.AbstractEventLoop _ydoc_content_loading: False _ydoc_content_loaded: asyncio.Event - _scheduled_saves: asyncio.Queue[None] + _scheduled_saves: asyncio.Queue[Literal[0] | None] + """ + Queue of size 1, which may store `0` or `None`. If `0` is enqueued, another + save will occur after the current save is complete. If `None` is enqueued, + the processing of this queue is halted. + + The `self._process_scheduled_saves()` background task can be halted by + running `self._scheduled_saves.put_nowait(None)`. + """ def __init__( self, @@ -157,7 +165,7 @@ def schedule_save(self) -> None: """ assert self.jupyter_ydoc if not self._scheduled_saves.full(): - self._scheduled_saves.put_nowait(None) + self._scheduled_saves.put_nowait(0) async def _process_scheduled_saves(self) -> None: @@ -170,12 +178,13 @@ async def _process_scheduled_saves(self) -> None: await self._ydoc_content_loaded.wait() while True: - try: - await self._scheduled_saves.get() - except asyncio.QueueShutDown: + queue_item = await self._scheduled_saves.get() + if queue_item is None: + self._scheduled_saves.task_done() break await self._save_jupyter_ydoc() + self._scheduled_saves.task_done() self.log.info( "Stopped `self._process_scheduled_save()` background task " @@ -221,14 +230,9 @@ async def stop(self) -> None: `self.jupyter_ydoc` before exiting. """ # Stop the `self._process_scheduled_saves()` background task - # immediately. This is safe since we save before stopping anyways. - self._scheduled_saves.shutdown(immediate=True) + await self._scheduled_saves.join() + self._scheduled_saves.put_nowait(None) - # Do nothing if content was not loaded first. This prevents overwriting - # the existing file with an empty JupyterYDoc. - if not (self._ydoc_content_loaded.is_set()): - return - # Save the file and return. await self._save_jupyter_ydoc()