Skip to content

Commit f584002

Browse files
authored
Reach functional backend using jupyter_collaboration frontend (#29)
* add 2nd dev env with JCollab frontend * enable YRoomWebsocket handler * first version receiving Yjs messages * fix Y protocol implementation, set binary=True when writing to WS handler * add untitled files to gitignore * implement file auto-save * clean up logging statements * reject globalAwareness connections gracefully, fixes latency * fix bug introduced by merge resolution
1 parent 04c7852 commit f584002

File tree

10 files changed

+138
-32
lines changed

10 files changed

+138
-32
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,7 @@ dmypy.json
123123

124124
# Yarn cache
125125
.yarn/
126+
127+
# Ignore files used to test in real-time
128+
untitled*
129+
Untitled*

devenv-jcollab-frontend.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
name: rtccore-jcollab-frontend
2+
channels:
3+
- conda-forge
4+
dependencies:
5+
- python
6+
- nodejs=22
7+
- uv
8+
- jupyterlab
9+
- pip:
10+
- jupyter_docprovider>=2.0.2,<3
11+
- jupyter_collaboration_ui>=2.0.2,<3

jupyter_rtc_core/app.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
from traitlets.config import Config
33
import asyncio
44

5-
from traitlets import Instance
6-
from traitlets import Type
7-
from .handlers import RouteHandler
5+
from traitlets import Instance, Type
6+
from .handlers import RouteHandler, YRoomSessionHandler
87
from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket
98
from .rooms.yroom_manager import YRoomManager
109

@@ -20,7 +19,9 @@ class RtcExtensionApp(ExtensionApp):
2019
# global awareness websocket
2120
# (r"api/collaboration/room/JupyterLab:globalAwareness/?", GlobalAwarenessWebsocket),
2221
# # ydoc websocket
23-
# (r"api/collaboration/room/(.*)", YRoomWebsocket)
22+
(r"api/collaboration/room/(.*)", YRoomWebsocket),
23+
# handler that just adds compatibility with Jupyter Collaboration's frontend
24+
(r"api/collaboration/session/(.*)", YRoomSessionHandler)
2425
]
2526

2627
yroom_manager_class = Type(
@@ -44,7 +45,8 @@ def initialize_settings(self):
4445
# We cannot access the 'file_id_manager' key immediately because server
4546
# extensions initialize in alphabetical order. 'jupyter_rtc_core' <
4647
# 'jupyter_server_fileid'.
47-
get_fileid_manager = lambda: self.settings["file_id_manager"]
48+
def get_fileid_manager():
49+
return self.serverapp.web_app.settings["file_id_manager"]
4850
contents_manager = self.serverapp.contents_manager
4951
loop = asyncio.get_event_loop_policy().get_event_loop()
5052
log = self.log
@@ -56,6 +58,7 @@ def initialize_settings(self):
5658
loop=loop,
5759
log=log
5860
)
61+
pass
5962

6063

6164
def _link_jupyter_server_extension(self, server_app):

jupyter_rtc_core/handlers.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import uuid
23

34
from jupyter_server.base.handlers import APIHandler
45
import tornado
@@ -14,4 +15,33 @@ def get(self):
1415
}))
1516

1617

18+
# TODO: remove this by v1.0.0 if deemed unnecessary. Just adding this for
19+
# compatibility with the `jupyter_collaboration` frontend.
20+
class YRoomSessionHandler(APIHandler):
21+
SESSION_ID = str(uuid.uuid4())
22+
23+
@tornado.web.authenticated
24+
def put(self, path):
25+
body = json.loads(self.request.body)
26+
format = body["format"]
27+
content_type = body["type"]
28+
# self.log.info("IN HANDLER")
29+
# for k, v in self.settings.items():
30+
# print(f"{k}: {v}")
31+
# print(len(self.settings.items()))
32+
# print(id(self.settings))
33+
34+
file_id_manager = self.settings["file_id_manager"]
35+
file_id = file_id_manager.index(path)
36+
37+
data = json.dumps(
38+
{
39+
"format": format,
40+
"type": content_type,
41+
"fileId": file_id,
42+
"sessionId": self.SESSION_ID,
43+
}
44+
)
45+
self.set_status(200)
46+
self.finish(data)
1747

