Skip to content

Commit f5a3157

Browse files
Move YRoom temporarily (#205)
* Move YRoom temporarily * Automatic application of license header --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 5162fd2 commit f5a3157

File tree

3 files changed

+236
-2
lines changed

3 files changed

+236
-2
lines changed

jupyter_collaboration/handlers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ async def open(self, room_id):
121121
On connection open.
122122
"""
123123
self._room_id = self.request.path.split("/")[-1]
124+
self.log.info("New client connecting to room: %s", self._room_id)
124125

125126
try:
126127
# Get room

jupyter_collaboration/rooms/base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@
77
import uuid
88
from logging import Logger
99

10-
from ypy_websocket.websocket_server import YRoom
11-
1210
from ..stores import BaseYStore
11+
from .yroom import YRoom
1312

1413

1514
class BaseRoom(YRoom):

jupyter_collaboration/rooms/yroom.py

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
# Copyright (c) Jupyter Development Team.
2+
# Distributed under the terms of the Modified BSD License.
3+
4+
from __future__ import annotations
5+
6+
from contextlib import AsyncExitStack
7+
from functools import partial
8+
from inspect import isawaitable
9+
from logging import Logger, getLogger
10+
from typing import Awaitable, Callable
11+
12+
import y_py as Y
13+
from anyio import (
14+
TASK_STATUS_IGNORED,
15+
Event,
16+
create_memory_object_stream,
17+
create_task_group,
18+
)
19+
from anyio.abc import TaskGroup, TaskStatus
20+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
21+
from ypy_websocket.awareness import Awareness
22+
from ypy_websocket.websocket import Websocket
23+
from ypy_websocket.yutils import (
24+
YMessageType,
25+
create_update_message,
26+
process_sync_message,
27+
put_updates,
28+
sync,
29+
)
30+
31+
from ..stores import BaseYStore
32+
33+
34+
class YRoom:
35+
36+
clients: list
37+
ydoc: Y.YDoc
38+
ystore: BaseYStore | None
39+
_on_message: Callable[[bytes], Awaitable[bool] | bool] | None
40+
_update_send_stream: MemoryObjectSendStream
41+
_update_receive_stream: MemoryObjectReceiveStream
42+
_ready: bool
43+
_task_group: TaskGroup | None
44+
_started: Event | None
45+
_starting: bool
46+
47+
def __init__(
48+
self, ready: bool = True, ystore: BaseYStore | None = None, log: Logger | None = None
49+
):
50+
"""Initialize the object.
51+
52+
The YRoom instance should preferably be used as an async context manager:
53+
```py
54+
async with room:
55+
...
56+
```
57+
However, a lower-level API can also be used:
58+
```py
59+
task = asyncio.create_task(room.start())
60+
await room.started.wait()
61+
...
62+
room.stop()
63+
```
64+
65+
Arguments:
66+
ready: Whether the internal YDoc is ready to be synchronized right away.
67+
ystore: An optional store in which to persist document updates.
68+
log: An optional logger.
69+
"""
70+
self.ydoc = Y.YDoc()
71+
self.awareness = Awareness(self.ydoc)
72+
self._update_send_stream, self._update_receive_stream = create_memory_object_stream(
73+
max_buffer_size=65536
74+
)
75+
self._ready = False
76+
self.ready = ready
77+
self.ystore = ystore
78+
self.log = log or getLogger(__name__)
79+
self.clients = []
80+
self._on_message = None
81+
self._started = None
82+
self._starting = False
83+
self._task_group = None
84+
85+
@property
86+
def started(self):
87+
"""An async event that is set when the YRoom provider has started."""
88+
if self._started is None:
89+
self._started = Event()
90+
return self._started
91+
92+
@property
93+
def ready(self) -> bool:
94+
"""
95+
Returns:
96+
True is the internal YDoc is ready to be synchronized.
97+
"""
98+
return self._ready
99+
100+
@ready.setter
101+
def ready(self, value: bool) -> None:
102+
"""
103+
Arguments:
104+
value: True if the internal YDoc is ready to be synchronized, False otherwise."""
105+
self._ready = value
106+
if value:
107+
self.ydoc.observe_after_transaction(partial(put_updates, self._update_send_stream))
108+
109+
@property
110+
def on_message(self) -> Callable[[bytes], Awaitable[bool] | bool] | None:
111+
"""
112+
Returns:
113+
The optional callback to call when a message is received.
114+
"""
115+
return self._on_message
116+
117+
@on_message.setter
118+
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
119+
"""
120+
Arguments:
121+
value: An optional callback to call when a message is received. If the callback returns True, the message is skipped.
122+
"""
123+
self._on_message = value
124+
125+
async def _broadcast_updates(self):
126+
async with self._update_receive_stream:
127+
async for update in self._update_receive_stream:
128+
if self._task_group.cancel_scope.cancel_called:
129+
return
130+
# broadcast internal ydoc's update to all clients, that includes changes from the
131+
# clients and changes from the backend (out-of-band changes)
132+
for client in self.clients:
133+
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
134+
message = create_update_message(update)
135+
self._task_group.start_soon(client.send, message)
136+
if self.ystore:
137+
self.log.debug("Writing Y update to YStore")
138+
self._task_group.start_soon(self.ystore.write, client.path, update)
139+
140+
async def __aenter__(self) -> YRoom:
141+
if self._task_group is not None:
142+
raise RuntimeError("YRoom already running")
143+
144+
async with AsyncExitStack() as exit_stack:
145+
tg = create_task_group()
146+
self._task_group = await exit_stack.enter_async_context(tg)
147+
self._exit_stack = exit_stack.pop_all()
148+
tg.start_soon(self._broadcast_updates)
149+
self.started.set()
150+
151+
return self
152+
153+
async def __aexit__(self, exc_type, exc_value, exc_tb):
154+
if self._task_group is None:
155+
raise RuntimeError("YRoom not running")
156+
157+
self._task_group.cancel_scope.cancel()
158+
self._task_group = None
159+
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)
160+
161+
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
162+
"""Start the room.
163+
164+
Arguments:
165+
task_status: The status to set when the task has started.
166+
"""
167+
if self._starting:
168+
return
169+
else:
170+
self._starting = True
171+
172+
if self._task_group is not None:
173+
raise RuntimeError("YRoom already running")
174+
175+
async with create_task_group() as self._task_group:
176+
self._task_group.start_soon(self._broadcast_updates)
177+
self.started.set()
178+
self._starting = False
179+
task_status.started()
180+
181+
def stop(self):
182+
"""Stop the room."""
183+
if self._task_group is None:
184+
raise RuntimeError("YRoom not running")
185+
186+
self._task_group.cancel_scope.cancel()
187+
self._task_group = None
188+
189+
async def serve(self, websocket: Websocket):
190+
"""Serve a client.
191+
192+
Arguments:
193+
websocket: The WebSocket through which to serve the client.
194+
"""
195+
async with create_task_group() as tg:
196+
self.clients.append(websocket)
197+
await sync(self.ydoc, websocket, self.log)
198+
try:
199+
async for message in websocket:
200+
# filter messages (e.g. awareness)
201+
skip = False
202+
if self.on_message:
203+
_skip = self.on_message(message)
204+
skip = await _skip if isawaitable(_skip) else _skip
205+
if skip:
206+
continue
207+
message_type = message[0]
208+
if message_type == YMessageType.SYNC:
209+
# update our internal state in the background
210+
# changes to the internal state are then forwarded to all clients
211+
# and stored in the YStore (if any)
212+
tg.start_soon(
213+
process_sync_message, message[1:], self.ydoc, websocket, self.log
214+
)
215+
elif message_type == YMessageType.AWARENESS:
216+
# forward awareness messages from this client to all clients,
217+
# including itself, because it's used to keep the connection alive
218+
self.log.debug(
219+
"Received %s message from endpoint: %s",
220+
YMessageType.AWARENESS.name,
221+
websocket.path,
222+
)
223+
for client in self.clients:
224+
self.log.debug(
225+
"Sending Y awareness from client with endpoint %s to client with endpoint: %s",
226+
websocket.path,
227+
client.path,
228+
)
229+
tg.start_soon(client.send, message)
230+
except Exception as e:
231+
self.log.debug("Error serving endpoint: %s", websocket.path, exc_info=e)
232+
233+
# remove this client
234+
self.clients = [c for c in self.clients if c != websocket]

0 commit comments

Comments
 (0)