Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 74 additions & 6 deletions jupyter_server_documents/rooms/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class YRoom:
_ydoc_subscription: pycrdt.Subscription
"""Subscription to YDoc changes."""

_on_stop: callable[[], Any] | None
"""
Callback to run after stopping, provided in the constructor.
"""

_fileid_manager: BaseFileIdManager
_contents_manager: AsyncContentsManager | ContentsManager

Expand All @@ -78,13 +83,15 @@ def __init__(
loop: asyncio.AbstractEventLoop,
fileid_manager: BaseFileIdManager,
contents_manager: AsyncContentsManager | ContentsManager,
on_stop: callable[[], Any] | None = None
):
# Bind instance attributes
self.room_id = room_id
self.log = log
self._loop = loop
self._fileid_manager = fileid_manager
self._contents_manager = contents_manager
self._on_stop = on_stop

# Initialize YjsClientGroup, YDoc, YAwareness, JupyterYDoc
self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop)
Expand All @@ -106,7 +113,9 @@ def __init__(
loop=self._loop,
fileid_manager=self._fileid_manager,
contents_manager=self._contents_manager,
on_outofband_change=self.reload_ydoc
on_outofband_change=self.reload_ydoc,
on_outofband_move=self.handle_outofband_move,
on_inband_deletion=self.handle_inband_deletion
)

# Load the YDoc content after initializing
Expand Down Expand Up @@ -589,7 +598,9 @@ def reload_ydoc(self) -> None:
loop=self._loop,
fileid_manager=self._fileid_manager,
contents_manager=self._contents_manager,
on_outofband_change=self.reload_ydoc
on_outofband_change=self.reload_ydoc,
on_outofband_move=self.handle_outofband_move,
on_inband_deletion=self.handle_inband_deletion
)
self.file_api.load_ydoc_content()

Expand All @@ -603,9 +614,63 @@ def reload_ydoc(self) -> None:
self._jupyter_ydoc.observe(self._on_jupyter_ydoc_update)


def handle_outofband_move(self) -> None:
"""
Handles an out-of-band move/deletion by stopping the YRoom immediately
with close code 4001.
"""
self.stop_immediately(close_code=4001)


def handle_inband_deletion(self) -> None:
"""
Handles an in-band file deletion by stopping the YRoom immediately with
close code 4002.
"""
self.stop_immediately(close_code=4002)


def stop_immediately(self, close_code: int) -> None:
"""
Stops the YRoom immediately, closing all Websockets with the given
`close_code`. This is similar to `self.stop()` with some key
differences:

- This does not apply any pending YDoc updates from other clients.
- This does not save the file before exiting.

This should be reserved for scenarios where it does not make sense to
apply pending updates or save the file, e.g. when the file has been
deleted from disk.
"""
# Disconnect all clients with given `close_code`
self.clients.stop(close_code=close_code)

# Remove all observers
self._ydoc.unobserve(self._ydoc_subscription)
self._awareness.unobserve(self._awareness_subscription)

# Purge the message queue immediately, dropping all queued messages
while not self._message_queue.empty():
self._message_queue.get_nowait()
self._message_queue.task_done()

# Enqueue `None` to stop the `_process_message_queue()` background task
self._message_queue.put_nowait(None)

# Stop FileAPI immediately (without saving)
if self.file_api:
self.file_api.stop()

# Finally, run the provided callback (if any) and return
if self._on_stop:
self._on_stop()


async def stop(self) -> None:
"""
Stops the YRoom gracefully.
Stops the YRoom gracefully by disconnecting all clients with close code
1001, applying all pending updates, and saving the YDoc before exiting.
"""
# First, disconnect all clients by stopping the client group.
self.clients.stop()
Expand All @@ -616,16 +681,19 @@ async def stop(self) -> None:
if self._jupyter_ydoc:
self._jupyter_ydoc.unobserve()

# Finish processing all messages, then stop the queue to end the
# Finish processing all messages, then enqueue `None` to stop the
# `_process_message_queue()` background task.
await self._message_queue.join()
self._message_queue.put_nowait(None)

# Finally, stop FileAPI and return. This saves the final content of the
# JupyterYDoc in the process.
# Stop FileAPI, saving the content before doing so
if self.file_api:
await self.file_api.stop_then_save()

