Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions jupyter_rtc_core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@ class RtcExtensionApp(ExtensionApp):
default_value=YRoomManager,
).tag(config=True)

yroom_manager = Instance(
klass=YRoomManager,
help="An instance of the YRoom Manager.",
allow_none=True
).tag(config=True)

# Read-only accessor for the YRoomManager. The value is looked up under the
# "yroom_manager" key of `self.settings` on every access, and `None` is
# returned when that key is absent.
@property
def yroom_manager(self) -> YRoomManager | None:
return self.settings.get("yroom_manager", None)

def initialize(self):
super().initialize()
Expand All @@ -59,7 +56,6 @@ def get_fileid_manager():
loop=loop,
log=log
)
pass


def _link_jupyter_server_extension(self, server_app):
Expand All @@ -71,3 +67,9 @@ def _link_jupyter_server_extension(self, server_app):
c.ServerApp.session_manager_class = "jupyter_rtc_core.session_manager.YDocSessionManager"
server_app.update_config(c)
super()._link_jupyter_server_extension(server_app)

# Shutdown coroutine for the extension: logs that shutdown has started,
# awaits `self.yroom_manager.stop()` when a manager is available, and logs a
# closing message.
# NOTE(review): presumably awaited by Jupyter Server while it shuts down -
# confirm against the `ExtensionApp` base class.
async def stop_extension(self):
self.log.info("Stopping `jupyter_rtc_core` server extension.")
# `self.yroom_manager` is `None` when no manager is stored in the settings,
# so the call is guarded.
if self.yroom_manager:
await self.yroom_manager.stop()
self.log.info("`jupyter_rtc_core` server extension is shut down. Goodbye!")
90 changes: 56 additions & 34 deletions jupyter_rtc_core/rooms/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ def __init__(
# Start observers on `self.ydoc` and `self.awareness` to ensure new
# updates are broadcast to all clients and saved to disk.
self._awareness_subscription = self._awareness.observe(
self.send_server_awareness
self._on_awareness_update
)
self._ydoc_subscription = self._ydoc.observe(
lambda event: self.write_sync_update(event.update)
lambda event: self._on_ydoc_update(event.update)
)

# Initialize message queue and start background task that routes new
# messages in the message queue to the appropriate handler method.
self._message_queue = asyncio.Queue()
self._loop.create_task(self._on_new_message())
self._loop.create_task(self._process_message_queue())

# Log notification that room is ready
self.log.info(f"Room '{self.room_id}' initialized.")
Expand Down Expand Up @@ -144,7 +144,7 @@ def add_message(self, client_id: str, message: bytes) -> None:
self._message_queue.put_nowait((client_id, message))


async def _on_new_message(self) -> None:
async def _process_message_queue(self) -> None:
"""
Async method that only runs when a new message arrives in the message
queue. This method routes the message to a handler method based on the
Expand All @@ -162,41 +162,54 @@ async def _on_new_message(self) -> None:
except asyncio.QueueShutDown:
break

# Handle Awareness messages
# Determine message type & subtype from header
message_type = message[0]
if message_type == YMessageType.AWARENESS:
sync_message_subtype = "*"
if message_type == YMessageType.SYNC and len(message) >= 2:
sync_message_subtype = message[1]

# Determine if message is invalid
invalid_message_type = message_type not in YMessageType
invalid_sync_message_type = message_type == YMessageType.SYNC and sync_message_subtype not in YSyncMessageSubtype
invalid_message = invalid_message_type or invalid_sync_message_type

# Handle invalid messages by logging a warning and ignoring
if invalid_message:
self.log.warning(
"Ignoring an unrecognized message with header "
f"'{message_type},{sync_message_subtype}' from client "
"'{client_id}'. Messages must have one of the following "
"headers: '0,0' (SyncStep1), '0,1' (SyncStep2), "
"'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)."
)
# Handle Awareness messages
elif message_type == YMessageType.AWARENESS:
self.log.debug(f"Received AwarenessUpdate from '{client_id}'.")
self.handle_awareness_update(client_id, message)
self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.")
continue

# Handle Sync messages
assert message_type == YMessageType.SYNC
message_subtype = message[1] if len(message) >= 2 else None
if message_subtype == YSyncMessageSubtype.SYNC_STEP1:
elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP1:
self.log.info(f"Received SS1 from '{client_id}'.")
self.handle_sync_step1(client_id, message)
self.log.info(f"Handled SS1 from '{client_id}'.")
continue
elif message_subtype == YSyncMessageSubtype.SYNC_STEP2:
elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP2:
self.log.info(f"Received SS2 from '{client_id}'.")
self.handle_sync_step2(client_id, message)
self.log.info(f"Handled SS2 from '{client_id}'.")
continue
elif message_subtype == YSyncMessageSubtype.SYNC_UPDATE:
elif sync_message_subtype == YSyncMessageSubtype.SYNC_UPDATE:
self.log.info(f"Received SyncUpdate from '{client_id}'.")
self.handle_sync_update(client_id, message)
self.log.info(f"Handled SyncUpdate from '{client_id}'.")
continue
else:
self.log.warning(
"Ignoring an unrecognized message with header "
f"'{message_type},{message_subtype}' from client "
"'{client_id}'. Messages must have one of the following "
"headers: '0,0' (SyncStep1), '0,1' (SyncStep2), "
"'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)."
)
continue

# Finally, inform the asyncio Queue that the task was complete
# This is required for `self._message_queue.join()` to unblock once
# queue is empty in `self.stop()`.
self._message_queue.task_done()

self.log.info(
"Stopped `self._process_message_queue()` background task "
f"for YRoom '{self.room_id}'."
)


def handle_sync_step1(self, client_id: str, message: bytes) -> None:
Expand Down Expand Up @@ -281,7 +294,7 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
Handles incoming SyncUpdate messages by applying the update to the YDoc.

A SyncUpdate message will automatically be broadcast to all synced
clients after this method is called via the `self.write_sync_update()`
clients after this method is called via the `self._on_ydoc_update()`
observer.
"""
# Remove client and kill websocket if received SyncUpdate when client is desynced
Expand All @@ -302,7 +315,7 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
return


def write_sync_update(self, message_payload: bytes, client_id: str | None = None) -> None:
def _on_ydoc_update(self, message_payload: bytes, client_id: str | None = None) -> None:
"""
This method is an observer on `self.ydoc` which:

Expand Down Expand Up @@ -377,9 +390,10 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
)
self.log.exception(e)

def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
"""
Callback to broadcast the server awareness to clients.
Observer on `self.awareness` that broadcasts AwarenessUpdate messages to
all clients on update.

Arguments:
type: The change type.
Expand All @@ -393,14 +407,22 @@ def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any])
message = pycrdt.create_awareness_message(state)
self._broadcast_message(message, "AwarenessUpdate")

