Skip to content

Commit 85c965a

Browse files
committed
fix Y protocol implementation, set binary=True when writing to WS handler
1 parent 3e4607b commit 85c965a

File tree

6 files changed

+51
-37
lines changed

6 files changed

+51
-37
lines changed

jupyter_rtc_core/app.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,19 @@ def initialize_settings(self):
4545
# We cannot access the 'file_id_manager' key immediately because server
4646
# extensions initialize in alphabetical order. 'jupyter_rtc_core' <
4747
# 'jupyter_server_fileid'.
48-
# def get_fileid_manager():
49-
# self.log.info("IN GETTER")
50-
# for k, v in self.settings.items():
51-
# print(f"{k}: {v}")
52-
# print(len(self.settings.items()))
53-
# print(id(self.settings))
54-
# return self.settings["file_id_manager"]
55-
# contents_manager = self.serverapp.contents_manager
56-
# loop = asyncio.get_event_loop_policy().get_event_loop()
57-
# log = self.log
48+
def get_fileid_manager():
49+
return self.serverapp.web_app.settings["file_id_manager"]
50+
contents_manager = self.serverapp.contents_manager
51+
loop = asyncio.get_event_loop_policy().get_event_loop()
52+
log = self.log
5853

59-
# # Initialize YRoomManager
60-
# self.settings["yroom_manager"] = YRoomManager(
61-
# get_fileid_manager=get_fileid_manager,
62-
# contents_manager=contents_manager,
63-
# loop=loop,
64-
# log=log
65-
# )
54+
# Initialize YRoomManager
55+
self.settings["yroom_manager"] = YRoomManager(
56+
get_fileid_manager=get_fileid_manager,
57+
contents_manager=contents_manager,
58+
loop=loop,
59+
log=log
60+
)
6661
pass
6762

6863

jupyter_rtc_core/rooms/yroom.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class YRoom:
2525
"""
2626
The ID of the room. This is a composite ID following the format:
2727
28-
room_id := "{file_type}:{file_format}
28+
room_id := "{file_type}:{file_format}:{file_id}"
2929
"""
3030

