Skip to content

Commit 01ee5df

Browse files
Jialin Zhangjzhang20133
authored andcommitted
handle exception when websocket server start room failed
1 parent 926c4d0 commit 01ee5df

File tree

4 files changed

+125
-8
lines changed

4 files changed

+125
-8
lines changed

projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from jupyter_server.auth import authorized
1313
from jupyter_server.base.handlers import APIHandler, JupyterHandler
14+
from jupyter_server.utils import ensure_async
1415
from jupyter_ydoc import ydocs as YDOCS
1516
from pycrdt_websocket.websocket_server import YRoom
1617
from pycrdt_websocket.ystore import BaseYStore
@@ -70,6 +71,7 @@ def create_task(self, aw):
7071
task.add_done_callback(self._background_tasks.discard)
7172

7273
async def prepare(self):
74+
await ensure_async(super().prepare())
7375
if not self._websocket_server.started.is_set():
7476
self.create_task(self._websocket_server.start())
7577
await self._websocket_server.started.wait()
@@ -110,12 +112,26 @@ async def prepare(self):
110112
# it is a transient document (e.g. awareness)
111113
self.room = TransientRoom(self._room_id, self.log)
112114

113-
await self._websocket_server.start_room(self.room)
114-
self._websocket_server.add_room(self._room_id, self.room)
115+
try:
116+
await self._websocket_server.start_room(self.room)
117+
except Exception as e:
118+
self.log.error("Room %s failed to start on websocket server", self._room_id)
119+
# Clean room
120+
await self.room.stop()
121+
self.log.info("Room %s deleted", self._room_id)
122+
self._emit(LogLevel.INFO, "clean", "Room deleted.")
115123

116-
res = super().prepare()
117-
if res is not None:
118-
return await res
124+
# Clean the file loader in file loader mapping if there are not any rooms using it
125+
_, _, file_id = decode_file_path(self._room_id)
126+
file = self._file_loaders[file_id]
127+
if file.number_of_subscriptions == 0 or (
128+
file.number_of_subscriptions == 1 and self._room_id in file._subscriptions
129+
):
130+
self.log.info("Deleting file %s", file.path)
131+
await self._file_loaders.remove(file_id)
132+
self._emit(LogLevel.INFO, "clean", "file loader removed.")
133+
raise e
134+
self._websocket_server.add_room(self._room_id, self.room)
119135

