Skip to content

Commit 8b748dd

Browse files
committed
fix Y protocol implementation, set binary=True when writing to WS handler
1 parent 3c80ae8 commit 8b748dd

File tree

6 files changed

+52
-38
lines changed

6 files changed

+52
-38
lines changed

jupyter_rtc_core/app.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,19 @@ def initialize_settings(self):
3232
# We cannot access the 'file_id_manager' key immediately because server
3333
# extensions initialize in alphabetical order. 'jupyter_rtc_core' <
3434
# 'jupyter_server_fileid'.
35-
# def get_fileid_manager():
36-
# self.log.info("IN GETTER")
37-
# for k, v in self.settings.items():
38-
# print(f"{k}: {v}")
39-
# print(len(self.settings.items()))
40-
# print(id(self.settings))
41-
# return self.settings["file_id_manager"]
42-
# contents_manager = self.serverapp.contents_manager
43-
# loop = asyncio.get_event_loop_policy().get_event_loop()
44-
# log = self.log
45-
46-
# # Initialize YRoomManager
47-
# self.settings["yroom_manager"] = YRoomManager(
48-
# get_fileid_manager=get_fileid_manager,
49-
# contents_manager=contents_manager,
50-
# loop=loop,
51-
# log=log
52-
# )
35+
def get_fileid_manager():
36+
return self.serverapp.web_app.settings["file_id_manager"]
37+
contents_manager = self.serverapp.contents_manager
38+
loop = asyncio.get_event_loop_policy().get_event_loop()
39+
log = self.log
40+
41+
# Initialize YRoomManager
42+
self.settings["yroom_manager"] = YRoomManager(
43+
get_fileid_manager=get_fileid_manager,
44+
contents_manager=contents_manager,
45+
loop=loop,
46+
log=log
47+
)
5348
pass
5449

5550

jupyter_rtc_core/rooms/yroom.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class YRoom:
2525
"""
2626
The ID of the room. This is a composite ID following the format:
2727
28-
room_id := "{file_type}:{file_format}
28+
room_id := "{file_type}:{file_format}:{file_id}"
2929
"""
3030