3131
_jupyter_ydoc: YBaseDoc
@@ -95,6 +95,9 @@ def __init__(
9595
# messages in the message queue to the appropriate handler method.
9696
self._message_queue = asyncio.Queue()
9797
self._loop.create_task(self._on_new_message())
98+
99+
# Log notification that room is ready
100+
self.log.info(f"Room '{self.room_id}' initialized.")
98101

99102

100103
@property
@@ -156,13 +159,15 @@ async def _on_new_message(self) -> None:
156159
while True:
157160
try:
158161
client_id, message = await self._message_queue.get()
162+
self.log.info(f"HANDLING NEW MESSAGE FROM '{client_id}'")
163+
self.log.info(f"Message: {message}")
159164
except asyncio.QueueShutDown:
160165
break
161166

162167
# Handle Awareness messages
163168
message_type = message[0]
164169
if message_type == YMessageType.AWARENESS:
165-
self.handle_awareness_update(client_id, message[1:])
170+
self.handle_awareness_update(client_id, message)
166171
continue
167172

168173
# Handle Sync messages
@@ -196,6 +201,8 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
196201
- Sending the reply to the client over WS, and
197202
- Sending a new SyncStep1 message immediately after.
198203
"""
204+
self.log.info(f"Handling SS1 message from client '{client_id}'.")
205+
199206
# Mark client as desynced
200207
new_client = self.clients.get(client_id)
201208
self.clients.mark_desynced(client_id)
@@ -218,7 +225,7 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
218225
try:
219226
# TODO: remove the assert once websocket is made required
220227
assert isinstance(new_client.websocket, WebSocketHandler)
221-
new_client.websocket.write_message(sync_step2_message)
228+
new_client.websocket.write_message(sync_step2_message, binary=True)
222229
except Exception as e:
223230
self.log.error(
224231
"An exception occurred when writing the SyncStep2 reply "
@@ -228,18 +235,21 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
228235
return
229236

230237
self.clients.mark_synced(client_id)
238+
self.log.info(f"Sent SS2 reply to client '{client_id}'.")
231239

232240
# Send SyncStep1 message
233241
try:
234242
assert isinstance(new_client.websocket, WebSocketHandler)
235243
sync_step1_message = pycrdt.create_sync_message(self._ydoc)
236-
new_client.websocket.write_message(sync_step1_message)
244+
new_client.websocket.write_message(sync_step1_message, binary=True)
237245
except Exception as e:
238246
self.log.error(
239247
"An exception occurred when writing a SyncStep1 message "
240248
f"to newly-synced client '{new_client.id}':"
241249
)
242250
self.log.exception(e)
251+
self.log.info(f"Sent SS1 message to client '{client_id}'.")
252+
self.log.info(f"Message: {sync_step1_message}")
243253

244254

245255
def handle_sync_step2(self, client_id: str, message: bytes) -> None:
@@ -251,6 +261,7 @@ def handle_sync_step2(self, client_id: str, message: bytes) -> None:
251261
clients after this method is called via the `self.write_sync_update()`
252262
observer.
253263
"""
264+
self.log.info("HANDLING SS2 MESSAGE")
254265
try:
255266
message_payload = message[1:]
256267
pycrdt.handle_sync_message(message_payload, self._ydoc)
@@ -271,6 +282,7 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
271282
clients after this method is called via the `self.write_sync_update()`
272283
observer.
273284
"""
285+
self.log.info("HANDLING SYNCUPDATE")
274286
# Remove client and kill websocket if received SyncUpdate when client is desynced
275287
if self._should_ignore_update(client_id, "SyncUpdate"):
276288
self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self.room_id}'")
@@ -310,6 +322,7 @@ def write_sync_update(self, message_payload: bytes, client_id: str | None = None
310322

311323

312324
def handle_awareness_update(self, client_id: str, message: bytes) -> None:
325+
self.log.info("HANDLING AWARENESS UPDATE")
313326
# Apply the AwarenessUpdate message
314327
try:
315328
message_payload = pycrdt.read_message(message[1:])
@@ -356,7 +369,7 @@ def _broadcast_message(self, message: bytes, message_type: Literal['AwarenessUpd
356369
try:
357370
# TODO: remove this assertion once websocket is made required
358371
assert isinstance(client.websocket, WebSocketHandler)
359-
client.websocket.write_message(message)
372+
client.websocket.write_message(message, binary=True)
360373
except Exception as e:
361374
self.log.warning(
362375
f"An exception occurred when broadcasting a "

jupyter_rtc_core/rooms/yroom_file_api.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class YRoomFileAPI:
3333

3434
# See `filemanager.py` in `jupyter_server` for references on supported file
3535
# formats & file types.
36+
room_id: str
3637
file_format: Literal["text", "base64"]
3738
file_type: Literal["file", "notebook"]
3839
file_id: str
@@ -57,6 +58,7 @@ def __init__(
5758
loop: asyncio.AbstractEventLoop
5859
):
5960
# Bind instance attributes
61+
self.room_id = room_id
6062
self.file_format, self.file_type, self.file_id = room_id.split(":")
6163
self.jupyter_ydoc = jupyter_ydoc
6264
self.log = log
@@ -113,6 +115,8 @@ def load_ydoc_content(self) -> None:
113115
# Otherwise, set loading to `True` and start the loading task.
114116
if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading:
115117
return
118+
119+
self.log.info(f"Loading content for room ID '{self.room_id}'.")
116120
self._ydoc_content_loading = True
117121
self._loop.create_task(self._load_ydoc_content())
118122

@@ -134,6 +138,8 @@ async def _load_ydoc_content(self) -> None:
134138
# Also set loading to `False` for consistency
135139
self._ydoc_content_loaded.set()
136140
self._ydoc_content_loading = False
141+
self.log.info(f"Loaded content for room ID '{self.room_id}'.")
142+
self.log.info(f"Content: {self.jupyter_ydoc.source}")
137143

138144

139145
def schedule_save(self) -> None:

jupyter_rtc_core/rooms/yroom_manager.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
if TYPE_CHECKING:
99
import asyncio
1010
import logging
11-
from typing import Callable
1211
from jupyter_server_fileid.manager import BaseFileIdManager
1312
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
1413

@@ -18,20 +17,25 @@ class YRoomManager():
1817
def __init__(
1918
self,
2019
*,
21-
fileid_manager: BaseFileIdManager,
20+
get_fileid_manager: callable[[], BaseFileIdManager],
2221
contents_manager: AsyncContentsManager | ContentsManager,
2322
loop: asyncio.AbstractEventLoop,
2423
log: logging.Logger,
2524
):
2625
# Bind instance attributes
27-
self.fileid_manager = fileid_manager
26+
self._get_fileid_manager = get_fileid_manager
2827
self.contents_manager = contents_manager
2928
self.loop = loop
3029
self.log = log
3130
self._rooms_by_id = {}
3231
# Initialize dictionary of YRooms, keyed by room ID
3332

3433

34+
@property
35+
def fileid_manager(self) -> BaseFileIdManager:
36+
return self._get_fileid_manager()
37+
38+
3539
def get_room(self, room_id: str) -> YRoom | None:
3640
"""
3741
Retrieves a YRoom given a room ID. If the YRoom does not exist, this
@@ -44,6 +48,7 @@ def get_room(self, room_id: str) -> YRoom | None:
4448

4549
# Otherwise, create a new room
4650
try:
51+
self.log.info(f"Initializing room '{room_id}'.")
4752
yroom = YRoom(
4853
room_id=room_id,
4954
log=self.log,

jupyter_rtc_core/websockets/clients.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop
7777
self.desynced: dict[str, YjsClient] = {}
7878
self.log = log
7979
self.loop = loop
80-
self.loop.create_task(self._clean_desynced())
80+
# self.loop.create_task(self._clean_desynced())
8181
self._poll_interval_seconds = poll_interval_seconds
8282
self.desynced_timeout_seconds = desynced_timeout_seconds
8383

jupyter_rtc_core/websockets/yroom_ws.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,16 @@ class YRoomWebsocket(WebSocketHandler):
1515
yroom: YRoom
1616
room_id: str
1717
client_id: str
18+
# TODO: change this. we should pass `self.log` from our
19+
# `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix
20+
log = logging.Logger("TEMP")
21+
1822

1923
@property
2024
def yroom_manager(self) -> YRoomManager:
21-
if "yroom_manager" not in self.settings:
22-
self.settings["yroom_manager"] = YRoomManager(
23-
fileid_manager=self.fileid_manager,
24-
contents_manager=self.contents_manager,
25-
loop=asyncio.get_event_loop_policy().get_event_loop(),
26-
# TODO: change this. we should pass `self.log` from our
27-
# `ExtensionApp` to log messages w/ "RtcCoreExtension" prefix
28-
log=logging.Logger("TEMP")
29-
30-
)
3125
return self.settings["yroom_manager"]
3226

27+
3328
@property
3429
def fileid_manager(self) -> BaseFileIdManager:
3530
return self.settings["file_id_manager"]
@@ -69,9 +64,9 @@ def open(self, *_, **__):
6964

7065
def on_message(self, message: bytes):
7166
# Route all messages to the YRoom for processing
72-
print(message)
7367
self.yroom.add_message(self.client_id, message)
7468

7569

7670
def on_close(self):
71+
self.log.info("WEBSOCKET CLOSED")
7772
self.yroom.clients.remove(self.client_id)

0 commit comments

Comments
 (0)