Skip to content

Commit 3d89ea7

Browse files
Jialin Zhangandrii-i
authored andcommitted
handle exception when websocket server start room failed
1 parent 0ab44fd commit 3d89ea7

File tree

4 files changed

+125
-8
lines changed

4 files changed

+125
-8
lines changed

projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from jupyter_server.auth import authorized
1212
from jupyter_server.base.handlers import APIHandler, JupyterHandler
13+
from jupyter_server.utils import ensure_async
1314
from jupyter_ydoc import ydocs as YDOCS
1415
from pycrdt_websocket.websocket_server import YRoom
1516
from pycrdt_websocket.ystore import BaseYStore
@@ -71,6 +72,7 @@ def create_task(self, aw):
7172
task.add_done_callback(self._background_tasks.discard)
7273

7374
async def prepare(self):
75+
await ensure_async(super().prepare())
7476
if not self._websocket_server.started.is_set():
7577
self.create_task(self._websocket_server.start())
7678
await self._websocket_server.started.wait()
@@ -111,12 +113,26 @@ async def prepare(self):
111113
# it is a transient document (e.g. awareness)
112114
self.room = TransientRoom(self._room_id, self.log)
113115

114-
await self._websocket_server.start_room(self.room)
115-
self._websocket_server.add_room(self._room_id, self.room)
116+
try:
117+
await self._websocket_server.start_room(self.room)
118+
except Exception as e:
119+
self.log.error("Room %s failed to start on websocket server", self._room_id)
120+
# Clean room
121+
await self.room.stop()
122+
self.log.info("Room %s deleted", self._room_id)
123+
self._emit(LogLevel.INFO, "clean", "Room deleted.")
116124

117-
res = super().prepare()
118-
if res is not None:
119-
return await res
125+
# Clean the file loader in file loader mapping if there are not any rooms using it
126+
_, _, file_id = decode_file_path(self._room_id)
127+
file = self._file_loaders[file_id]
128+
if file.number_of_subscriptions == 0 or (
129+
file.number_of_subscriptions == 1 and self._room_id in file._subscriptions
130+
):
131+
self.log.info("Deleting file %s", file.path)
132+
await self._file_loaders.remove(file_id)
133+
self._emit(LogLevel.INFO, "clean", "file loader removed.")
134+
raise e
135+
self._websocket_server.add_room(self._room_id, self.room)
120136