120136
def initialize(
121137
self,
@@ -134,6 +150,8 @@ def initialize(
134150
self._document_save_delay = document_save_delay
135151
self._websocket_server = ywebsocket_server
136152
self._message_queue = asyncio.Queue()
153+
self._room_id = ""
154+
self.room = None
137155

138156
@property
139157
def path(self):
@@ -227,7 +245,7 @@ async def send(self, message):
227245
try:
228246
self.write_message(message, binary=True)
229247
except Exception as e:
230-
self.log.debug("Failed to write message", exc_info=e)
248+
self.log.error("Failed to write message", exc_info=e)
231249

232250
async def recv(self):
233251
"""

projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ async def clean(self) -> None:
7474
if self._watcher is not None:
7575
if not self._watcher.cancelled():
7676
self._watcher.cancel()
77-
await self._watcher
77+
try:
78+
await self._watcher
79+
except asyncio.CancelledError:
80+
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")
7881

7982
def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
8083
"""

projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ async def stop(self) -> None:
161161
162162
Cancels the save task and unsubscribes from the file.
163163
"""
164-
await super().stop()
164+
try:
165+
await super().stop()
166+
except RuntimeError:
167+
pass
165168
# TODO: Should we cancel or wait ?
166169
if self._saving_document:
167170
self._saving_document.cancel()
@@ -299,3 +302,12 @@ async def _broadcast_updates(self):
299302
await super()._broadcast_updates()
300303
except asyncio.CancelledError:
301304
pass
305+
306+
async def stop(self) -> None:
307+
"""
308+
Stop the room.
309+
"""
310+
try:
311+
await super().stop()
312+
except RuntimeError:
313+
pass

tests/test_handlers.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,87 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
131131
assert collected_data[1]["action"] == "leave"
132132
assert collected_data[1]["roomid"] == "text:file:" + fim.get_id("test.txt")
133133
assert collected_data[1]["username"] is not None
134+
135+
136+
async def test_room_handler_doc_client_should_cleanup_room_file(
137+
rtc_create_file, rtc_connect_doc_client, jp_serverapp
138+
):
139+
path, _ = await rtc_create_file("test.txt", "test")
140+
141+
event = Event()
142+
143+
def _on_document_change(target: str, e: Any) -> None:
144+
if target == "source":
145+
event.set()
146+
147+
doc = YUnicode()
148+
doc.observe(_on_document_change)
149+
150+
async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider(
151+
doc.ydoc, ws
152+
):
153+
await event.wait()
154+
await sleep(0.1)
155+
156+
# kill websocketserver to mimic task group inactive failure
157+
await jp_serverapp.web_app.settings["jupyter_server_ydoc"].ywebsocket_server.stop()
158+
159+
listener_was_called = False
160+
collected_data = []
161+
162+
async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
163+
nonlocal listener_was_called
164+
collected_data.append(data)
165+
listener_was_called = True
166+
167+
event_logger = jp_serverapp.event_logger
168+
event_logger.add_listener(
169+
schema_id="https://schema.jupyter.org/jupyter_collaboration/session/v1",
170+
listener=my_listener,
171+
)
172+
173+
path2, _ = await rtc_create_file("test2.txt", "test2")
174+
175+
try:
176+
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
177+
doc.ydoc, ws
178+
):
179+
await event.wait()
180+
await sleep(0.1)
181+
except Exception:
182+
pass
183+
184+
try:
185+
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
186+
doc.ydoc, ws
187+
):
188+
await event.wait()
189+
await sleep(0.1)
190+
except Exception:
191+
pass
192+
193+
fim = jp_serverapp.web_app.settings["file_id_manager"]
194+
195+
assert listener_was_called is True
196+
assert len(collected_data) == 4
197+
# no two collaboration events are emitted.
198+
# [{'level': 'WARNING', 'msg': 'There is another collaborative session accessing the same file.\nThe synchronization bet...ou might lose some of your changes.', 'path': 'test2.txt', 'room': 'text2:file2:51b7e24f-f534-47fb-882f-5cc45ba867fe'}]
199+
assert collected_data[0]["path"] == "test2.txt"
200+
assert collected_data[0]["room"] == "text2:file2:" + fim.get_id("test2.txt")
201+
assert collected_data[0]["action"] == "clean"
202+
assert collected_data[0]["msg"] == "Room deleted."
203+
assert collected_data[1]["path"] == "test2.txt"
204+
assert collected_data[1]["room"] == "text2:file2:" + fim.get_id("test2.txt")
205+
assert collected_data[1]["action"] == "clean"
206+
assert collected_data[1]["msg"] == "file loader removed."
207+
assert collected_data[2]["path"] == "test2.txt"
208+
assert collected_data[2]["room"] == "text2:file2:" + fim.get_id("test2.txt")
209+
assert collected_data[2]["action"] == "clean"
210+
assert collected_data[2]["msg"] == "Room deleted."
211+
assert collected_data[3]["path"] == "test2.txt"
212+
assert collected_data[3]["room"] == "text2:file2:" + fim.get_id("test2.txt")
213+
assert collected_data[3]["action"] == "clean"
214+
assert collected_data[3]["msg"] == "file loader removed."
215+
216+
await jp_serverapp.web_app.settings["jupyter_server_ydoc"].stop_extension()
217+
del jp_serverapp.web_app.settings["file_id_manager"]

0 commit comments

Comments
 (0)