28 changes: 22 additions & 6 deletions jupyter_rtc_core/rooms/yroom.py
@@ -39,8 +39,16 @@ class YRoom:
"""Event loop"""
_client_group: YjsClientGroup
"""Client group to manage synced and desynced clients"""
_message_queue: asyncio.Queue[Tuple[str, bytes]]
"""A message queue per room to keep websocket messages in order"""
_message_queue: asyncio.Queue[Tuple[str, bytes] | None]
"""
A per-room message queue that stores new messages from clients to process
them in order. If a tuple `(client_id, message)` is enqueued, the message is
queued for processing. If `None` is enqueued, the processing of the message
queue is halted.

The `self._process_message_queue()` background task can be halted by running
`self._message_queue.put_nowait(None)`.
"""
_awareness_subscription: pycrdt.Subscription
"""Subscription to awareness changes."""
_ydoc_subscription: pycrdt.Subscription
@@ -168,6 +176,9 @@ async def _process_message_queue(self) -> None:
queue. This method routes the message to a handler method based on the
message type & subtype, which are obtained from the first 2 bytes of the
message.

This task can be halted by calling
`self._message_queue.put_nowait(None)`.
"""
# Wait for content to be loaded before processing any messages in the
# message queue
@@ -176,10 +187,15 @@

# Begin processing messages from the message queue
while True:
try:
client_id, message = await self._message_queue.get()
except asyncio.QueueShutDown:
# Wait for next item in the message queue
queue_item = await self._message_queue.get()

# If the next item is `None`, break the loop and stop this task
if queue_item is None:
break

# Otherwise, process the new message
client_id, message = queue_item

# Determine message type & subtype from header
message_type = message[0]
@@ -471,7 +487,7 @@ async def stop(self) -> None:
# Finish processing all messages, then stop the queue to end the
# `_process_message_queue()` background task.
await self._message_queue.join()
self._message_queue.shutdown(immediate=True)
self._message_queue.put_nowait(None)

# Finally, stop FileAPI and return. This saves the final content of the
# JupyterYDoc in the process.
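For context on the change above: `asyncio.Queue.shutdown()` and the `asyncio.QueueShutDown` exception are only available on Python 3.13+, and the diff replaces them with a `None` sentinel that `stop()` enqueues after draining the queue. The snippet below is a minimal, self-contained sketch of that sentinel pattern; the hypothetical `MessageRoom` class, its names, and the demo `main()` are illustrative assumptions, not the project's actual API.

```python
import asyncio


class MessageRoom:
    """Hypothetical stand-in for a room with a message-consumer task."""

    def __init__(self) -> None:
        # A tuple schedules a message for processing; `None` halts the task.
        self._queue: asyncio.Queue[tuple[str, bytes] | None] = asyncio.Queue()

    async def _process_queue(self) -> None:
        while True:
            item = await self._queue.get()
            if item is None:
                # Sentinel received: stop this background task.
                break
            client_id, message = item
            # Real code would dispatch on the first two bytes of `message`.
            print(f"handling message from {client_id}: {message[:2].hex()}")
            self._queue.task_done()

    async def stop(self) -> None:
        # Finish processing queued messages, then enqueue the sentinel so the
        # consumer exits without relying on Queue.shutdown() (3.13+ only).
        await self._queue.join()
        self._queue.put_nowait(None)


async def main() -> None:
    room = MessageRoom()
    task = asyncio.create_task(room._process_queue())
    room._queue.put_nowait(("client-1", b"\x00\x01payload"))
    await room.stop()
    await task


asyncio.run(main())
```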
32 changes: 18 additions & 14 deletions jupyter_rtc_core/rooms/yroom_file_api.py
@@ -5,7 +5,7 @@
"""

from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING
import asyncio
import pycrdt
from jupyter_ydoc.ybasedoc import YBaseDoc
Expand All @@ -14,7 +14,7 @@
import os

if TYPE_CHECKING:
from typing import Awaitable
from typing import Awaitable, Literal
from jupyter_server_fileid.manager import BaseFileIdManager
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager

@@ -45,7 +45,15 @@ class YRoomFileAPI:
_loop: asyncio.AbstractEventLoop
_ydoc_content_loading: bool
_ydoc_content_loaded: asyncio.Event
_scheduled_saves: asyncio.Queue[None]
_scheduled_saves: asyncio.Queue[Literal[0] | None]
"""
Queue of size 1, which may store `0` or `None`. If `0` is enqueued, another
save will occur after the current save is complete. If `None` is enqueued,
the processing of this queue is halted.

The `self._process_scheduled_saves()` background task can be halted by
running `self._scheduled_saves.put_nowait(None)`.
"""

def __init__(
self,
@@ -157,7 +165,7 @@ def schedule_save(self) -> None:
"""
assert self.jupyter_ydoc
if not self._scheduled_saves.full():
self._scheduled_saves.put_nowait(None)
self._scheduled_saves.put_nowait(0)


async def _process_scheduled_saves(self) -> None:
@@ -170,12 +178,13 @@ async def _process_scheduled_saves(self) -> None:
await self._ydoc_content_loaded.wait()

while True:
try:
await self._scheduled_saves.get()
except asyncio.QueueShutDown:
queue_item = await self._scheduled_saves.get()
if queue_item is None:
self._scheduled_saves.task_done()
break

await self._save_jupyter_ydoc()
self._scheduled_saves.task_done()

self.log.info(
"Stopped `self._process_scheduled_save()` background task "
@@ -221,14 +230,9 @@ async def stop(self) -> None:
`self.jupyter_ydoc` before exiting.
"""
# Stop the `self._process_scheduled_saves()` background task
# immediately. This is safe since we save before stopping anyways.
self._scheduled_saves.shutdown(immediate=True)
await self._scheduled_saves.join()
self._scheduled_saves.put_nowait(None)

# Do nothing if content was not loaded first. This prevents overwriting
# the existing file with an empty JupyterYDoc.
if not (self._ydoc_content_loaded.is_set()):
return

Comment on lines -227 to -231 (Collaborator, Author):

These lines were never called, since the _process_scheduled_saves() task always waits on the content to be loaded. Since the server won't stop until all of the background tasks are stopped, the content is always loaded before shutdown and we don't need to check its status.

We can add a way to cancel the initial content loading while the server is shutting down in a future PR. That change would only speed up server shutdown by 1-100 ms, so I don't think it's a priority for us right now.

# Save the file and return.
await self._save_jupyter_ydoc()
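
The same sentinel idea drives the size-1 scheduled-saves queue above: `0` coalesces save requests, `None` ends the background task. A rough sketch under the same caveat, with the hypothetical names `FileSaver` and `_write_to_disk` standing in for the real `YRoomFileAPI` and `_save_jupyter_ydoc()`:

```python
import asyncio


class FileSaver:
    """Hypothetical stand-in for the file API; not the project's real class."""

    def __init__(self) -> None:
        # Size-1 queue: `0` means "a save is requested", `None` halts the task.
        self._scheduled_saves: asyncio.Queue[int | None] = asyncio.Queue(maxsize=1)

    def schedule_save(self) -> None:
        # Coalesce requests: if a save is already pending, this is a no-op.
        if not self._scheduled_saves.full():
            self._scheduled_saves.put_nowait(0)

    async def _process_scheduled_saves(self) -> None:
        while True:
            item = await self._scheduled_saves.get()
            if item is None:
                self._scheduled_saves.task_done()
                break
            await self._write_to_disk()
            self._scheduled_saves.task_done()

    async def _write_to_disk(self) -> None:
        # Stand-in for saving the document via the contents manager.
        await asyncio.sleep(0)

    async def stop(self) -> None:
        # Drain any pending save request, enqueue the sentinel to end the
        # background task, then perform one final save.
        await self._scheduled_saves.join()
        self._scheduled_saves.put_nowait(None)
        await self._write_to_disk()
```

Marking the sentinel with `task_done()` before breaking, as the diff does, keeps a later `join()` on the queue from hanging.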
