Skip to content

Commit 2851ddf

Browse files
jzhang20133Jialin Zhang
andauthored
Backport 'handle exception when websocket server start room failed' #289 (#298)
Co-authored-by: Jialin Zhang <[email protected]>
1 parent f284083 commit 2851ddf

File tree

5 files changed

+132
-9
lines changed

5 files changed

+132
-9
lines changed

jupyter_collaboration/handlers.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from jupyter_server.auth import authorized
1313
from jupyter_server.base.handlers import APIHandler, JupyterHandler
14+
from jupyter_server.utils import ensure_async
1415
from jupyter_ydoc import ydocs as YDOCS
1516
from pycrdt_websocket.websocket_server import YRoom
1617
from pycrdt_websocket.ystore import BaseYStore
@@ -26,6 +27,7 @@
2627
LogLevel,
2728
MessageType,
2829
decode_file_path,
30+
room_id_from_encoded_path,
2931
)
3032
from .websocketserver import JupyterWebsocketServer
3133

@@ -69,12 +71,13 @@ def create_task(self, aw):
6971
task.add_done_callback(self._background_tasks.discard)
7072

7173
async def prepare(self):
74+
await ensure_async(super().prepare())
7275
if not self._websocket_server.started.is_set():
7376
self.create_task(self._websocket_server.start())
7477
await self._websocket_server.started.wait()
7578

7679
# Get room
77-
self._room_id: str = self.request.path.split("/")[-1]
80+
self._room_id: str = room_id_from_encoded_path(self.request.path)
7881

7982
async with self._room_lock(self._room_id):
8083
if self._websocket_server.room_exists(self._room_id):
@@ -109,12 +112,26 @@ async def prepare(self):
109112
# it is a transient document (e.g. awareness)
110113
self.room = TransientRoom(self._room_id, self.log)
111114

112-
await self._websocket_server.start_room(self.room)
113-
self._websocket_server.add_room(self._room_id, self.room)
115+
try:
116+
await self._websocket_server.start_room(self.room)
117+
except Exception as e:
118+
self.log.error("Room %s failed to start on websocket server", self._room_id)
119+
# Clean room
120+
await self.room.stop()
121+
self.log.info("Room %s deleted", self._room_id)
122+
self._emit(LogLevel.INFO, "clean", "Room deleted.")
114123

115-
res = super().prepare()
116-
if res is not None:
117-
return await res
124+
# Clean the file loader in file loader mapping if there are not any rooms using it
125+
_, _, file_id = decode_file_path(self._room_id)
126+
file = self._file_loaders[file_id]
127+
if file.number_of_subscriptions == 0 or (
128+
file.number_of_subscriptions == 1 and self._room_id in file._subscriptions
129+
):
130+
self.log.info("Deleting file %s", file.path)
131+
await self._file_loaders.remove(file_id)
132+
self._emit(LogLevel.INFO, "clean", "file loader removed.")
133+
raise e
134+
self._websocket_server.add_room(self._room_id, self.room)
118135