# Finally, run the provided callback (if any) and return
if self._on_stop:
self._on_stop()

def should_ignore_state_update(event: pycrdt.MapEvent) -> bool:
"""
Returns whether an update to the `state` dictionary should be ignored by
Expand Down
135 changes: 109 additions & 26 deletions jupyter_server_documents/rooms/yroom_file_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from jupyter_ydoc.ybasedoc import YBaseDoc
from jupyter_server.utils import ensure_async
import logging
from tornado.web import HTTPError

if TYPE_CHECKING:
from typing import Any, Callable, Literal
Expand Down Expand Up @@ -45,7 +46,33 @@ class YRoomFileAPI:
_save_scheduled: bool
_ydoc_content_loading: bool
_ydoc_content_loaded: asyncio.Event

_last_modified: datetime | None
"""
The last file modified timestamp known to this instance. If this value
changes unexpectedly, that indicates an out-of-band change to the file.
"""

_last_path: str | None
"""
The last file path known to this instance. If this value changes
unexpectedly, that indicates an out-of-band move/deletion on the file.
"""

_on_outofband_change: Callable[[], Any]
"""
The callback to run when an out-of-band file change is detected.
"""

_on_outofband_move: Callable[[], Any]
"""
The callback to run when an out-of-band file move/deletion is detected.
"""

_on_inband_deletion: Callable[[], Any]
"""
The callback to run when an in-band move file deletion is detected.
"""

_save_loop_task: asyncio.Task

Expand All @@ -58,7 +85,9 @@ def __init__(
fileid_manager: BaseFileIdManager,
contents_manager: AsyncContentsManager | ContentsManager,
loop: asyncio.AbstractEventLoop,
on_outofband_change: Callable[[], Any]
on_outofband_change: Callable[[], Any],
on_outofband_move: Callable[[], Any],
on_inband_deletion: Callable[[], Any]
):
# Bind instance attributes
self.room_id = room_id
Expand All @@ -69,7 +98,10 @@ def __init__(
self._fileid_manager = fileid_manager
self._contents_manager = contents_manager
self._on_outofband_change = on_outofband_change
self._on_outofband_move = on_outofband_move
self._on_inband_deletion = on_inband_deletion
self._save_scheduled = False
self._last_path = None
self._last_modified = None

# Initialize loading & loaded states
Expand All @@ -80,21 +112,15 @@ def __init__(
self._save_loop_task = self._loop.create_task(self._watch_file())


def get_path(self) -> str:
def get_path(self) -> str | None:
"""
Returns the relative path to the file by querying the FileIdManager. The
path is relative to the `ServerApp.root_dir` configurable trait.

Raises a `RuntimeError` if the file ID does not refer to a valid file
path.
"""
rel_path = self._fileid_manager.get_path(self.file_id)
if not rel_path:
raise RuntimeError(
f"Unable to locate file with ID: '{self.file_id}'."
)

return rel_path
return self._fileid_manager.get_path(self.file_id)


@property
Expand Down Expand Up @@ -129,14 +155,19 @@ def load_ydoc_content(self) -> None:
if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading:
return

self.log.info(f"Loading content for room ID '{self.room_id}'.")
self._ydoc_content_loading = True
self._loop.create_task(self._load_ydoc_content())


async def _load_ydoc_content(self) -> None:
# Load the content of the file from the given file ID.
# Get the path specified on the file ID
path = self.get_path()
if not path:
raise RuntimeError(f"Could not find path for room '{self.room_id}'.")
self._last_path = path

# Load the content of the file from the path
self.log.info(f"Loading content for room ID '{self.room_id}', found at path: '{path}'.")
file_data = await ensure_async(self._contents_manager.get(
path,
type=self.file_type,
Expand Down Expand Up @@ -168,7 +199,8 @@ def schedule_save(self) -> None:

async def _watch_file(self) -> None:
"""
Defines a background task that continuously saves the YDoc every 500ms.
Defines a background task that continuously saves the YDoc every 500ms,
checking for out-of-band changes before doing so.

