Skip to content

Commit 98482ff

Browse files
authored
Add YRoomWebsocket and YRoomManager (#27)
* add yroom websocket * fix add_message() call and impl prepare() * add YRoomManager * fix runtime errors caused by changes * fix import in kernels module to allow JL to start * load fileid_manager lazily to allow JL to start
1 parent 44b5024 commit 98482ff

File tree

6 files changed

+158
-27
lines changed

6 files changed

+158
-27
lines changed

jupyter_rtc_core/app.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from jupyter_server.extension.application import ExtensionApp
22
from traitlets.config import Config
3+
import asyncio
34

45
from .handlers import RouteHandler
56
from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket
7+
from .rooms import YRoomManager
68

79
class RtcExtensionApp(ExtensionApp):
810
name = "jupyter_rtc_core"
@@ -21,6 +23,25 @@ class RtcExtensionApp(ExtensionApp):
2123

2224
def initialize(self):
2325
super().initialize()
26+
27+
28+
def initialize_settings(self):
29+
# Get YRoomManager arguments from server extension context.
30+
# We cannot access the 'file_id_manager' key immediately because server
31+
# extensions initialize in alphabetical order. 'jupyter_rtc_core' <
32+
# 'jupyter_server_fileid'.
33+
get_fileid_manager = lambda: self.settings["file_id_manager"]
34+
contents_manager = self.serverapp.contents_manager
35+
loop = asyncio.get_event_loop_policy().get_event_loop()
36+
log = self.log
37+
38+
# Initialize YRoomManager
39+
self.settings["yroom_manager"] = YRoomManager(
40+
get_fileid_manager=get_fileid_manager,
41+
contents_manager=contents_manager,
42+
loop=loop,
43+
log=log
44+
)
2445

2546

2647
def _link_jupyter_server_extension(self, server_app):

jupyter_rtc_core/kernels/websocket_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from jupyter_server.services.kernels.connection.base import (
1515
BaseKernelWebsocketConnection,
1616
)
17-
from .states import LIFECYCLE_DEAD_STATES
17+
from .states import LifecycleStates
1818
from jupyter_server.services.kernels.connection.base import deserialize_msg_from_ws_v1, serialize_msg_to_ws_v1
1919
from jupyter_client.session import Session
2020

jupyter_rtc_core/rooms/yroom.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class YRoom:
2323
"""Log object"""
2424
room_id: str
2525
"""Room Id"""
26+
2627
_jupyter_ydoc: YBaseDoc
2728
"""JupyterYDoc"""
2829
_ydoc: pycrdt.Doc
@@ -35,6 +36,10 @@ class YRoom:
3536
"""Client group to manage synced and desynced clients"""
3637
_message_queue: asyncio.Queue[Tuple[str, bytes]]
3738
"""A message queue per room to keep websocket messages in order"""
39+
_awareness_subscription: pycrdt.Subscription
40+
"""Subscription to awareness changes."""
41+
_ydoc_subscription: pycrdt.Subscription
42+
"""Subscription to YDoc changes."""
3843

3944

4045
def __init__(
@@ -74,8 +79,12 @@ def __init__(
7479

7580
# Start observers on `self.ydoc` and `self.awareness` to ensure new
7681
# updates are broadcast to all clients and saved to disk.
77-
self._awareness.observe(self.send_server_awareness)
78-
self._ydoc.observe(lambda event: self.write_sync_update(event.update))
82+
self._awareness_subscription = self._awareness.observe(
83+
self.send_server_awareness
84+
)
85+
self._ydoc_subscription = self._ydoc.observe(
86+
lambda event: self.write_sync_update(event.update)
87+
)
7988

8089
# Initialize message queue and start background task that routes new
8190
# messages in the message queue to the appropriate handler method.
@@ -368,6 +377,13 @@ def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any])
368377
self._broadcast_message(message, "AwarenessUpdate")
369378

370379
def stop(self) -> None:
371-
# TODO: requires YRoomLoader implementation
380+
"""
381+
Stop the YRoom.
382+
383+
TODO: stop file API & stop the message processing loop
384+
"""
385+
self._ydoc.unobserve(self._ydoc_subscription)
386+
self._awareness.unobserve(self._awareness_subscription)
387+
372388
return
373389

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,79 @@
1-
"""
2-
WIP.
3-
4-
This file just contains interfaces to be filled out later.
5-
"""
6-
1+
from __future__ import annotations
72
from .yroom import YRoom
3+
from typing import TYPE_CHECKING
4+
5+
if TYPE_CHECKING:
6+
import asyncio
7+
import logging
8+
from typing import Callable
9+
from jupyter_server_fileid.manager import BaseFileIdManager
10+
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager
811

912
class YRoomManager:
1013
_rooms_by_id: dict[str, YRoom]
1114

12-
def __init__(self):
15+
def __init__(
16+
self,
17+
*,
18+
get_fileid_manager: Callable[[], BaseFileIdManager],
19+
contents_manager: AsyncContentsManager | ContentsManager,
20+
loop: asyncio.AbstractEventLoop,
21+
log: logging.Logger,
22+
):
23+
# Bind instance attributes
24+
self._get_fileid_manager = get_fileid_manager
25+
self.contents_manager = contents_manager
26+
self.loop = loop
27+
self.log = log
28+
29+
# Initialize dictionary of YRooms, keyed by room ID
1330
self._rooms_by_id = {}
1431

32+
33+
@property
34+
def fileid_manager(self) -> BaseFileIdManager:
35+
return self._get_fileid_manager()
36+
37+
1538
def get_room(self, room_id: str) -> YRoom | None:
16-
# TODO
17-
return None
39+
"""
40+
Retrieves a YRoom given a room ID. If the YRoom does not exist, this
41+
method will initialize a new YRoom.
42+
"""
43+
44+
# If room exists, then return it immediately
45+
if room_id in self._rooms_by_id:
46+
return self._rooms_by_id[room_id]
47+
48+
# Otherwise, create a new room
49+
try:
50+
yroom = YRoom(
51+
room_id=room_id,
52+
log=self.log,
53+
loop=self.loop,
54+
fileid_manager=self.fileid_manager,
55+
contents_manager=self.contents_manager,
56+
)
57+
self._rooms_by_id[room_id] = yroom
58+
return yroom
59+
except Exception as e:
60+
self.log.error(
61+
f"Unable to initialize YRoom '{room_id}'.",
62+
exc_info=True
63+
)
64+
return None
65+
66+
67+
def delete_room(self, room_id: str) -> None:
68+
"""
69+
Deletes a YRoom given a room ID.
70+
71+
TODO: finish implementing YRoom.stop(), and delete empty rooms w/ no
72+
live kernels automatically in a background task.
73+
"""
74+
yroom = self._rooms_by_id.get(room_id, None)
75+
if not yroom:
76+
return
1877

19-
def delete_room(self, room: YRoom) -> None:
20-
# TODO
21-
return
78+
yroom.stop()
79+
del self._rooms_by_id[room_id]

jupyter_rtc_core/websockets/clients.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import datetime
1010
from logging import Logger
1111
from typing import TYPE_CHECKING
12-
from dataclasses import dataclass
1312
import uuid
1413
import asyncio
1514

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,54 @@
1-
"""
2-
WIP.
3-
4-
This file just contains an interface to be filled out later.
5-
"""
6-
1+
from __future__ import annotations
2+
from tornado.httpclient import HTTPError
73
from tornado.websocket import WebSocketHandler
4+
from typing import TYPE_CHECKING
5+
6+
if TYPE_CHECKING:
7+
from jupyter_server_fileid.manager import BaseFileIdManager
8+
from ..rooms import YRoom, YRoomManager
89

910
class YRoomWebsocket(WebSocketHandler):
11+
yroom: YRoom
12+
room_id: str
13+
client_id: str
14+
15+
@property
16+
def yroom_manager(self) -> YRoomManager:
17+
return self.settings["yroom_manager"]
18+
19+
20+
@property
21+
def fileid_manager(self) -> BaseFileIdManager:
22+
return self.settings["file_id_manager"]
23+
24+
25+
def prepare(self):
26+
# Bind `room_id` attribute
27+
request_path: str = self.request.path
28+
self.room_id = request_path.strip("/").split("/")[-1]
29+
30+
# Verify the file ID contained in the room ID points to a valid file.
31+
fileid = self.room_id.split(":")[-1]
32+
path = self.fileid_manager.get_path(fileid)
33+
if not path:
34+
raise HTTPError(404, f"No file with ID '{fileid}'.")
35+
36+
1037
def open(self):
11-
print("WebSocket opened")
38+
# Create the YRoom
39+
yroom = self.yroom_manager.get_room(self.room_id)
40+
if not yroom:
41+
raise HTTPError(500, f"Unable to initialize YRoom '{self.room_id}'.")
42+
self.yroom = yroom
43+
44+
# Add self as a client to the YRoom
45+
self.client_id = self.yroom.clients.add(self)
46+
47+
48+
def on_message(self, message: bytes):
49+
# Route all messages to the YRoom for processing
50+
self.yroom.add_message(self.client_id, message)
1251

13-
def on_message(self, message):
14-
self.write_message(u"You said: " + message)
1552

1653
def on_close(self):
17-
print("WebSocket closed")
54+
self.yroom.clients.remove(self.client_id)

0 commit comments

Comments
 (0)