119136
def initialize(
120137
self,
@@ -133,6 +150,8 @@ def initialize(
133150
self._document_save_delay = document_save_delay
134151
self._websocket_server = ywebsocket_server
135152
self._message_queue = asyncio.Queue()
153+
self._room_id = ""
154+
self.room = None
136155

137156
@property
138157
def path(self):
@@ -226,7 +245,7 @@ async def send(self, message):
226245
try:
227246
self.write_message(message, binary=True)
228247
except Exception as e:
229-
self.log.debug("Failed to write message", exc_info=e)
248+
self.log.error("Failed to write message", exc_info=e)
230249

231250
async def recv(self):
232251
"""

jupyter_collaboration/loaders.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ async def clean(self) -> None:
7474
if self._watcher is not None:
7575
if not self._watcher.cancelled():
7676
self._watcher.cancel()
77-
await self._watcher
77+
try:
78+
await self._watcher
79+
except asyncio.CancelledError:
80+
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")
7881

7982
def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
8083
"""

jupyter_collaboration/rooms.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ async def stop(self) -> None:
161161
162162
Cancels the save task and unsubscribes from the file.
163163
"""
164-
await super().stop()
164+
try:
165+
await super().stop()
166+
except RuntimeError:
167+
pass
165168
# TODO: Should we cancel or wait ?
166169
if self._saving_document:
167170
self._saving_document.cancel()
@@ -299,3 +302,12 @@ async def _broadcast_updates(self):
299302
await super()._broadcast_updates()
300303
except asyncio.CancelledError:
301304
pass
305+
306+
async def stop(self) -> None:
307+
"""
308+
Stop the room.
309+
"""
310+
try:
311+
await super().stop()
312+
except RuntimeError:
313+
pass

jupyter_collaboration/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,8 @@ def encode_file_path(format: str, file_type: str, file_id: str) -> str:
7070
path (str): File path.
7171
"""
7272
return f"{format}:{file_type}:{file_id}"
73+
74+
75+
def room_id_from_encoded_path(encoded_path: str) -> str:
76+
"""Transforms the encoded path into a stable room identifier."""
77+
return encoded_path.split("/")[-1]

tests/test_handlers.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,87 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
131131
assert collected_data[1]["action"] == "leave"
132132
assert collected_data[1]["roomid"] == "text:file:" + fim.get_id("test.txt")
133133
assert collected_data[1]["username"] is not None
134+
135+
136+
async def test_room_handler_doc_client_should_cleanup_room_file(
137+
rtc_create_file, rtc_connect_doc_client, jp_serverapp
138+
):
139+
path, _ = await rtc_create_file("test.txt", "test")
140+
141+
event = Event()
142+
143+
def _on_document_change(target: str, e: Any) -> None:
144+
if target == "source":
145+
event.set()
146+
147+
doc = YUnicode()
148+
doc.observe(_on_document_change)
149+
150+
async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider(
151+
doc.ydoc, ws
152+
):
153+
await event.wait()
154+
await sleep(0.1)
155+
156+
# kill websocketserver to mimic task group inactive failure
157+
await jp_serverapp.web_app.settings["jupyter_collaboration"].ywebsocket_server.stop()
158+
159+
listener_was_called = False
160+
collected_data = []
161+
162+
async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
163+
nonlocal listener_was_called
164+
collected_data.append(data)
165+
listener_was_called = True
166+
167+
event_logger = jp_serverapp.event_logger
168+
event_logger.add_listener(
169+
schema_id="https://schema.jupyter.org/jupyter_collaboration/session/v1",
170+
listener=my_listener,
171+
)
172+
173+
path2, _ = await rtc_create_file("test2.txt", "test2")
174+
175+
try:
176+
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
177+
doc.ydoc, ws
178+
):
179+
await event.wait()
180+
await sleep(0.1)
181+
except Exception:
182+
pass
183+
184+
try:
185+
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
186+
doc.ydoc, ws
187+
):
188+
await event.wait()
189+
await sleep(0.1)
190+
except Exception:
191+
pass
192+
193+
fim = jp_serverapp.web_app.settings["file_id_manager"]
194+
195+
assert listener_was_called is True
196+
assert len(collected_data) == 4
197+
# no two collaboration events are emitted.
198+
# [{'level': 'WARNING', 'msg': 'There is another collaborative session accessing the same file.\nThe synchronization bet...ou might lose some of your changes.', 'path': 'test2.txt', 'room': 'text2:file2:51b7e24f-f534-47fb-882f-5cc45ba867fe'}]
199+
assert collected_data[0]["path"] == "test2.txt"
200+
assert collected_data[0]["room"] == "text2:file2:" + fim.get_id("test2.txt")
201+
assert collected_data[0]["action"] == "clean"
202+
assert collected_data[0]["msg"] == "Room deleted."
203+
assert collected_data[1]["path"] == "test2.txt"
204+
assert collected_data[1]["room"] == "text2:file2:" + fim.get_id("test2.txt")
205+
assert collected_data[1]["action"] == "clean"
206+
assert collected_data[1]["msg"] == "file loader removed."
207+
assert collected_data[2]["path"] == "test2.txt"
208+
assert collected_data[2]["room"] == "text2:file2:" + fim.get_id("test2.txt")
209+
assert collected_data[2]["action"] == "clean"
210+
assert collected_data[2]["msg"] == "Room deleted."
211+
assert collected_data[3]["path"] == "test2.txt"
212+
assert collected_data[3]["room"] == "text2:file2:" + fim.get_id("test2.txt")
213+
assert collected_data[3]["action"] == "clean"
214+
assert collected_data[3]["msg"] == "file loader removed."
215+
216+
await jp_serverapp.web_app.settings["jupyter_collaboration"].stop_extension()
217+
del jp_serverapp.web_app.settings["file_id_manager"]

0 commit comments

Comments
 (0)