Note that consumers must call `self.schedule_save()` for the next tick
of this task to save.
Expand All @@ -180,7 +212,7 @@ async def _watch_file(self) -> None:
while True:
try:
await asyncio.sleep(0.5)
await self._check_oob_changes()
await self._check_file()
if self._save_scheduled:
# `asyncio.shield()` prevents the save task from being
# cancelled halfway and corrupting the file. We need to
Expand All @@ -204,26 +236,77 @@ async def _watch_file(self) -> None:
f"for YRoom '{self.room_id}'."
)

async def _check_oob_changes(self):
async def _check_file(self):
"""
Checks for out-of-band changes. Called in the `self._watch_file()`
background task.

Calls the `on_outofband_change()` function passed to the constructor if
an out-of-band change is detected. This is guaranteed to always run
before each save through the `ContentsManager`.
Checks for in-band/out-of-band file operations in the
`self._watch_file()` background task. This is guaranteed to always run
before each save in `self._watch_file()` This detects the following
events and acts in response:

- In-band move: logs warning (no handling needed)
- In-band deletion: calls `self._on_inband_deletion()`
- Out-of-band move/deletion: calls `self._on_outofband_move()`
- Out-of-band change: calls `self._on_outofband_change()`
"""
# Build arguments to `CM.get()`
# Ensure that the last known path is defined. This should always be set
# by `load_ydoc_content()`.
if not self._last_path:
raise RuntimeError(f"No last known path for '{self.room_id}'. This should never happen.")

# Get path. If the path does not match the last known path, the file was
# moved/deleted in-band via the `ContentsManager`, as it was detected by
# `jupyter_server_fileid.manager:ArbitraryFileIdManager`.
# If this happens, run the designated callback and return early.
path = self.get_path()
if path != self._last_path:
if path:
self.log.warning(
f"File was moved to '{path}'. "
f"Room ID: '{self.room_id}', "
f"Last known path: '{self._last_path}'."
)
else:
self.log.warning(
"File was deleted. "
f"Room ID: '{self.room_id}', "
f"Last known path: '{self._last_path}'."
)
self._on_inband_deletion()
return

# Otherwise, set the last known path
self._last_path = path

# Build arguments to `CM.get()`
file_format = self.file_format
file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file"

# Check for out-of-band file changes
file_data = await ensure_async(self._contents_manager.get(
path=path, format=file_format, type=file_type, content=False
))
# Get the file metadata from the `ContentsManager`.
# If this raises `HTTPError(404)`, that indicates the file was
# moved/deleted out-of-band.
try:
file_data = await ensure_async(self._contents_manager.get(
path=path, format=file_format, type=file_type, content=False
))
except HTTPError as e:
# If not 404, re-raise the exception as it is unknown
if (e.status_code != 404):
raise e

# Otherwise, this indicates the file was moved/deleted out-of-band.
# Run the designated callback and return early.
self.log.warning(
"File was deleted out-of-band. "
f"Room ID: '{self.room_id}', "
f"Last known path: '{self._last_path}'."
)
self._on_outofband_move()
return


# If an out-of-band file change is detected, run the designated callback
# Finally, if the file was not moved/deleted, check for out-of-band
# changes to the file content using the metadata.
# If an out-of-band file change is detected, run the designated callback.
if self._last_modified != file_data['last_modified']:
self.log.warning(
"Out-of-band file change detected. "
Expand Down
12 changes: 11 additions & 1 deletion jupyter_server_documents/rooms/yroom_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def get_room(self, room_id: str) -> YRoom | None:
loop=self.loop,
fileid_manager=self.fileid_manager,
contents_manager=self.contents_manager,
on_stop=lambda: self._handle_yroom_stop(room_id),
)
self._rooms_by_id[room_id] = yroom
return yroom
Expand All @@ -63,7 +64,16 @@ def get_room(self, room_id: str) -> YRoom | None:
exc_info=True
)
return None



def _handle_yroom_stop(self, room_id: str) -> None:
"""
Callback that is run when the YRoom is stopped. This ensures the room is
removed from the dictionary for garbage collection, even if the room was
stopped directly without `YRoomManager.delete_room()`.
"""
self._rooms_by_id.pop(room_id, None)


async def delete_room(self, room_id: str) -> None:
"""
Expand Down
Loading
Loading