From db4d69857d25bd8eb1ba7aa754ca33d189c30270 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 08:30:23 -0700 Subject: [PATCH 1/9] add 2nd dev env with JCollab frontend --- devenv-jcollab-frontend.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 devenv-jcollab-frontend.yml diff --git a/devenv-jcollab-frontend.yml b/devenv-jcollab-frontend.yml new file mode 100644 index 0000000..21c78d1 --- /dev/null +++ b/devenv-jcollab-frontend.yml @@ -0,0 +1,11 @@ +name: rtccore-jcollab-frontend +channels: + - conda-forge +dependencies: + - python + - nodejs=22 + - uv + - jupyterlab + - pip: + - jupyter_docprovider>=2.0.2,<3 + - jupyter_collaboration_ui>=2.0.2,<3 From 09b4ee2d209df8e1206187580b4c73540e67414a Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 08:30:49 -0700 Subject: [PATCH 2/9] enable YRoomWebsocket handler --- jupyter_rtc_core/app.py | 2 +- jupyter_rtc_core/websockets/yroom_ws.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/jupyter_rtc_core/app.py b/jupyter_rtc_core/app.py index 57d423b..9b1cae0 100644 --- a/jupyter_rtc_core/app.py +++ b/jupyter_rtc_core/app.py @@ -20,7 +20,7 @@ class RtcExtensionApp(ExtensionApp): # global awareness websocket # (r"api/collaboration/room/JupyterLab:globalAwareness/?", GlobalAwarenessWebsocket), # # ydoc websocket - # (r"api/collaboration/room/(.*)", YRoomWebsocket) + (r"api/collaboration/room/(.*)", YRoomWebsocket) ] yroom_manager_class = Type( diff --git a/jupyter_rtc_core/websockets/yroom_ws.py b/jupyter_rtc_core/websockets/yroom_ws.py index 1cfd528..eb35c6a 100644 --- a/jupyter_rtc_core/websockets/yroom_ws.py +++ b/jupyter_rtc_core/websockets/yroom_ws.py @@ -27,6 +27,10 @@ def prepare(self): request_path: str = self.request.path self.room_id = request_path.strip("/").split("/")[-1] + # TODO: remove this once globalawareness is implemented + if self.room_id == "JupyterLab:globalAwareness": + raise HTTPError(404) + # Verify the file ID contained in the room ID points to a valid file. fileid = self.room_id.split(":")[-1] path = self.fileid_manager.get_path(fileid) From 3e4607be965d1fb1c29d3a7bea56cfd8185c7c36 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 10:01:43 -0700 Subject: [PATCH 3/9] first version receiving Yjs messages --- jupyter_rtc_core/app.py | 38 +++++++++++++++---------- jupyter_rtc_core/handlers.py | 30 +++++++++++++++++++ jupyter_rtc_core/rooms/yroom.py | 15 ++++++---- jupyter_rtc_core/rooms/yroom_manager.py | 9 ++---- jupyter_rtc_core/websockets/__init__.py | 2 +- jupyter_rtc_core/websockets/clients.py | 9 +++--- jupyter_rtc_core/websockets/yroom_ws.py | 25 ++++++++++++++-- 7 files changed, 92 insertions(+), 36 deletions(-) diff --git a/jupyter_rtc_core/app.py b/jupyter_rtc_core/app.py index 9b1cae0..53bd55f 100644 --- a/jupyter_rtc_core/app.py +++ b/jupyter_rtc_core/app.py @@ -2,9 +2,8 @@ from traitlets.config import Config import asyncio -from traitlets import Instance -from traitlets import Type -from .handlers import RouteHandler +from traitlets import Instance, Type +from .handlers import RouteHandler, YRoomSessionHandler from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket from .rooms.yroom_manager import YRoomManager @@ -20,7 +19,9 @@ class RtcExtensionApp(ExtensionApp): # global awareness websocket # (r"api/collaboration/room/JupyterLab:globalAwareness/?", GlobalAwarenessWebsocket), # # ydoc websocket - (r"api/collaboration/room/(.*)", YRoomWebsocket) + (r"api/collaboration/room/(.*)", YRoomWebsocket), + # handler that just adds compatibility with Jupyter Collaboration's frontend + (r"api/collaboration/session/(.*)", YRoomSessionHandler) ] yroom_manager_class = Type( @@ -44,18 +45,25 @@ def initialize_settings(self): # We cannot access the 'file_id_manager' key immediately because server # extensions initialize in alphabetical order. 'jupyter_rtc_core' < # 'jupyter_server_fileid'. - get_fileid_manager = lambda: self.settings["file_id_manager"] - contents_manager = self.serverapp.contents_manager - loop = asyncio.get_event_loop_policy().get_event_loop() - log = self.log + # def get_fileid_manager(): + # self.log.info("IN GETTER") + # for k, v in self.settings.items(): + # print(f"{k}: {v}") + # print(len(self.settings.items())) + # print(id(self.settings)) + # return self.settings["file_id_manager"] + # contents_manager = self.serverapp.contents_manager + # loop = asyncio.get_event_loop_policy().get_event_loop() + # log = self.log - # Initialize YRoomManager - self.settings["yroom_manager"] = YRoomManager( - get_fileid_manager=get_fileid_manager, - contents_manager=contents_manager, - loop=loop, - log=log - ) + # # Initialize YRoomManager + # self.settings["yroom_manager"] = YRoomManager( + # get_fileid_manager=get_fileid_manager, + # contents_manager=contents_manager, + # loop=loop, + # log=log + # ) + pass def _link_jupyter_server_extension(self, server_app): diff --git a/jupyter_rtc_core/handlers.py b/jupyter_rtc_core/handlers.py index 61106db..f3cd186 100644 --- a/jupyter_rtc_core/handlers.py +++ b/jupyter_rtc_core/handlers.py @@ -1,4 +1,5 @@ import json +import uuid from jupyter_server.base.handlers import APIHandler import tornado @@ -14,4 +15,33 @@ def get(self): })) +# TODO: remove this by v1.0.0 if deemed unnecessary. Just adding this for +# compatibility with the `jupyter_collaboration` frontend. +class YRoomSessionHandler(APIHandler): + SESSION_ID = str(uuid.uuid4()) + + @tornado.web.authenticated + def put(self, path): + body = json.loads(self.request.body) + format = body["format"] + content_type = body["type"] + # self.log.info("IN HANDLER") + # for k, v in self.settings.items(): + # print(f"{k}: {v}") + # print(len(self.settings.items())) + # print(id(self.settings)) + + file_id_manager = self.settings["file_id_manager"] + file_id = file_id_manager.index(path) + + data = json.dumps( + { + "format": format, + "type": content_type, + "fileId": file_id, + "sessionId": self.SESSION_ID, + } + ) + self.set_status(200) + self.finish(data) diff --git a/jupyter_rtc_core/rooms/yroom.py b/jupyter_rtc_core/rooms/yroom.py index d7b1472..91a6cff 100644 --- a/jupyter_rtc_core/rooms/yroom.py +++ b/jupyter_rtc_core/rooms/yroom.py @@ -12,7 +12,7 @@ from .yroom_file_api import YRoomFileAPI if TYPE_CHECKING: - from typing import Literal, Tuple + from typing import Literal, Tuple, Any from jupyter_server_fileid.manager import BaseFileIdManager from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager @@ -22,7 +22,11 @@ class YRoom: log: Logger """Log object""" room_id: str - """Room Id""" + """ + The ID of the room. This is a composite ID following the format: + + room_id := "{file_type}:{file_format} + """ _jupyter_ydoc: YBaseDoc """JupyterYDoc""" @@ -60,9 +64,10 @@ def __init__( self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop) self._ydoc = pycrdt.Doc() self._awareness = pycrdt.Awareness(ydoc=self._ydoc) + _, file_type, _ = self.room_id.split(":") JupyterYDocClass = cast( type[YBaseDoc], - jupyter_ydoc_classes.get(self.file_type, jupyter_ydoc_classes["file"]) + jupyter_ydoc_classes.get(file_type, jupyter_ydoc_classes["file"]) ) self.jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness) @@ -307,11 +312,11 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None def handle_awareness_update(self, client_id: str, message: bytes) -> None: # Apply the AwarenessUpdate message try: - message_payload = message[1:] + message_payload = pycrdt.read_message(message[1:]) self._awareness.apply_awareness_update(message_payload, origin=self) except Exception as e: self.log.error( - "An exception occurred when applying an AwarenessUpdate" + "An exception occurred when applying an AwarenessUpdate " f"message from client '{client_id}':" ) self.log.exception(e) diff --git a/jupyter_rtc_core/rooms/yroom_manager.py b/jupyter_rtc_core/rooms/yroom_manager.py index 2813734..705a79d 100644 --- a/jupyter_rtc_core/rooms/yroom_manager.py +++ b/jupyter_rtc_core/rooms/yroom_manager.py @@ -18,13 +18,13 @@ class YRoomManager(): def __init__( self, *, - get_fileid_manager: Callable[[], BaseFileIdManager], + fileid_manager: BaseFileIdManager, contents_manager: AsyncContentsManager | ContentsManager, loop: asyncio.AbstractEventLoop, log: logging.Logger, ): # Bind instance attributes - self._get_fileid_manager = get_fileid_manager + self.fileid_manager = fileid_manager self.contents_manager = contents_manager self.loop = loop self.log = log @@ -32,11 +32,6 @@ def __init__( # Initialize dictionary of YRooms, keyed by room ID - @property - def fileid_manager(self) -> BaseFileIdManager: - return self._get_fileid_manager() - - def get_room(self, room_id: str) -> YRoom | None: """ Retrieves a YRoom given a room ID. If the YRoom does not exist, this diff --git a/jupyter_rtc_core/websockets/__init__.py b/jupyter_rtc_core/websockets/__init__.py index 00d8474..76c656a 100644 --- a/jupyter_rtc_core/websockets/__init__.py +++ b/jupyter_rtc_core/websockets/__init__.py @@ -1,3 +1,3 @@ from .global_awareness_ws import GlobalAwarenessWebsocket -from .yroom_ws import YRoomWebsocket from .clients import YjsClient, YjsClientGroup +from .yroom_ws import YRoomWebsocket diff --git a/jupyter_rtc_core/websockets/clients.py b/jupyter_rtc_core/websockets/clients.py index 8599dad..8151bff 100644 --- a/jupyter_rtc_core/websockets/clients.py +++ b/jupyter_rtc_core/websockets/clients.py @@ -5,8 +5,7 @@ """ from __future__ import annotations -from datetime import timedelta, timezone -import datetime +from datetime import timedelta, timezone, datetime from logging import Logger from typing import TYPE_CHECKING import uuid @@ -43,7 +42,7 @@ def synced(self): @synced.setter def synced(self, v: bool): - self.synced = v + self._synced = v self.last_modified = datetime.now(timezone.utc) class YjsClientGroup: @@ -67,7 +66,7 @@ class YjsClientGroup: """Log object""" loop: asyncio.AbstractEventLoop """Event loop""" - poll_interval_seconds: int + _poll_interval_seconds: int """The poll time interval used while auto removing desynced clients""" desynced_timeout_seconds: int """The max time period in seconds that a desynced client does not become synced before get auto removed from desynced dict""" @@ -79,7 +78,7 @@ def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop self.log = log self.loop = loop self.loop.create_task(self._clean_desynced()) - self.poll_interval_seconds = poll_interval_seconds + self._poll_interval_seconds = poll_interval_seconds self.desynced_timeout_seconds = desynced_timeout_seconds def add(self, websocket: WebSocketHandler) -> str: diff --git a/jupyter_rtc_core/websockets/yroom_ws.py b/jupyter_rtc_core/websockets/yroom_ws.py index eb35c6a..b5e8da9 100644 --- a/jupyter_rtc_core/websockets/yroom_ws.py +++ b/jupyter_rtc_core/websockets/yroom_ws.py @@ -2,10 +2,14 @@ from tornado.httpclient import HTTPError from tornado.websocket import WebSocketHandler from typing import TYPE_CHECKING +import asyncio +from ..rooms import YRoomManager +import logging if TYPE_CHECKING: from jupyter_server_fileid.manager import BaseFileIdManager - from ..rooms import YRoom, YRoomManager + from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager + from ..rooms import YRoom class YRoomWebsocket(WebSocketHandler): yroom: YRoom @@ -14,12 +18,26 @@ class YRoomWebsocket(WebSocketHandler): @property def yroom_manager(self) -> YRoomManager: + if "yroom_manager" not in self.settings: + self.settings["yroom_manager"] = YRoomManager( + fileid_manager=self.fileid_manager, + contents_manager=self.contents_manager, + loop=asyncio.get_event_loop_policy().get_event_loop(), + # TODO: change this. we should pass `self.log` from our + # `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix + log=logging.Logger("TEMP") + + ) return self.settings["yroom_manager"] - @property def fileid_manager(self) -> BaseFileIdManager: return self.settings["file_id_manager"] + + + @property + def contents_manager(self) -> AsyncContentsManager | ContentsManager: + return self.settings["contents_manager"] def prepare(self): @@ -38,7 +56,7 @@ def prepare(self): raise HTTPError(404, f"No file with ID '{fileid}'.") - def open(self): + def open(self, *_, **__): # Create the YRoom yroom = self.yroom_manager.get_room(self.room_id) if not yroom: @@ -51,6 +69,7 @@ def open(self): def on_message(self, message: bytes): # Route all messages to the YRoom for processing + print(message) self.yroom.add_message(self.client_id, message) From 85c965a989bf9b11aa0c2f1d452f3137c6de4607 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 14:22:49 -0700 Subject: [PATCH 4/9] fix Y protocol implementation, set binary=True when writing to WS handler --- jupyter_rtc_core/app.py | 29 ++++++++++-------------- jupyter_rtc_core/rooms/yroom.py | 23 +++++++++++++++---- jupyter_rtc_core/rooms/yroom_file_api.py | 6 +++++ jupyter_rtc_core/rooms/yroom_manager.py | 11 ++++++--- jupyter_rtc_core/websockets/clients.py | 2 +- jupyter_rtc_core/websockets/yroom_ws.py | 17 +++++--------- 6 files changed, 51 insertions(+), 37 deletions(-) diff --git a/jupyter_rtc_core/app.py b/jupyter_rtc_core/app.py index 53bd55f..0b8f46b 100644 --- a/jupyter_rtc_core/app.py +++ b/jupyter_rtc_core/app.py @@ -45,24 +45,19 @@ def initialize_settings(self): # We cannot access the 'file_id_manager' key immediately because server # extensions initialize in alphabetical order. 'jupyter_rtc_core' < # 'jupyter_server_fileid'. - # def get_fileid_manager(): - # self.log.info("IN GETTER") - # for k, v in self.settings.items(): - # print(f"{k}: {v}") - # print(len(self.settings.items())) - # print(id(self.settings)) - # return self.settings["file_id_manager"] - # contents_manager = self.serverapp.contents_manager - # loop = asyncio.get_event_loop_policy().get_event_loop() - # log = self.log + def get_fileid_manager(): + return self.serverapp.web_app.settings["file_id_manager"] + contents_manager = self.serverapp.contents_manager + loop = asyncio.get_event_loop_policy().get_event_loop() + log = self.log - # # Initialize YRoomManager - # self.settings["yroom_manager"] = YRoomManager( - # get_fileid_manager=get_fileid_manager, - # contents_manager=contents_manager, - # loop=loop, - # log=log - # ) + # Initialize YRoomManager + self.settings["yroom_manager"] = YRoomManager( + get_fileid_manager=get_fileid_manager, + contents_manager=contents_manager, + loop=loop, + log=log + ) pass diff --git a/jupyter_rtc_core/rooms/yroom.py b/jupyter_rtc_core/rooms/yroom.py index 91a6cff..f37efb9 100644 --- a/jupyter_rtc_core/rooms/yroom.py +++ b/jupyter_rtc_core/rooms/yroom.py @@ -25,7 +25,7 @@ class YRoom: """ The ID of the room. This is a composite ID following the format: - room_id := "{file_type}:{file_format} + room_id := "{file_type}:{file_format}:{file_id}" """ _jupyter_ydoc: YBaseDoc @@ -95,6 +95,9 @@ def __init__( # messages in the message queue to the appropriate handler method. self._message_queue = asyncio.Queue() self._loop.create_task(self._on_new_message()) + + # Log notification that room is ready + self.log.info(f"Room '{self.room_id}' initialized.") @property @@ -156,13 +159,15 @@ async def _on_new_message(self) -> None: while True: try: client_id, message = await self._message_queue.get() + self.log.info(f"HANDLING NEW MESSAGE FROM '{client_id}'") + self.log.info(f"Message: {message}") except asyncio.QueueShutDown: break # Handle Awareness messages message_type = message[0] if message_type == YMessageType.AWARENESS: - self.handle_awareness_update(client_id, message[1:]) + self.handle_awareness_update(client_id, message) continue # Handle Sync messages @@ -196,6 +201,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: - Sending the reply to the client over WS, and - Sending a new SyncStep1 message immediately after. """ + self.log.info(f"Handling SS1 message from client '{client_id}'.") + # Mark client as desynced new_client = self.clients.get(client_id) self.clients.mark_desynced(client_id) @@ -218,7 +225,7 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: try: # TODO: remove the assert once websocket is made required assert isinstance(new_client.websocket, WebSocketHandler) - new_client.websocket.write_message(sync_step2_message) + new_client.websocket.write_message(sync_step2_message, binary=True) except Exception as e: self.log.error( "An exception occurred when writing the SyncStep2 reply " @@ -228,18 +235,21 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: return self.clients.mark_synced(client_id) + self.log.info(f"Sent SS2 reply to client '{client_id}'.") # Send SyncStep1 message try: assert isinstance(new_client.websocket, WebSocketHandler) sync_step1_message = pycrdt.create_sync_message(self._ydoc) - new_client.websocket.write_message(sync_step1_message) + new_client.websocket.write_message(sync_step1_message, binary=True) except Exception as e: self.log.error( "An exception occurred when writing a SyncStep1 message " f"to newly-synced client '{new_client.id}':" ) self.log.exception(e) + self.log.info(f"Sent SS1 message to client '{client_id}'.") + self.log.info(f"Message: {sync_step1_message}") def handle_sync_step2(self, client_id: str, message: bytes) -> None: @@ -251,6 +261,7 @@ def handle_sync_step2(self, client_id: str, message: bytes) -> None: clients after this method is called via the `self.write_sync_update()` observer. """ + self.log.info("HANDLING SS2 MESSAGE") try: message_payload = message[1:] pycrdt.handle_sync_message(message_payload, self._ydoc) @@ -271,6 +282,7 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None: clients after this method is called via the `self.write_sync_update()` observer. """ + self.log.info("HANDLING SYNCUPDATE") # Remove client and kill websocket if received SyncUpdate when client is desynced if self._should_ignore_update(client_id, "SyncUpdate"): self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self.room_id}'") @@ -310,6 +322,7 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None def handle_awareness_update(self, client_id: str, message: bytes) -> None: + self.log.info("HANDLING AWARENESS UPDATE") # Apply the AwarenessUpdate message try: message_payload = pycrdt.read_message(message[1:]) @@ -356,7 +369,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd try: # TODO: remove this assertion once websocket is made required assert isinstance(client.websocket, WebSocketHandler) - client.websocket.write_message(message) + client.websocket.write_message(message, binary=True) except Exception as e: self.log.warning( f"An exception occurred when broadcasting a " diff --git a/jupyter_rtc_core/rooms/yroom_file_api.py b/jupyter_rtc_core/rooms/yroom_file_api.py index 0bf547d..df960c1 100644 --- a/jupyter_rtc_core/rooms/yroom_file_api.py +++ b/jupyter_rtc_core/rooms/yroom_file_api.py @@ -33,6 +33,7 @@ class YRoomFileAPI: # See `filemanager.py` in `jupyter_server` for references on supported file # formats & file types. + room_id: str file_format: Literal["text", "base64"] file_type: Literal["file", "notebook"] file_id: str @@ -57,6 +58,7 @@ def __init__( loop: asyncio.AbstractEventLoop ): # Bind instance attributes + self.room_id = room_id self.file_format, self.file_type, self.file_id = room_id.split(":") self.jupyter_ydoc = jupyter_ydoc self.log = log @@ -113,6 +115,8 @@ def load_ydoc_content(self) -> None: # Otherwise, set loading to `True` and start the loading task. if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading: return + + self.log.info(f"Loading content for room ID '{self.room_id}'.") self._ydoc_content_loading = True self._loop.create_task(self._load_ydoc_content()) @@ -134,6 +138,8 @@ async def _load_ydoc_content(self) -> None: # Also set loading to `False` for consistency self._ydoc_content_loaded.set() self._ydoc_content_loading = False + self.log.info(f"Loaded content for room ID '{self.room_id}'.") + self.log.info(f"Content: {self.jupyter_ydoc.source}") def schedule_save(self) -> None: diff --git a/jupyter_rtc_core/rooms/yroom_manager.py b/jupyter_rtc_core/rooms/yroom_manager.py index 705a79d..c9f243e 100644 --- a/jupyter_rtc_core/rooms/yroom_manager.py +++ b/jupyter_rtc_core/rooms/yroom_manager.py @@ -8,7 +8,6 @@ if TYPE_CHECKING: import asyncio import logging - from typing import Callable from jupyter_server_fileid.manager import BaseFileIdManager from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager @@ -18,13 +17,13 @@ class YRoomManager(): def __init__( self, *, - fileid_manager: BaseFileIdManager, + get_fileid_manager: callable[[], BaseFileIdManager], contents_manager: AsyncContentsManager | ContentsManager, loop: asyncio.AbstractEventLoop, log: logging.Logger, ): # Bind instance attributes - self.fileid_manager = fileid_manager + self._get_fileid_manager = get_fileid_manager self.contents_manager = contents_manager self.loop = loop self.log = log @@ -32,6 +31,11 @@ def __init__( # Initialize dictionary of YRooms, keyed by room ID + @property + def fileid_manager(self) -> BaseFileIdManager: + return self._get_fileid_manager() + + def get_room(self, room_id: str) -> YRoom | None: """ Retrieves a YRoom given a room ID. If the YRoom does not exist, this @@ -44,6 +48,7 @@ def get_room(self, room_id: str) -> YRoom | None: # Otherwise, create a new room try: + self.log.info(f"Initializing room '{room_id}'.") yroom = YRoom( room_id=room_id, log=self.log, diff --git a/jupyter_rtc_core/websockets/clients.py b/jupyter_rtc_core/websockets/clients.py index 8151bff..a4f2735 100644 --- a/jupyter_rtc_core/websockets/clients.py +++ b/jupyter_rtc_core/websockets/clients.py @@ -77,7 +77,7 @@ def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop self.desynced: dict[str, YjsClient] = {} self.log = log self.loop = loop - self.loop.create_task(self._clean_desynced()) + # self.loop.create_task(self._clean_desynced()) self._poll_interval_seconds = poll_interval_seconds self.desynced_timeout_seconds = desynced_timeout_seconds diff --git a/jupyter_rtc_core/websockets/yroom_ws.py b/jupyter_rtc_core/websockets/yroom_ws.py index b5e8da9..0981760 100644 --- a/jupyter_rtc_core/websockets/yroom_ws.py +++ b/jupyter_rtc_core/websockets/yroom_ws.py @@ -15,21 +15,16 @@ class YRoomWebsocket(WebSocketHandler): yroom: YRoom room_id: str client_id: str + # TODO: change this. we should pass `self.log` from our + # `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix + log = logging.Logger("TEMP") + @property def yroom_manager(self) -> YRoomManager: - if "yroom_manager" not in self.settings: - self.settings["yroom_manager"] = YRoomManager( - fileid_manager=self.fileid_manager, - contents_manager=self.contents_manager, - loop=asyncio.get_event_loop_policy().get_event_loop(), - # TODO: change this. we should pass `self.log` from our - # `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix - log=logging.Logger("TEMP") - - ) return self.settings["yroom_manager"] + @property def fileid_manager(self) -> BaseFileIdManager: return self.settings["file_id_manager"] @@ -69,9 +64,9 @@ def open(self, *_, **__): def on_message(self, message: bytes): # Route all messages to the YRoom for processing - print(message) self.yroom.add_message(self.client_id, message) def on_close(self): + self.log.info("WEBSOCKET CLOSED") self.yroom.clients.remove(self.client_id) From 53fd9a0e17feb6fd6cf2ca953399e4c83705fc0a Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 14:23:01 -0700 Subject: [PATCH 5/9] add untitled files to gitignore --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 5c6e90d..e7e076d 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,7 @@ dmypy.json # Yarn cache .yarn/ + +# Ignore files used to test in real-time +untitled* +Untitled* From 5aeb056f84b4749a2252ea548be2144e5e9ae768 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 16:00:50 -0700 Subject: [PATCH 6/9] implement file auto-save --- jupyter_rtc_core/rooms/yroom.py | 7 +++---- jupyter_rtc_core/rooms/yroom_file_api.py | 5 +++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/jupyter_rtc_core/rooms/yroom.py b/jupyter_rtc_core/rooms/yroom.py index f37efb9..b6c4501 100644 --- a/jupyter_rtc_core/rooms/yroom.py +++ b/jupyter_rtc_core/rooms/yroom.py @@ -312,13 +312,12 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None This method can also be called manually. """ - # Broadcast the message: + # Broadcast the message message = pycrdt.create_update_message(message_payload) self._broadcast_message(message, message_type="SyncUpdate") - # Save the file to disk. - # TODO: requires YRoomLoader implementation - return + # Save the file to disk + self.file_api.schedule_save() def handle_awareness_update(self, client_id: str, message: bytes) -> None: diff --git a/jupyter_rtc_core/rooms/yroom_file_api.py b/jupyter_rtc_core/rooms/yroom_file_api.py index df960c1..a9ce8c0 100644 --- a/jupyter_rtc_core/rooms/yroom_file_api.py +++ b/jupyter_rtc_core/rooms/yroom_file_api.py @@ -178,6 +178,7 @@ async def _process_scheduled_saves(self) -> None: file_format = self.file_format file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file" + # Save the YDoc via the ContentsManager await ensure_async(self._contents_manager.save( { "format": file_format, @@ -186,6 +187,10 @@ async def _process_scheduled_saves(self) -> None: }, path )) + + # Mark 'dirty' as `False`. This hides the "unsaved changes" icon + # in the JupyterLab tab rendering this YDoc in the frontend. + self.jupyter_ydoc.dirty = False except Exception as e: self.log.error("An exception occurred when saving JupyterYDoc.") self.log.exception(e) From 18d01c75d268d164ddd9aca6e7183622582cc81c Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 16:04:11 -0700 Subject: [PATCH 7/9] clean up logging statements --- jupyter_rtc_core/rooms/yroom.py | 20 ++++++++++---------- jupyter_rtc_core/rooms/yroom_file_api.py | 1 - 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/jupyter_rtc_core/rooms/yroom.py b/jupyter_rtc_core/rooms/yroom.py index b6c4501..90b260a 100644 --- a/jupyter_rtc_core/rooms/yroom.py +++ b/jupyter_rtc_core/rooms/yroom.py @@ -159,28 +159,34 @@ async def _on_new_message(self) -> None: while True: try: client_id, message = await self._message_queue.get() - self.log.info(f"HANDLING NEW MESSAGE FROM '{client_id}'") - self.log.info(f"Message: {message}") except asyncio.QueueShutDown: break # Handle Awareness messages message_type = message[0] if message_type == YMessageType.AWARENESS: + self.log.debug(f"Received AwarenessUpdate from '{client_id}'.") self.handle_awareness_update(client_id, message) + self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.") continue # Handle Sync messages assert message_type == YMessageType.SYNC message_subtype = message[1] if len(message) >= 2 else None if message_subtype == YSyncMessageSubtype.SYNC_STEP1: + self.log.info(f"Received SS1 from '{client_id}'.") self.handle_sync_step1(client_id, message) + self.log.info(f"Handled SS1 from '{client_id}'.") continue elif message_subtype == YSyncMessageSubtype.SYNC_STEP2: + self.log.info(f"Received SS2 from '{client_id}'.") self.handle_sync_step2(client_id, message) + self.log.info(f"Handled SS2 from '{client_id}'.") continue elif message_subtype == YSyncMessageSubtype.SYNC_UPDATE: + self.log.info(f"Received SyncUpdate from '{client_id}'.") self.handle_sync_update(client_id, message) + self.log.info(f"Handled SyncUpdate from '{client_id}'.") continue else: self.log.warning( @@ -201,8 +207,6 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: - Sending the reply to the client over WS, and - Sending a new SyncStep1 message immediately after. """ - self.log.info(f"Handling SS1 message from client '{client_id}'.") - # Mark client as desynced new_client = self.clients.get(client_id) self.clients.mark_desynced(client_id) @@ -226,6 +230,7 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: # TODO: remove the assert once websocket is made required assert isinstance(new_client.websocket, WebSocketHandler) new_client.websocket.write_message(sync_step2_message, binary=True) + self.log.info(f"Sent SS2 reply to client '{client_id}'.") except Exception as e: self.log.error( "An exception occurred when writing the SyncStep2 reply " @@ -235,21 +240,19 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None: return self.clients.mark_synced(client_id) - self.log.info(f"Sent SS2 reply to client '{client_id}'.") # Send SyncStep1 message try: assert isinstance(new_client.websocket, WebSocketHandler) sync_step1_message = pycrdt.create_sync_message(self._ydoc) new_client.websocket.write_message(sync_step1_message, binary=True) + self.log.info(f"Sent SS1 message to client '{client_id}'.") except Exception as e: self.log.error( "An exception occurred when writing a SyncStep1 message " f"to newly-synced client '{new_client.id}':" ) self.log.exception(e) - self.log.info(f"Sent SS1 message to client '{client_id}'.") - self.log.info(f"Message: {sync_step1_message}") def handle_sync_step2(self, client_id: str, message: bytes) -> None: @@ -261,7 +264,6 @@ def handle_sync_step2(self, client_id: str, message: bytes) -> None: clients after this method is called via the `self.write_sync_update()` observer. """ - self.log.info("HANDLING SS2 MESSAGE") try: message_payload = message[1:] pycrdt.handle_sync_message(message_payload, self._ydoc) @@ -282,7 +284,6 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None: clients after this method is called via the `self.write_sync_update()` observer. """ - self.log.info("HANDLING SYNCUPDATE") # Remove client and kill websocket if received SyncUpdate when client is desynced if self._should_ignore_update(client_id, "SyncUpdate"): self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self.room_id}'") @@ -321,7 +322,6 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None def handle_awareness_update(self, client_id: str, message: bytes) -> None: - self.log.info("HANDLING AWARENESS UPDATE") # Apply the AwarenessUpdate message try: message_payload = pycrdt.read_message(message[1:]) diff --git a/jupyter_rtc_core/rooms/yroom_file_api.py b/jupyter_rtc_core/rooms/yroom_file_api.py index a9ce8c0..c05e959 100644 --- a/jupyter_rtc_core/rooms/yroom_file_api.py +++ b/jupyter_rtc_core/rooms/yroom_file_api.py @@ -139,7 +139,6 @@ async def _load_ydoc_content(self) -> None: self._ydoc_content_loaded.set() self._ydoc_content_loading = False self.log.info(f"Loaded content for room ID '{self.room_id}'.") - self.log.info(f"Content: {self.jupyter_ydoc.source}") def schedule_save(self) -> None: From 7b302cef37192dca265ee335df0657c9009fcc4f Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 16:16:36 -0700 Subject: [PATCH 8/9] reject globalAwareness connections gracefully, fixes latency --- jupyter_rtc_core/websockets/yroom_ws.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/jupyter_rtc_core/websockets/yroom_ws.py b/jupyter_rtc_core/websockets/yroom_ws.py index 0981760..c91ba19 100644 --- a/jupyter_rtc_core/websockets/yroom_ws.py +++ b/jupyter_rtc_core/websockets/yroom_ws.py @@ -42,7 +42,8 @@ def prepare(self): # TODO: remove this once globalawareness is implemented if self.room_id == "JupyterLab:globalAwareness": - raise HTTPError(404) + self.close(1011) + return # Verify the file ID contained in the room ID points to a valid file. fileid = self.room_id.split(":")[-1] @@ -52,6 +53,11 @@ def prepare(self): def open(self, *_, **__): + # TODO: remove this later + if self.room_id == "JupyterLab:globalAwareness": + self.close(1011) + return + # Create the YRoom yroom = self.yroom_manager.get_room(self.room_id) if not yroom: @@ -63,10 +69,18 @@ def open(self, *_, **__): def on_message(self, message: bytes): + # TODO: remove this later + if self.room_id == "JupyterLab:globalAwareness": + return + # Route all messages to the YRoom for processing self.yroom.add_message(self.client_id, message) def on_close(self): - self.log.info("WEBSOCKET CLOSED") + # TODO: remove this later + if self.room_id == "JupyterLab:globalAwareness": + return + + self.log.info(f"Closed Websocket to client '{self.client_id}'.") self.yroom.clients.remove(self.client_id) From a0851a63f9a7b81d461f49e45d2c5720ef6f4e9f Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Tue, 13 May 2025 16:18:47 -0700 Subject: [PATCH 9/9] fix bug introduced by merge resolution --- jupyter_rtc_core/rooms/yroom_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_rtc_core/rooms/yroom_manager.py b/jupyter_rtc_core/rooms/yroom_manager.py index c9f243e..91dbba4 100644 --- a/jupyter_rtc_core/rooms/yroom_manager.py +++ b/jupyter_rtc_core/rooms/yroom_manager.py @@ -1,6 +1,6 @@ +from __future__ import annotations from typing import Any, Dict, Optional from traitlets import HasTraits, Instance, default -from __future__ import annotations from .yroom import YRoom from typing import TYPE_CHECKING