jupyter_rtc_core/rooms/yroom.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from .yroom_file_api import YRoomFileAPI
1313

1414
if TYPE_CHECKING:
15-
from typing import Literal, Tuple
15+
from typing import Literal, Tuple, Any
1616
from jupyter_server_fileid.manager import BaseFileIdManager
1717
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
1818

@@ -22,7 +22,11 @@ class YRoom:
2222
log: Logger
2323
"""Log object"""
2424
room_id: str
25-
"""Room Id"""
25+
"""
26+
The ID of the room. This is a composite ID following the format:
27+
28+
room_id := "{file_type}:{file_format}:{file_id}"
29+
"""
2630

2731
_jupyter_ydoc: YBaseDoc
2832
"""JupyterYDoc"""
@@ -60,9 +64,10 @@ def __init__(
6064
self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop)
6165
self._ydoc = pycrdt.Doc()
6266
self._awareness = pycrdt.Awareness(ydoc=self._ydoc)
67+
_, file_type, _ = self.room_id.split(":")
6368
JupyterYDocClass = cast(
6469
type[YBaseDoc],
65-
jupyter_ydoc_classes.get(self.file_type, jupyter_ydoc_classes["file"])
70+
jupyter_ydoc_classes.get(file_type, jupyter_ydoc_classes["file"])
6671
)
6772
self.jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness)
6873

