Skip to content

Commit 4d81264

Browse files
authored
Add basic implementation of YRoom (#17)
* initial WIP implementation of YRoom * add description to server extension * improve YRoom internal documentation * add jupyter_server_fileid to dependencies * add WIP interfaces of other required classes * clean up logging statements * fix mypy errors * replace get_others() with get_all() in YjsClientGroup * add double handshake & YDoc observer * do not raise an exc on SS2 messages
1 parent b2936a8 commit 4d81264

File tree

8 files changed

+488
-3
lines changed

8 files changed

+488
-3
lines changed

jupyter_rtc_core/app.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
class RtcExtensionApp(ExtensionApp):
77
name = "jupyter_rtc_core"
8-
handlers = [
8+
app_name = "Collaboration"
9+
description = "A new implementation of real-time collaboration (RTC) in JupyterLab."
10+
11+
handlers = [ # type:ignore[assignment]
912
# dummy handler that verifies the server extension is installed;
1013
# this can be deleted prior to initial release.
1114
(r"jupyter-rtc-core/get-example/?", RouteHandler),
@@ -14,3 +17,7 @@ class RtcExtensionApp(ExtensionApp):
1417
# ydoc websocket
1518
(r"api/collaboration/room/(.*)", YRoomWebsocket)
1619
]
20+
21+
def initialize(self):
22+
super().initialize()
23+

jupyter_rtc_core/rooms/yroom.py

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
from __future__ import annotations # see PEP-563 for motivation behind this
2+
from typing import TYPE_CHECKING
3+
from logging import Logger
4+
import asyncio
5+
from ..websockets import YjsClientGroup
6+
7+
import pycrdt
8+
from pycrdt import YMessageType, YSyncMessageType as YSyncMessageSubtype
9+
from tornado.websocket import WebSocketHandler
10+
11+
if TYPE_CHECKING:
12+
from typing import Literal, Tuple
13+
14+
class YRoom:
    """
    A room that owns one `pycrdt.Doc`, its `pycrdt.Awareness`, the group of
    connected clients, and a queue of incoming client messages.

    Messages are enqueued with `add_message()` and consumed one at a time by a
    background task (`_on_new_message()`), which routes each message to the
    matching `handle_*` method. An observer registered on the YDoc calls
    `write_sync_update()` for every YDoc update.
    """

    ydoc: pycrdt.Doc
    awareness: pycrdt.Awareness
    loop: asyncio.AbstractEventLoop
    log: Logger
    _client_group: YjsClientGroup
    _message_queue: asyncio.Queue[Tuple[str, bytes]]
    _message_task: asyncio.Task[None]

    def __init__(self, log: Logger, loop: asyncio.AbstractEventLoop):
        """
        Creates the room's YDoc, awareness, client group and message queue,
        then starts the message-routing task on `loop` and registers the YDoc
        observer.
        """
        # Bind instance attributes
        self.log = log
        self.loop = loop

        # Initialize YDoc, YAwareness, YjsClientGroup, and message queue
        self.ydoc = pycrdt.Doc()
        self.awareness = pycrdt.Awareness(ydoc=self.ydoc)
        self._client_group = YjsClientGroup()
        self._message_queue = asyncio.Queue()

        # Start background task that routes new messages in the message queue
        # to the appropriate handler method. The task is kept on the instance
        # because the asyncio docs recommend holding a reference to tasks.
        self._message_task = self.loop.create_task(self._on_new_message())

        # Start observer on the `ydoc` to ensure new updates are broadcast to
        # all clients and saved to disk.
        self.ydoc.observe(lambda event: self.write_sync_update(event.update))

    @property
    def clients(self) -> YjsClientGroup:
        """
        Returns the `YjsClientGroup` for this room, which provides an API for
        managing the set of clients connected to this room.
        """
        return self._client_group

    def add_client(self, websocket: WebSocketHandler) -> str:
        """
        Creates a new client from the given Tornado WebSocketHandler and
        adds it to the room. Returns the ID of the new client.
        """
        return self.clients.add(websocket)

    def remove_client(self, client_id: str) -> None:
        """Removes a client from the room, given the client ID."""
        self.clients.remove(client_id)

    def add_message(self, client_id: str, message: bytes) -> None:
        """
        Adds new message to the message queue. Items placed in the message queue
        are handled one-at-a-time.
        """
        self._message_queue.put_nowait((client_id, message))

    async def _on_new_message(self) -> None:
        """
        Async method that only runs when a new message arrives in the message
        queue. This method routes the message to a handler method based on the
        message type & subtype, which are obtained from the first 2 bytes of the
        message.

        Every handler receives the full message, including the leading type
        byte. Empty messages and messages with an unrecognized header are
        logged and skipped so that one bad message cannot stop the loop.
        """
        # `asyncio.QueueShutDown` only exists on Python 3.13+. Referencing it
        # directly in an `except` clause raises `AttributeError` on older
        # versions as soon as any exception (including task cancellation)
        # reaches that clause. An empty tuple catches nothing.
        queue_shut_down = getattr(asyncio, "QueueShutDown", None)
        shutdown_errors = () if queue_shut_down is None else (queue_shut_down,)

        while True:
            try:
                client_id, message = await self._message_queue.get()
            except shutdown_errors:
                break

            # `None` never equals a message type/subtype, so short messages
            # fall through to the warning branch instead of raising.
            message_type = message[0] if message else None
            message_subtype = message[1] if len(message) >= 2 else None

            # Handle Awareness messages
            if message_type == YMessageType.AWARENESS:
                # Pass the full message: the handler strips the type byte
                # itself and re-broadcasts the message unchanged.
                self.handle_awareness_update(client_id, message)
                continue

            # Handle Sync messages
            if message_type == YMessageType.SYNC:
                if message_subtype == YSyncMessageSubtype.SYNC_STEP1:
                    self.handle_sync_step1(client_id, message)
                    continue
                if message_subtype == YSyncMessageSubtype.SYNC_STEP2:
                    self.handle_sync_step2(client_id, message)
                    continue
                if message_subtype == YSyncMessageSubtype.SYNC_UPDATE:
                    self.handle_sync_update(client_id, message)
                    continue

            self.log.warning(
                "Ignoring an unrecognized message with header "
                f"'{message_type},{message_subtype}' from client "
                f"'{client_id}'. Messages must have one of the following "
                "headers: '0,0' (SyncStep1), '0,1' (SyncStep2), "
                "'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)."
            )

    def handle_sync_step1(self, client_id: str, message: bytes) -> None:
        """
        Handles SyncStep1 messages from new clients by:

        - Computing a SyncStep2 reply,
        - Sending the reply to the client over WS, and
        - Sending a new SyncStep1 message immediately after.

        The client is marked desynced on entry and marked synced once the
        SyncStep2 reply has been written. Any failure is logged and ends the
        handshake early; nothing is raised to the caller.
        """
        # Mark client as desynced
        new_client = self.clients.get(client_id, synced_only=False)
        self.clients.mark_desynced(client_id)

        # Compute SyncStep2 reply
        try:
            # Drop the leading message-type byte before handing to pycrdt.
            message_payload = message[1:]
            sync_step2_message = pycrdt.handle_sync_message(message_payload, self.ydoc)
            assert isinstance(sync_step2_message, bytes)
        except Exception as e:
            self.log.error(
                "An exception occurred when computing the SyncStep2 reply "
                f"to new client '{new_client.id}':"
            )
            self.log.exception(e)
            return

        # Write SyncStep2 reply to the client's WebSocket
        # Marks client as synced.
        try:
            # TODO: remove the assert once websocket is made required
            assert isinstance(new_client.websocket, WebSocketHandler)
            new_client.websocket.write_message(sync_step2_message)
            self.clients.mark_synced(client_id)
        except Exception as e:
            self.log.error(
                "An exception occurred when writing the SyncStep2 reply "
                f"to new client '{new_client.id}':"
            )
            self.log.exception(e)
            return

        # Send SyncStep1 message
        try:
            assert isinstance(new_client.websocket, WebSocketHandler)
            sync_step1_message = pycrdt.create_sync_message(self.ydoc)
            new_client.websocket.write_message(sync_step1_message)
        except Exception as e:
            self.log.error(
                "An exception occurred when writing a SyncStep1 message "
                f"to newly-synced client '{new_client.id}':"
            )
            self.log.exception(e)

    def handle_sync_step2(self, client_id: str, message: bytes) -> None:
        """
        Handles SyncStep2 messages from newly-synced clients by applying the
        SyncStep2 message to YDoc.

        A SyncUpdate message will automatically be broadcast to all synced
        clients after this method is called via the `self.write_sync_update()`
        observer.
        """
        try:
            # Drop the leading message-type byte before handing to pycrdt.
            message_payload = message[1:]
            pycrdt.handle_sync_message(message_payload, self.ydoc)
        except Exception as e:
            self.log.error(
                "An exception occurred when applying a SyncStep2 message "
                f"from client '{client_id}':"
            )
            self.log.exception(e)
            return

    def handle_sync_update(self, client_id: str, message: bytes) -> None:
        """
        Handles incoming SyncUpdate messages by applying the update to the YDoc.

        A SyncUpdate message will automatically be broadcast to all synced
        clients after this method is called via the `self.write_sync_update()`
        observer.
        """
        # Ignore the message if client is desynced
        if self._should_ignore_update(client_id, "SyncUpdate"):
            return

        # Apply the SyncUpdate to the YDoc
        try:
            # Drop the leading message-type byte before handing to pycrdt.
            message_payload = message[1:]
            pycrdt.handle_sync_message(message_payload, self.ydoc)
        except Exception as e:
            self.log.error(
                "An exception occurred when applying a SyncUpdate message "
                f"from client '{client_id}':"
            )
            self.log.exception(e)
            return

    def write_sync_update(self, message_payload: bytes, client_id: str | None = None) -> None:
        """
        This method is an observer on `self.ydoc` which:

        - Broadcasts a SyncUpdate message payload to all connected clients by
        writing to their respective WebSockets,

        - Persists the contents of the updated YDoc by writing to disk
        (not implemented yet, see the TODO below).

        This method can also be called manually. `client_id` is accepted but
        currently unused.
        """
        # Broadcast the message:
        message = pycrdt.create_update_message(message_payload)
        self._broadcast_message(message, message_type="SyncUpdate")

        # Save the file to disk.
        # TODO: requires YRoomLoader implementation
        return

    def handle_awareness_update(self, client_id: str, message: bytes) -> None:
        """
        Handles an AwarenessUpdate message by applying it to `self.awareness`
        and then broadcasting the message, unchanged, to the room's clients.

        `message` is the full message including its leading type byte; the
        byte is stripped here before the payload is applied.
        NOTE(review): assumes `apply_awareness_update()` expects the payload
        without the type byte - confirm against the pinned pycrdt version.
        """
        # Ignore the message if client is desynced
        if self._should_ignore_update(client_id, "AwarenessUpdate"):
            return

        # Apply the AwarenessUpdate message
        try:
            message_payload = message[1:]
            self.awareness.apply_awareness_update(message_payload, origin=self)
        except Exception as e:
            self.log.error(
                "An exception occurred when applying an AwarenessUpdate "
                f"message from client '{client_id}':"
            )
            self.log.exception(e)
            return

        # Broadcast AwarenessUpdate message to the clients returned by
        # `self.clients.get_all()`.
        self._broadcast_message(message, message_type="AwarenessUpdate")

    def _should_ignore_update(self, client_id: str, message_type: Literal['AwarenessUpdate', 'SyncUpdate']) -> bool:
        """
        Returns whether a handler method should ignore an AwarenessUpdate or
        SyncUpdate message from a client because it is desynced. Automatically
        logs a warning if returning `True`. `message_type` is used to produce
        more readable warnings.
        """
        client = self.clients.get(client_id, synced_only=False)
        if not client.synced:
            self.log.warning(
                f"Ignoring a {message_type} message from client "
                f"'{client_id}' because the client is not synced."
            )
            return True

        return False

    def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpdate', 'SyncUpdate']):
        """
        Writes the given message to the WebSocket of every client returned by
        `self.clients.get_all()`. A failure to write to one client is logged
        and does not stop the broadcast to the remaining clients.
        `message_type` is used to produce more readable warnings.
        """
        clients = self.clients.get_all()
        for client in clients:
            try:
                # TODO: remove this assertion once websocket is made required
                assert isinstance(client.websocket, WebSocketHandler)
                client.websocket.write_message(message)
            except Exception as e:
                self.log.warning(
                    f"An exception occurred when broadcasting a "
                    f"{message_type} message "
                    f"to client '{client.id}':"
                )
                self.log.exception(e)

    def stop(self) -> None:
        """Stops the room. Currently a no-op placeholder."""
        # TODO: requires YRoomLoader implementation
        return
301+
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""
2+
WIP.
3+
4+
This file just contains interfaces to be filled out later.
5+
"""
6+
7+
from __future__ import annotations
8+
from typing import TYPE_CHECKING
9+
import asyncio
10+
11+
if TYPE_CHECKING:
12+
from jupyter_server_fileid.manager import BaseFileIdManager
13+
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
14+
15+
class YRoomLoader:
    """
    Interface stub for the object that will load a room's file from disk and
    schedule writes back to it. Only the constructor does any work today; the
    remaining methods are placeholders.
    """

    file_format: str
    file_type: str
    file_id: str

    def __init__(
        self,
        file_format: str,
        file_type: str,
        file_id: str,
        file_id_manager: BaseFileIdManager,
        contents_manager: AsyncContentsManager | ContentsManager,
        loop: asyncio.AbstractEventLoop
    ):
        """Stores every constructor argument on the instance, unchanged."""
        # Description of the file this loader is responsible for
        self.file_format, self.file_type, self.file_id = (
            file_format,
            file_type,
            file_id,
        )

        # Collaborators the (future) load/write logic is expected to use
        self.file_id_manager = file_id_manager
        self.contents_manager = contents_manager
        self.loop = loop

    async def _load(self) -> None:
        """
        Loads the file from disk asynchronously. Intended to use the
        `FileIdManager` provided by `jupyter_server_fileid` to resolve the
        file ID to a path, then use the contents manager to retrieve the
        contents of the file.

        TODO: not implemented; currently returns `None` immediately.
        """
        return None

    def schedule_write(self) -> None:
        """
        Schedules a write to the disk. If any other write is scheduled to the
        disk when this method is called, the other write is intended to be
        cancelled and replaced with this write request.

        TODO: not implemented; currently returns `None` immediately.
        """
        return None
56+

0 commit comments

Comments
 (0)