3131
_jupyter_ydoc: YBaseDoc
@@ -95,6 +95,9 @@ def __init__(
9595
# messages in the message queue to the appropriate handler method.
9696
self._message_queue = asyncio.Queue()
9797
self._loop.create_task(self._on_new_message())
98+
99+
# Log notification that room is ready
100+
self.log.info(f"Room '{self.room_id}' initialized.")
98101

99102

100103
@property
@@ -156,13 +159,15 @@ async def _on_new_message(self) -> None:
156159
while True:
157160
try:
158161
client_id, message = await self._message_queue.get()
162+
self.log.info(f"HANDLING NEW MESSAGE FROM '{client_id}'")
163+
self.log.info(f"Message: {message}")
159164
except asyncio.QueueShutDown:
160165
break
161166

162167
# Handle Awareness messages
163168
message_type = message[0]
164169
if message_type == YMessageType.AWARENESS:
165-
self.handle_awareness_update(client_id, message[1:])
170+
self.handle_awareness_update(client_id, message)
166171
continue
167172

168173
# Handle Sync messages
@@ -196,6 +201,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
196201
- Sending the reply to the client over WS, and
197202
- Sending a new SyncStep1 message immediately after.
198203
"""
204+
self.log.info(f"Handling SS1 message from client '{client_id}'.")
205+
199206
# Mark client as desynced
200207
new_client = self.clients.get(client_id)
201208
self.clients.mark_desynced(client_id)
@@ -218,7 +225,7 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
218225
try:
219226
# TODO: remove the assert once websocket is made required
220227
assert isinstance(new_client.websocket, WebSocketHandler)
221-
new_client.websocket.write_message(sync_step2_message)
228+
new_client.websocket.write_message(sync_step2_message, binary=True)
222229
except Exception as e:
223230
self.log.error(
224231
"An exception occurred when writing the SyncStep2 reply "
@@ -228,18 +235,21 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
228235
return
229236

230237
self.clients.mark_synced(client_id)
238+
self.log.info(f"Sent SS2 reply to client '{client_id}'.")
231239

232240
# Send SyncStep1 message
233241
try:
234242
assert isinstance(new_client.websocket, WebSocketHandler)
235243
sync_step1_message = pycrdt.create_sync_message(self._ydoc)
236-
new_client.websocket.write_message(sync_step1_message)
244+
new_client.websocket.write_message(sync_step1_message, binary=True)
237245
except Exception as e:
238246
self.log.error(
239247
"An exception occurred when writing a SyncStep1 message "
240248
f"to newly-synced client '{new_client.id}':"
241249
)
242250
self.log.exception(e)
251+
self.log.info(f"Sent SS1 message to client '{client_id}'.")
252+
self.log.info(f"Message: {sync_step1_message}")
243253

244254

245255
def handle_sync_step2(self, client_id: str, message: bytes) -> None:
@@ -251,6 +261,7 @@ def handle_sync_step2(self, client_id: str, message: bytes) -> None:
251261
clients after this method is called via the `self.write_sync_update()`
252262
observer.
253263
"""
264+
self.log.info("HANDLING SS2 MESSAGE")
254265
try:
255266
message_payload = message[1:]
256267
pycrdt.handle_sync_message(message_payload, self._ydoc)
@@ -271,6 +282,7 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
271282
clients after this method is called via the `self.write_sync_update()`
272283
observer.
273284
"""
285+
self.log.info("HANDLING SYNCUPDATE")
274286
# Remove client and kill websocket if received SyncUpdate when client is desynced
275287
if self._should_ignore_update(client_id, "SyncUpdate"):
276288
self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self.room_id}'")
@@ -310,6 +322,7 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None
310322

311323

312324
def handle_awareness_update(self, client_id: str, message: bytes) -> None:
325+
self.log.info("HANDLING AWARENESS UPDATE")
313326
# Apply the AwarenessUpdate message
314327
try:
315328
message_payload = pycrdt.read_message(message[1:])
@@ -356,7 +369,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
356369
try:
357370
# TODO: remove this assertion once websocket is made required
358371
assert isinstance(client.websocket, WebSocketHandler)
359-
client.websocket.write_message(message)
372+
client.websocket.write_message(message, binary=True)
360373
except Exception as e:
361374
self.log.warning(
362375
f"An exception occurred when broadcasting a "

jupyter_rtc_core/rooms/yroom_file_api.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class YRoomFileAPI:
3333

3434
# See `filemanager.py` in `jupyter_server` for references on supported file
3535
# formats & file types.
36+
room_id: str
3637
file_format: Literal["text", "base64"]
3738
file_type: Literal["file", "notebook"]
3839
file_id: str
@@ -57,6 +58,7 @@ def __init__(
5758
loop: asyncio.AbstractEventLoop
5859
):
5960
# Bind instance attributes
61+
self.room_id = room_id
6062
self.file_format, self.file_type, self.file_id = room_id.split(":")
6163
self.jupyter_ydoc = jupyter_ydoc
6264
self.log = log
@@ -113,6 +115,8 @@ def load_ydoc_content(self) -> None:
113115
# Otherwise, set loading to `True` and start the loading task.
114116
if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading:
115117
return
118+
119+
self.log.info(f"Loading content for room ID '{self.room_id}'.")
116120
self._ydoc_content_loading = True
117121
self._loop.create_task(self._load_ydoc_content())
118122

@@ -134,6 +138,8 @@ async def _load_ydoc_content(self) -> None:
134138
# Also set loading to `False` for consistency
135139
self._ydoc_content_loaded.set()
136140
self._ydoc_content_loading = False
141+
self.log.info(f"Loaded content for room ID '{self.room_id}'.")
142+
self.log.info(f"Content: {self.jupyter_ydoc.source}")
137143

138144

139145
def schedule_save(self) -> None:

jupyter_rtc_core/rooms/yroom_manager.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
if TYPE_CHECKING:
66
import asyncio
77
import logging
8-
from typing import Callable
98
from jupyter_server_fileid.manager import BaseFileIdManager
109
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
1110

@@ -15,13 +14,13 @@ class YRoomManager:
1514
def __init__(
1615
self,
1716
*,
18-
fileid_manager: BaseFileIdManager,
17+
get_fileid_manager: callable[[], BaseFileIdManager],
1918
contents_manager: AsyncContentsManager | ContentsManager,
2019
loop: asyncio.AbstractEventLoop,
2120
log: logging.Logger,
2221
):
2322
# Bind instance attributes
24-
self.fileid_manager = fileid_manager
23+
self._get_fileid_manager = get_fileid_manager
2524
self.contents_manager = contents_manager
2625
self.loop = loop
2726
self.log = log
@@ -30,6 +29,11 @@ def __init__(
3029
self._rooms_by_id = {}
3130

3231

32+
@property
33+
def fileid_manager(self) -> BaseFileIdManager:
34+
return self._get_fileid_manager()
35+
36+
3337
def get_room(self, room_id: str) -> YRoom | None:
3438
"""
3539
Retrieves a YRoom given a room ID. If the YRoom does not exist, this
@@ -42,6 +46,7 @@ def get_room(self, room_id: str) -> YRoom | None:
4246

4347
# Otherwise, create a new room
4448
try:
49+
self.log.info(f"Initializing room '{room_id}'.")
4550
yroom = YRoom(
4651
room_id=room_id,
4752
log=self.log,

jupyter_rtc_core/websockets/clients.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop
7777
self.desynced: dict[str, YjsClient] = {}
7878
self.log = log
7979
self.loop = loop
80-
self.loop.create_task(self._clean_desynced())
80+
# self.loop.create_task(self._clean_desynced())
8181
self._poll_interval_seconds = poll_interval_seconds
8282
self.desynced_timeout_seconds = desynced_timeout_seconds
8383

jupyter_rtc_core/websockets/yroom_ws.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,16 @@ class YRoomWebsocket(WebSocketHandler):
1515
yroom: YRoom
1616
room_id: str
1717
client_id: str
18+
# TODO: change this. we should pass `self.log` from our
19+
# `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix
20+
log = logging.Logger("TEMP")
21+
1822

1923
@property
2024
def yroom_manager(self) -> YRoomManager:
21-
if "yroom_manager" not in self.settings:
22-
self.settings["yroom_manager"] = YRoomManager(
23-
fileid_manager=self.fileid_manager,
24-
contents_manager=self.contents_manager,
25-
loop=asyncio.get_event_loop_policy().get_event_loop(),
26-
# TODO: change this. we should pass `self.log` from our
27-
# `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix
28-
log=logging.Logger("TEMP")
29-
30-
)
3125
return self.settings["yroom_manager"]
3226

27+
3328
@property
3429
def fileid_manager(self) -> BaseFileIdManager:
3530
return self.settings["file_id_manager"]
@@ -69,9 +64,9 @@ def open(self, *_, **__):
6964

7065
def on_message(self, message: bytes):
7166
# Route all messages to the YRoom for processing
72-
print(message)
7367
self.yroom.add_message(self.client_id, message)
7468

7569

7670
def on_close(self):
71+
self.log.info("WEBSOCKET CLOSED")
7772
self.yroom.clients.remove(self.client_id)

0 commit comments

Comments
 (0)