diff --git a/.gitignore b/.gitignore index 5c6e90d..e7e076d 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,7 @@ dmypy.json # Yarn cache .yarn/ + +# Ignore files used to test in real-time +untitled* +Untitled* diff --git a/devenv-jcollab-frontend.yml b/devenv-jcollab-frontend.yml new file mode 100644 index 0000000..21c78d1 --- /dev/null +++ b/devenv-jcollab-frontend.yml @@ -0,0 +1,11 @@ +name: rtccore-jcollab-frontend +channels: + - conda-forge +dependencies: + - python + - nodejs=22 + - uv + - jupyterlab + - pip: + - jupyter_docprovider>=2.0.2,<3 + - jupyter_collaboration_ui>=2.0.2,<3 diff --git a/jupyter_rtc_core/app.py b/jupyter_rtc_core/app.py index 57d423b..0b8f46b 100644 --- a/jupyter_rtc_core/app.py +++ b/jupyter_rtc_core/app.py @@ -2,9 +2,8 @@ from traitlets.config import Config import asyncio -from traitlets import Instance -from traitlets import Type -from .handlers import RouteHandler +from traitlets import Instance, Type +from .handlers import RouteHandler, YRoomSessionHandler from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket from .rooms.yroom_manager import YRoomManager @@ -20,7 +19,9 @@ class RtcExtensionApp(ExtensionApp): # global awareness websocket # (r"api/collaboration/room/JupyterLab:globalAwareness/?", GlobalAwarenessWebsocket), # # ydoc websocket - # (r"api/collaboration/room/(.*)", YRoomWebsocket) + (r"api/collaboration/room/(.*)", YRoomWebsocket), + # handler that just adds compatibility with Jupyter Collaboration's frontend + (r"api/collaboration/session/(.*)", YRoomSessionHandler) ] yroom_manager_class = Type( @@ -44,7 +45,8 @@ def initialize_settings(self): # We cannot access the 'file_id_manager' key immediately because server # extensions initialize in alphabetical order. 'jupyter_rtc_core' < # 'jupyter_server_fileid'. - get_fileid_manager = lambda: self.settings["file_id_manager"] + def get_fileid_manager(): + return self.serverapp.web_app.settings["file_id_manager"] contents_manager = self.serverapp.contents_manager loop = asyncio.get_event_loop_policy().get_event_loop() log = self.log @@ -56,6 +58,7 @@ def initialize_settings(self): loop=loop, log=log ) + pass def _link_jupyter_server_extension(self, server_app): diff --git a/jupyter_rtc_core/handlers.py b/jupyter_rtc_core/handlers.py index 61106db..f3cd186 100644 --- a/jupyter_rtc_core/handlers.py +++ b/jupyter_rtc_core/handlers.py @@ -1,4 +1,5 @@ import json +import uuid from jupyter_server.base.handlers import APIHandler import tornado @@ -14,4 +15,33 @@ def get(self): })) +# TODO: remove this by v1.0.0 if deemed unnecessary. Just adding this for +# compatibility with the `jupyter_collaboration` frontend. +class YRoomSessionHandler(APIHandler): + SESSION_ID = str(uuid.uuid4()) + + @tornado.web.authenticated + def put(self, path): + body = json.loads(self.request.body) + format = body["format"] + content_type = body["type"] + # self.log.info("IN HANDLER") + # for k, v in self.settings.items(): + # print(f"{k}: {v}") + # print(len(self.settings.items())) + # print(id(self.settings)) + + file_id_manager = self.settings["file_id_manager"] + file_id = file_id_manager.index(path) + + data = json.dumps( + { + "format": format, + "type": content_type, + "fileId": file_id, + "sessionId": self.SESSION_ID, + } + ) + self.set_status(200) + self.finish(data) diff --git a/jupyter_rtc_core/rooms/yroom.py b/jupyter_rtc_core/rooms/yroom.py index d7b1472..90b260a 100644 --- a/jupyter_rtc_core/rooms/yroom.py +++ b/jupyter_rtc_core/rooms/yroom.py @@ -12,7 +12,7 @@ from .yroom_file_api import YRoomFileAPI if TYPE_CHECKING: - from typing import Literal, Tuple + from typing import Literal, Tuple, Any from jupyter_server_fileid.manager import BaseFileIdManager from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager @@ -22,7 +22,11 @@ class YRoom: log: Logger """Log object""" room_id: str - """Room Id""" + """ + The ID of the room. This is a composite ID following the format: + + room_id := "{file_type}:{file_format}:{file_id}" + """ _jupyter_ydoc: YBaseDoc """JupyterYDoc""" @@ -60,9 +64,10 @@ def __init__( self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop) self._ydoc = pycrdt.Doc() self._awareness = pycrdt.Awareness(ydoc=self._ydoc) + _, file_type, _ = self.room_id.split(":") JupyterYDocClass = cast( type[YBaseDoc], - jupyter_ydoc_classes.get(self.file_type, jupyter_ydoc_classes["file"]) + jupyter_ydoc_classes.get(file_type, jupyter_ydoc_classes["file"]) ) self.jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness) @@ -90,6 +95,9 @@ def __init__( # messages in the message queue to the appropriate handler method. self._message_queue = asyncio.Queue() self._loop.create_task(self._on_new_message()) + + # Log notification that room is ready + self.log.info(f"Room '{self.room_id}' initialized.") @property @@ -157,20 +165,28 @@ async def _on_new_message(self) -> None: # Handle Awareness messages message_type = message[0] if message_type == YMessageType.AWARENESS: - self.handle_awareness_update(client_id, message[1:]) + self.log.debug(f"Received AwarenessUpdate from '{client_id}'.") + self.handle_awareness_update(client_id, message) + self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.") continue # Handle Sync messages assert message_type == YMessageType.SYNC message_subtype = message[1] if len(message) >= 2 else None if message_subtype == YSyncMessageSubtype.SYNC_STEP1: + self.log.info(f"Received SS1 from '{client_id}'.") self.handle_sync_step1(client_id, message) + self.log.info(f"Handled SS1 from '{client_id}'.") continue elif message_subtype == YSyncMessageSubtype.SYNC_STEP2: + self.log.info(f"Received SS2 from '{client_id}'.") self.handle_sync_step2(client_id, message) + self.log.info(f"Handled SS2 from '{client_id}'.") continue elif message_subtype == YSyncMessageSubtype.SYNC_UPDATE: + self.log.info(f"Received SyncUpdate from '{client_id}'.") self.handle_sync_update(client_id, message) + self.log.info(f"Handled SyncUpdate from '{client_id}'.") continue else: self.log.warning( @@ -213,7 +229,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: try: # TODO: remove the assert once websocket is made required assert isinstance(new_client.websocket, WebSocketHandler) - new_client.websocket.write_message(sync_step2_message) + new_client.websocket.write_message(sync_step2_message, binary=True) + self.log.info(f"Sent SS2 reply to client '{client_id}'.") except Exception as e: self.log.error( "An exception occurred when writing the SyncStep2 reply " @@ -228,7 +245,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: try: assert isinstance(new_client.websocket, WebSocketHandler) sync_step1_message = pycrdt.create_sync_message(self._ydoc) - new_client.websocket.write_message(sync_step1_message) + new_client.websocket.write_message(sync_step1_message, binary=True) + self.log.info(f"Sent SS1 message to client '{client_id}'.") except Exception as e: self.log.error( "An exception occurred when writing a SyncStep1 message " @@ -295,23 +313,22 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None This method can also be called manually. """ - # Broadcast the message: + # Broadcast the message message = pycrdt.create_update_message(message_payload) self._broadcast_message(message, message_type="SyncUpdate") - # Save the file to disk. - # TODO: requires YRoomLoader implementation - return + # Save the file to disk + self.file_api.schedule_save() def handle_awareness_update(self, client_id: str, message: bytes) -> None: # Apply the AwarenessUpdate message try: - message_payload = message[1:] + message_payload = pycrdt.read_message(message[1:]) self._awareness.apply_awareness_update(message_payload, origin=self) except Exception as e: self.log.error( - "An exception occurred when applying an AwarenessUpdate" + "An exception occurred when applying an AwarenessUpdate " f"message from client '{client_id}':" ) self.log.exception(e) @@ -351,7 +368,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd try: # TODO: remove this assertion once websocket is made required assert isinstance(client.websocket, WebSocketHandler) - client.websocket.write_message(message) + client.websocket.write_message(message, binary=True) except Exception as e: self.log.warning( f"An exception occurred when broadcasting a " diff --git a/jupyter_rtc_core/rooms/yroom_file_api.py b/jupyter_rtc_core/rooms/yroom_file_api.py index 0bf547d..c05e959 100644 --- a/jupyter_rtc_core/rooms/yroom_file_api.py +++ b/jupyter_rtc_core/rooms/yroom_file_api.py @@ -33,6 +33,7 @@ class YRoomFileAPI: # See `filemanager.py` in `jupyter_server` for references on supported file # formats & file types. + room_id: str file_format: Literal["text", "base64"] file_type: Literal["file", "notebook"] file_id: str @@ -57,6 +58,7 @@ def __init__( loop: asyncio.AbstractEventLoop ): # Bind instance attributes + self.room_id = room_id self.file_format, self.file_type, self.file_id = room_id.split(":") self.jupyter_ydoc = jupyter_ydoc self.log = log @@ -113,6 +115,8 @@ def load_ydoc_content(self) -> None: # Otherwise, set loading to `True` and start the loading task. if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading: return + + self.log.info(f"Loading content for room ID '{self.room_id}'.") self._ydoc_content_loading = True self._loop.create_task(self._load_ydoc_content()) @@ -134,6 +138,7 @@ async def _load_ydoc_content(self) -> None: # Also set loading to `False` for consistency self._ydoc_content_loaded.set() self._ydoc_content_loading = False + self.log.info(f"Loaded content for room ID '{self.room_id}'.") def schedule_save(self) -> None: @@ -172,6 +177,7 @@ async def _process_scheduled_saves(self) -> None: file_format = self.file_format file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file" + # Save the YDoc via the ContentsManager await ensure_async(self._contents_manager.save( { "format": file_format, @@ -180,6 +186,10 @@ async def _process_scheduled_saves(self) -> None: }, path )) + + # Mark 'dirty' as `False`. This hides the "unsaved changes" icon + # in the JupyterLab tab rendering this YDoc in the frontend. + self.jupyter_ydoc.dirty = False except Exception as e: self.log.error("An exception occurred when saving JupyterYDoc.") self.log.exception(e) diff --git a/jupyter_rtc_core/rooms/yroom_manager.py b/jupyter_rtc_core/rooms/yroom_manager.py index 2813734..91dbba4 100644 --- a/jupyter_rtc_core/rooms/yroom_manager.py +++ b/jupyter_rtc_core/rooms/yroom_manager.py @@ -1,6 +1,6 @@ +from __future__ import annotations from typing import Any, Dict, Optional from traitlets import HasTraits, Instance, default -from __future__ import annotations from .yroom import YRoom from typing import TYPE_CHECKING @@ -8,7 +8,6 @@ if TYPE_CHECKING: import asyncio import logging - from typing import Callable from jupyter_server_fileid.manager import BaseFileIdManager from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager @@ -18,7 +17,7 @@ class YRoomManager(): def __init__( self, *, - get_fileid_manager: Callable[[], BaseFileIdManager], + get_fileid_manager: callable[[], BaseFileIdManager], contents_manager: AsyncContentsManager | ContentsManager, loop: asyncio.AbstractEventLoop, log: logging.Logger, @@ -35,7 +34,7 @@ def __init__( @property def fileid_manager(self) -> BaseFileIdManager: return self._get_fileid_manager() - + def get_room(self, room_id: str) -> YRoom | None: """ @@ -49,6 +48,7 @@ def get_room(self, room_id: str) -> YRoom | None: # Otherwise, create a new room try: + self.log.info(f"Initializing room '{room_id}'.") yroom = YRoom( room_id=room_id, log=self.log, diff --git a/jupyter_rtc_core/websockets/__init__.py b/jupyter_rtc_core/websockets/__init__.py index 00d8474..76c656a 100644 --- a/jupyter_rtc_core/websockets/__init__.py +++ b/jupyter_rtc_core/websockets/__init__.py @@ -1,3 +1,3 @@ from .global_awareness_ws import GlobalAwarenessWebsocket -from .yroom_ws import YRoomWebsocket from .clients import YjsClient, YjsClientGroup +from .yroom_ws import YRoomWebsocket diff --git a/jupyter_rtc_core/websockets/clients.py b/jupyter_rtc_core/websockets/clients.py index 8599dad..a4f2735 100644 --- a/jupyter_rtc_core/websockets/clients.py +++ b/jupyter_rtc_core/websockets/clients.py @@ -5,8 +5,7 @@ """ from __future__ import annotations -from datetime import timedelta, timezone -import datetime +from datetime import timedelta, timezone, datetime from logging import Logger from typing import TYPE_CHECKING import uuid @@ -43,7 +42,7 @@ def synced(self): @synced.setter def synced(self, v: bool): - self.synced = v + self._synced = v self.last_modified = datetime.now(timezone.utc) class YjsClientGroup: @@ -67,7 +66,7 @@ class YjsClientGroup: """Log object""" loop: asyncio.AbstractEventLoop """Event loop""" - poll_interval_seconds: int + _poll_interval_seconds: int """The poll time interval used while auto removing desynced clients""" desynced_timeout_seconds: int """The max time period in seconds that a desynced client does not become synced before get auto removed from desynced dict""" @@ -78,8 +77,8 @@ def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop self.desynced: dict[str, YjsClient] = {} self.log = log self.loop = loop - self.loop.create_task(self._clean_desynced()) - self.poll_interval_seconds = poll_interval_seconds + # self.loop.create_task(self._clean_desynced()) + self._poll_interval_seconds = poll_interval_seconds self.desynced_timeout_seconds = desynced_timeout_seconds def add(self, websocket: WebSocketHandler) -> str: diff --git a/jupyter_rtc_core/websockets/yroom_ws.py b/jupyter_rtc_core/websockets/yroom_ws.py index 1cfd528..c91ba19 100644 --- a/jupyter_rtc_core/websockets/yroom_ws.py +++ b/jupyter_rtc_core/websockets/yroom_ws.py @@ -2,24 +2,37 @@ from tornado.httpclient import HTTPError from tornado.websocket import WebSocketHandler from typing import TYPE_CHECKING +import asyncio +from ..rooms import YRoomManager +import logging if TYPE_CHECKING: from jupyter_server_fileid.manager import BaseFileIdManager - from ..rooms import YRoom, YRoomManager + from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager + from ..rooms import YRoom class YRoomWebsocket(WebSocketHandler): yroom: YRoom room_id: str client_id: str + # TODO: change this. we should pass `self.log` from our + # `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix + log = logging.Logger("TEMP") + @property def yroom_manager(self) -> YRoomManager: return self.settings["yroom_manager"] - + @property def fileid_manager(self) -> BaseFileIdManager: return self.settings["file_id_manager"] + + + @property + def contents_manager(self) -> AsyncContentsManager | ContentsManager: + return self.settings["contents_manager"] def prepare(self): @@ -27,6 +40,11 @@ def prepare(self): request_path: str = self.request.path self.room_id = request_path.strip("/").split("/")[-1] + # TODO: remove this once globalawareness is implemented + if self.room_id == "JupyterLab:globalAwareness": + self.close(1011) + return + # Verify the file ID contained in the room ID points to a valid file. fileid = self.room_id.split(":")[-1] path = self.fileid_manager.get_path(fileid) @@ -34,7 +52,12 @@ def prepare(self): raise HTTPError(404, f"No file with ID '{fileid}'.") - def open(self): + def open(self, *_, **__): + # TODO: remove this later + if self.room_id == "JupyterLab:globalAwareness": + self.close(1011) + return + # Create the YRoom yroom = self.yroom_manager.get_room(self.room_id) if not yroom: @@ -46,9 +69,18 @@ def open(self): def on_message(self, message: bytes): + # TODO: remove this later + if self.room_id == "JupyterLab:globalAwareness": + return + # Route all messages to the YRoom for processing self.yroom.add_message(self.client_id, message) def on_close(self): + # TODO: remove this later + if self.room_id == "JupyterLab:globalAwareness": + return + + self.log.info(f"Closed Websocket to client '{self.client_id}'.") self.yroom.clients.remove(self.client_id)