121137
def initialize(
122138
self,
@@ -134,6 +150,8 @@ def initialize(
134150

135151
self._serve_task: asyncio.Task | None = None
136152
self._message_queue = asyncio.Queue()
153+
self._room_id = ""
154+
self.room = None
137155

138156
async def prepare(self):
139157
# NOTE: Initialize in the ExtensionApp.start_extension once
@@ -227,7 +245,7 @@ async def send(self, message):
227245
try:
228246
self.write_message(message, binary=True)
229247
except Exception as e:
230-
self.log.debug("Failed to write message", exc_info=e)
248+
self.log.error("Failed to write message", exc_info=e)
231249

232250
async def recv(self):
233251
"""

projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ async def clean(self) -> None:
7474
if self._watcher is not None:
7575
if not self._watcher.cancelled():
7676
self._watcher.cancel()
77-
await self._watcher
77+
try:
78+
await self._watcher
79+
except asyncio.CancelledError:
80+
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")
7881

7982
def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
8083
"""

projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,10 @@ async def stop(self) -> None:
139139
140140
Cancels the save task and unsubscribes from the file.
141141
"""
142-
await super().stop()
142+
try:
143+
await super().stop()
144+
except RuntimeError:
145+
pass
143146
# TODO: Should we cancel or wait ?
144147
if self._saving_document:
145148
self._saving_document.cancel()
@@ -281,3 +284,12 @@ async def _broadcast_updates(self):
281284
await super()._broadcast_updates()
282285
except asyncio.CancelledError:
283286
pass
287+
288+
async def stop(self) -> None:
289+
"""
290+
Stop the room.
291+
"""
292+
try:
293+
await super().stop()
294+
except RuntimeError:
295+
pass

tests/test_handlers.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,87 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
131131
assert collected_data[1]["action"] == "leave"
132132
assert collected_data[1]["roomid"] == "text:file:" + fim.get_id("test.txt")
133133
assert collected_data[1]["username"] is not None
134+
135+
136+
async def test_room_handler_doc_client_should_cleanup_room_file(
137+
rtc_create_file, rtc_connect_doc_client, jp_serverapp
138+
):
139+
path, _ = await rtc_create_file("test.txt", "test")
140+
141+
event = Event()
142+
143+
def _on_document_change(target: str, e: Any) -> None:
144+
if target == "source":
145+
event.set()
146+
147+
doc = YUnicode()
148+
doc.observe(_on_document_change)
149+
150+
async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider(
151+
doc.ydoc, ws
152+
):
153+
await event.wait()
154+
await sleep(0.1)
155+
156+
# kill websocketserver to mimic task group inactive failure
157+
await jp_serverapp.web_app.settings["jupyter_server_ydoc"].ywebsocket_server.stop()
158+
159+
listener_was_called = False
160+
collected_data = []
161+
162+
async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
163+
nonlocal listener_was_called
164+
collected_data.append(data)
165+
listener_was_called = True
166+
167+
event_logger = jp_serverapp.event_logger
168+
event_logger.add_listener(
169+
schema_id="https://schema.jupyter.org/jupyter_collaboration/session/v1",
170+
listener=my_listener,
171+
)
172+
173+
path2, _ = await rtc_create_file("test2.txt", "test2")
174+
175+
try:
176+
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
177+
doc.ydoc, ws
178+
):
179+
await event.wait()
180+
await sleep(0.1)
181+
except Exception:
182+
pass
183+
184+
try:
185+
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
186+
doc.ydoc, ws
187+
):
188+
await event.wait()
189+
await sleep(0.1)
190+
except Exception:
191+
pass
192+
193+
fim = jp_serverapp.web_app.settings["file_id_manager"]
194+
195+
assert listener_was_called is True
196+
assert len(collected_data) == 4
197+
# no two collaboration events are emitted.
198+
# [{'level': 'WARNING', 'msg': 'There is another collaborative session accessing the same file.\nThe synchronization bet...ou might lose some of your changes.', 'path': 'test2.txt', 'room': 'text2:file2:51b7e24f-f534-47fb-882f-5cc45ba867fe'}]
199+
assert collected_data[0]["path"] == "test2.txt"
200+
assert collected_data[0]["room"] == "text2:file2:" + fim.get_id("test2.txt")
201+
assert collected_data[0]["action"] == "clean"
202+
assert collected_data[0]["msg"] == "Room deleted."
203+
assert collected_data[1]["path"] == "test2.txt"
204+
assert collected_data[1]["room"] == "text2:file2:" + fim.get_id("test2.txt")
205+
assert collected_data[1]["action"] == "clean"
206+
assert collected_data[1]["msg"] == "file loader removed."
207+
assert collected_data[2]["path"] == "test2.txt"
208+
assert collected_data[2]["room"] == "text2:file2:" + fim.get_id("test2.txt")
209+
assert collected_data[2]["action"] == "clean"
210+
assert collected_data[2]["msg"] == "Room deleted."
211+
assert collected_data[3]["path"] == "test2.txt"
212+
assert collected_data[3]["room"] == "text2:file2:" + fim.get_id("test2.txt")
213+
assert collected_data[3]["action"] == "clean"
214+
assert collected_data[3]["msg"] == "file loader removed."
215+
216+
await jp_serverapp.web_app.settings["jupyter_server_ydoc"].stop_extension()
217+
del jp_serverapp.web_app.settings["file_id_manager"]

0 commit comments

Comments
 (0)