async def stop(self) -> None:
    """
    Stops the YRoom gracefully.

    The shutdown sequence is: disconnect all clients, detach the YDoc and
    awareness observers, wait for the message queue to drain, shut the
    queue down, and finally stop the file API.
    """
    # First, disconnect all clients by stopping the client group.
    self.clients.stop()

    # Remove all observers, as updates no longer need to be broadcast
    self._ydoc.unobserve(self._ydoc_subscription)
    self._awareness.unobserve(self._awareness_subscription)

    # Finish processing all messages, then stop the queue to end the
    # `_process_message_queue()` background task.
    await self._message_queue.join()
    self._message_queue.shutdown(immediate=True)

    # Finally, stop FileAPI and return. This saves the final content of the
    # JupyterYDoc in the process.
    await self.file_api.stop()
86 changes: 62 additions & 24 deletions jupyter_rtc_core/rooms/yroom_file_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,38 +161,76 @@ def schedule_save(self) -> None:


async def _process_scheduled_saves(self) -> None:
    """
    Defines a background task that processes scheduled saves, after waiting
    for the JupyterYDoc content to be loaded.

    One save is performed through `self._save_jupyter_ydoc()` for every
    item taken from `self._scheduled_saves`. The task ends once the queue
    is shut down, i.e. when `get()` raises `asyncio.QueueShutDown`.
    """
    # Wait for content to be loaded before processing scheduled saves
    await self._ydoc_content_loaded.wait()

    while True:
        try:
            await self._scheduled_saves.get()
        except asyncio.QueueShutDown:
            # Leave the loop (rather than returning) so that the shutdown
            # message below is still logged.
            break

        await self._save_jupyter_ydoc()

    self.log.info(
        "Stopped `self._process_scheduled_saves()` background task "
        f"for YRoom '{self.room_id}'."
    )


# Performs a single save of `self.jupyter_ydoc` through
# `self._contents_manager`. Any exception raised along the way is logged and
# swallowed, so awaiting this coroutine does not propagate save failures.
async def _save_jupyter_ydoc(self):
"""
Saves the JupyterYDoc to disk immediately.

This is a private method, and should only be called through the
`_process_scheduled_saves()` task and the `stop()` method. Consumers
should instead call `schedule_save()` to save the document.
"""
try:
# The assertion sits inside the `try`, so a missing JupyterYDoc is reported
# through the `except` branch below instead of being raised to the caller.
assert self.jupyter_ydoc
path = self.get_path()
content = self.jupyter_ydoc.source
file_format = self.file_format
# File types not listed in `SAVEABLE_FILE_TYPES` are saved as a plain "file".
file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file"