@@ -90,6 +95,9 @@ def __init__(
9095
# messages in the message queue to the appropriate handler method.
9196
self._message_queue = asyncio.Queue()
9297
self._loop.create_task(self._on_new_message())
98+
99+
# Log notification that room is ready
100+
self.log.info(f"Room '{self.room_id}' initialized.")
93101

94102

95103
@property
@@ -157,20 +165,28 @@ async def _on_new_message(self) -> None:
157165
# Handle Awareness messages
158166
message_type = message[0]
159167
if message_type == YMessageType.AWARENESS:
160-
self.handle_awareness_update(client_id, message[1:])
168+
self.log.debug(f"Received AwarenessUpdate from '{client_id}'.")
169+
self.handle_awareness_update(client_id, message)
170+
self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.")
161171
continue
162172

163173
# Handle Sync messages
164174
assert message_type == YMessageType.SYNC
165175
message_subtype = message[1] if len(message) >= 2 else None
166176
if message_subtype == YSyncMessageSubtype.SYNC_STEP1:
177+
self.log.info(f"Received SS1 from '{client_id}'.")
167178
self.handle_sync_step1(client_id, message)
179+
self.log.info(f"Handled SS1 from '{client_id}'.")
168180
continue
169181
elif message_subtype == YSyncMessageSubtype.SYNC_STEP2:
182+
self.log.info(f"Received SS2 from '{client_id}'.")
170183
self.handle_sync_step2(client_id, message)
184+
self.log.info(f"Handled SS2 from '{client_id}'.")
171185
continue
172186
elif message_subtype == YSyncMessageSubtype.SYNC_UPDATE:
187+
self.log.info(f"Received SyncUpdate from '{client_id}'.")
173188
self.handle_sync_update(client_id, message)
189+
self.log.info(f"Handled SyncUpdate from '{client_id}'.")
174190
continue
175191
else:
176192
self.log.warning(
@@ -213,7 +229,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
213229
try:
214230
# TODO: remove the assert once websocket is made required
215231
assert isinstance(new_client.websocket, WebSocketHandler)
216-
new_client.websocket.write_message(sync_step2_message)
232+
new_client.websocket.write_message(sync_step2_message, binary=True)
233+
self.log.info(f"Sent SS2 reply to client '{client_id}'.")
217234
except Exception as e:
218235
self.log.error(
219236
"An exception occurred when writing the SyncStep2 reply "
@@ -228,7 +245,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
228245
try:
229246
assert isinstance(new_client.websocket, WebSocketHandler)
230247
sync_step1_message = pycrdt.create_sync_message(self._ydoc)
231-
new_client.websocket.write_message(sync_step1_message)
248+
new_client.websocket.write_message(sync_step1_message, binary=True)
249+
self.log.info(f"Sent SS1 message to client '{client_id}'.")
232250
except Exception as e:
233251
self.log.error(
234252
"An exception occurred when writing a SyncStep1 message "
@@ -295,23 +313,22 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None
295313
296314
This method can also be called manually.
297315
"""
298-
# Broadcast the message:
316+
# Broadcast the message
299317
message = pycrdt.create_update_message(message_payload)
300318
self._broadcast_message(message, message_type="SyncUpdate")
301319

302-
# Save the file to disk.
303-
# TODO: requires YRoomLoader implementation
304-
return
320+
# Save the file to disk
321+
self.file_api.schedule_save()
305322

306323

307324
def handle_awareness_update(self, client_id: str, message: bytes) -> None:
308325
# Apply the AwarenessUpdate message
309326
try:
310-
message_payload = message[1:]
327+
message_payload = pycrdt.read_message(message[1:])
311328
self._awareness.apply_awareness_update(message_payload, origin=self)
312329
except Exception as e:
313330
self.log.error(
314-
"An exception occurred when applying an AwarenessUpdate"
331+
"An exception occurred when applying an AwarenessUpdate "
315332
f"message from client '{client_id}':"
316333
)
317334
self.log.exception(e)
@@ -351,7 +368,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
351368
try:
352369
# TODO: remove this assertion once websocket is made required
353370
assert isinstance(client.websocket, WebSocketHandler)
354-
client.websocket.write_message(message)
371+
client.websocket.write_message(message, binary=True)
355372
except Exception as e:
356373
self.log.warning(
357374
f"An exception occurred when broadcasting a "

jupyter_rtc_core/rooms/yroom_file_api.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class YRoomFileAPI:
3333

3434
# See `filemanager.py` in `jupyter_server` for references on supported file
3535
# formats & file types.
36+
room_id: str
3637
file_format: Literal["text", "base64"]
3738
file_type: Literal["file", "notebook"]
3839
file_id: str
@@ -57,6 +58,7 @@ def __init__(
5758
loop: asyncio.AbstractEventLoop
5859
):
5960
# Bind instance attributes
61+
self.room_id = room_id
6062
self.file_format, self.file_type, self.file_id = room_id.split(":")
6163
self.jupyter_ydoc = jupyter_ydoc
6264
self.log = log
@@ -113,6 +115,8 @@ def load_ydoc_content(self) -> None:
113115
# Otherwise, set loading to `True` and start the loading task.
114116
if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading:
115117
return
118+
119+
self.log.info(f"Loading content for room ID '{self.room_id}'.")
116120
self._ydoc_content_loading = True
117121
self._loop.create_task(self._load_ydoc_content())
118122

@@ -134,6 +138,7 @@ async def _load_ydoc_content(self) -> None:
134138
# Also set loading to `False` for consistency
135139
self._ydoc_content_loaded.set()
136140
self._ydoc_content_loading = False
141+
self.log.info(f"Loaded content for room ID '{self.room_id}'.")
137142

138143

139144
def schedule_save(self) -> None:
@@ -172,6 +177,7 @@ async def _process_scheduled_saves(self) -> None:
172177
file_format = self.file_format
173178
file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file"
174179

180+
# Save the YDoc via the ContentsManager
175181
await ensure_async(self._contents_manager.save(
176182
{
177183
"format": file_format,
@@ -180,6 +186,10 @@ async def _process_scheduled_saves(self) -> None:
180186
},
181187
path
182188
))
189+
190+
# Mark 'dirty' as `False`. This hides the "unsaved changes" icon
191+
# in the JupyterLab tab rendering this YDoc in the frontend.
192+
self.jupyter_ydoc.dirty = False
183193
except Exception as e:
184194
self.log.error("An exception occurred when saving JupyterYDoc.")
185195
self.log.exception(e)

jupyter_rtc_core/rooms/yroom_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1+
from __future__ import annotations
12
from typing import Any, Dict, Optional
23
from traitlets import HasTraits, Instance, default
3-
from __future__ import annotations
44

55
from .yroom import YRoom
66
from typing import TYPE_CHECKING
77

88
if TYPE_CHECKING:
99
import asyncio
1010
import logging
11-
from typing import Callable
1211
from jupyter_server_fileid.manager import BaseFileIdManager
1312
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
1413

@@ -18,7 +17,7 @@ class YRoomManager():
1817
def __init__(
1918
self,
2019
*,
21-
get_fileid_manager: Callable[[], BaseFileIdManager],
20+
get_fileid_manager: callable[[], BaseFileIdManager],
2221
contents_manager: AsyncContentsManager | ContentsManager,
2322
loop: asyncio.AbstractEventLoop,
2423
log: logging.Logger,
@@ -35,7 +34,7 @@ def __init__(
3534
@property
3635
def fileid_manager(self) -> BaseFileIdManager:
3736
return self._get_fileid_manager()
38-
37+
3938

4039
def get_room(self, room_id: str) -> YRoom | None:
4140
"""
@@ -49,6 +48,7 @@ def get_room(self, room_id: str) -> YRoom | None:
4948

5049
# Otherwise, create a new room
5150
try:
51+
self.log.info(f"Initializing room '{room_id}'.")
5252
yroom = YRoom(
5353
room_id=room_id,
5454
log=self.log,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from .global_awareness_ws import GlobalAwarenessWebsocket
2-
from .yroom_ws import YRoomWebsocket
32
from .clients import YjsClient, YjsClientGroup
3+
from .yroom_ws import YRoomWebsocket

jupyter_rtc_core/websockets/clients.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
"""
66

77
from __future__ import annotations
8-
from datetime import timedelta, timezone
9-
import datetime
8+
from datetime import timedelta, timezone, datetime
109
from logging import Logger
1110
from typing import TYPE_CHECKING
1211
import uuid
@@ -43,7 +42,7 @@ def synced(self):
4342

4443
@synced.setter
4544
def synced(self, v: bool):
46-
self.synced = v
45+
self._synced = v
4746
self.last_modified = datetime.now(timezone.utc)
4847

4948
class YjsClientGroup:
@@ -67,7 +66,7 @@ class YjsClientGroup:
6766
"""Log object"""
6867
loop: asyncio.AbstractEventLoop
6968
"""Event loop"""
70-
poll_interval_seconds: int
69+
_poll_interval_seconds: int
7170
"""The poll time interval used while auto removing desynced clients"""
7271
desynced_timeout_seconds: int
7372
"""The max time period in seconds that a desynced client does not become synced before get auto removed from desynced dict"""
@@ -78,8 +77,8 @@ def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop
7877
self.desynced: dict[str, YjsClient] = {}
7978
self.log = log
8079
self.loop = loop
81-
self.loop.create_task(self._clean_desynced())
82-
self.poll_interval_seconds = poll_interval_seconds
80+
# self.loop.create_task(self._clean_desynced())
81+
self._poll_interval_seconds = poll_interval_seconds
8382
self.desynced_timeout_seconds = desynced_timeout_seconds
8483

8584
def add(self, websocket: WebSocketHandler) -> str:

0 commit comments

Comments
 (0)