# Save the YDoc via the ContentsManager
await ensure_async(self._contents_manager.save(
{
"format": file_format,
"type": file_type,
"content": content,
},
path
))

# Mark 'dirty' as `False`. This hides the "unsaved changes" icon
# in the JupyterLab tab rendering this YDoc in the frontend.
# NOTE(review): `content` is captured before the awaited save; if the
# document can change while the save is in flight, the flag would be
# cleared although newer changes are not on disk yet - confirm.
self.jupyter_ydoc.dirty = False
except Exception as e:
# Best-effort save: log a summary line, then the exception itself.
self.log.error("An exception occurred when saving JupyterYDoc.")
self.log.exception(e)

# Shuts down the scheduled-save queue and, provided the content was loaded,
# performs one last save through `self._save_jupyter_ydoc()`.
async def stop(self) -> None:
"""
Gracefully stops the YRoomFileAPI, saving the content of
`self.jupyter_ydoc` before exiting.
"""
# Stop the `self._process_scheduled_saves()` background task
# immediately. This is safe since we save before stopping anyways.
self._scheduled_saves.shutdown(immediate=True)

# Do nothing if content was not loaded first. This prevents overwriting
# the existing file with an empty JupyterYDoc.
# NOTE(review): on this path `_process_scheduled_saves()` appears to stay
# parked on `self._ydoc_content_loaded.wait()`, because the event is not set
# here - confirm whether that task is cancelled elsewhere.
if not (self._ydoc_content_loaded.is_set()):
return

# Save the file and return.
await self._save_jupyter_ydoc()


# see https://github.com/jupyterlab/jupyter-collaboration/blob/main/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py#L146-L149
Expand Down
28 changes: 19 additions & 9 deletions jupyter_rtc_core/rooms/yroom_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from __future__ import annotations
from typing import Any, Dict, Optional
from traitlets import HasTraits, Instance, default

from .yroom import YRoom
from typing import TYPE_CHECKING
Expand All @@ -27,8 +25,9 @@ def __init__(
self.contents_manager = contents_manager
self.loop = loop
self.log = log
self._rooms_by_id = {}

# Initialize dictionary of YRooms, keyed by room ID
self._rooms_by_id = {}


@property
Expand Down Expand Up @@ -66,16 +65,27 @@ def get_room(self, room_id: str) -> YRoom | None:
return None


async def delete_room(self, room_id: str) -> None:
    """
    Gracefully deletes a YRoom given a room ID. This stops the YRoom first,
    which finishes applying all updates & saves the content automatically.

    Does nothing when no room is registered under `room_id`. The room is
    unregistered even when stopping it raises, so that a half-stopped room
    is never handed out again; the exception still propagates to the
    caller.
    """
    yroom = self._rooms_by_id.get(room_id, None)
    if not yroom:
        return

    self.log.info(f"Stopping YRoom '{room_id}'.")
    try:
        await yroom.stop()
    finally:
        # Only unregister the entry if it is still this room. An overlapping
        # `delete_room()` call for the same ID may already have removed it
        # while `stop()` was awaited, in which case a plain `del` would
        # raise `KeyError`.
        if self._rooms_by_id.get(room_id) is yroom:
            del self._rooms_by_id[room_id]
    self.log.info(f"Stopped YRoom '{room_id}'.")


async def stop(self) -> None:
    """
    Gracefully deletes each `YRoom`. See `delete_room()` for more info.

    Every room is attempted even when deleting an earlier one fails, so a
    single faulty room cannot prevent the others from being stopped. The
    first failure is re-raised once all rooms have been processed.
    """
    self.log.info("Stopping `YRoomManager` and deleting all YRooms.")

    # Iterate over a snapshot of the IDs, since `delete_room()` mutates
    # `self._rooms_by_id` while we loop.
    first_error = None
    for room_id in list(self._rooms_by_id):
        try:
            await self.delete_room(room_id)
        except Exception as e:
            self.log.exception(f"Failed to delete YRoom '{room_id}'.")
            if first_error is None:
                first_error = e

    if first_error is not None:
        raise first_error

    self.log.info("Stopped `YRoomManager` and deleted all YRooms.")
Loading
Loading