@@ -105,6 +105,26 @@ async def prepare(self):
105
105
self ._document_save_delay ,
106
106
)
107
107
108
+ def exception_logger (exception : Exception , log ) -> bool :
109
+ """A function that catches any exceptions raised in the websocket
110
+ server and logs them.
111
+
112
+ The protects the websocket server's task group from cancelling
113
+ anytime an exception is raised.
114
+ """
115
+ room_id = "unknown"
116
+ if self .room .room_id :
117
+ room_id = self .room .room_id
118
+ log .error (
119
+ f"Document Room Exception, (room_id={ room_id } : " ,
120
+ exc_info = exception ,
121
+ )
122
+ return True
123
+
124
+ # Logging exceptions, instead of raising them here to ensure
125
+ # that the y-rooms stay alive even after an exception is seen.
126
+ self .room .exception_handler = exception_logger
127
+
108
128
else :
109
129
# TransientRoom
110
130
# it is a transient document (e.g. awareness)
@@ -203,9 +223,12 @@ async def open(self, room_id):
203
223
self .log .error (f"File { file .path } not found.\n { e !r} " , exc_info = e )
204
224
self .close (1004 , f"File { file .path } not found." )
205
225
else :
206
- self .log .error (f"Error initializing: { file .path } \n { e !r} " , exc_info = e )
226
+ self .log .error (
227
+ f"Error initializing: { file .path } \n { e !r} " , exc_info = e
228
+ )
207
229
self .close (
208
- 1003 , f"Error initializing: { file .path } . You need to close the document."
230
+ 1003 ,
231
+ f"Error initializing: { file .path } . You need to close the document." ,
209
232
)
210
233
211
234
# Clean up the room and delete the file loader
@@ -272,16 +295,24 @@ async def on_message(self, message):
272
295
273
296
user = self .current_user
274
297
data = json .dumps (
275
- {"sender" : user .username , "timestamp" : time .time (), "content" : json .loads (msg )}
298
+ {
299
+ "sender" : user .username ,
300
+ "timestamp" : time .time (),
301
+ "content" : json .loads (msg ),
302
+ }
276
303
).encode ("utf8" )
277
304
278
305
for client in self .room .clients :
279
306
if client != self :
280
307
task = asyncio .create_task (
281
- client .send (bytes ([MessageType .CHAT ]) + write_var_uint (len (data )) + data )
308
+ client .send (
309
+ bytes ([MessageType .CHAT ]) + write_var_uint (len (data )) + data
310
+ )
282
311
)
283
312
self ._websocket_server .background_tasks .add (task )
284
- task .add_done_callback (self ._websocket_server .background_tasks .discard )
313
+ task .add_done_callback (
314
+ self ._websocket_server .background_tasks .discard
315
+ )
285
316
286
317
self ._message_queue .put_nowait (message )
287
318
self ._websocket_server .ypatch_nb += 1
@@ -300,7 +331,9 @@ def on_close(self) -> None:
300
331
if self ._room_id != "JupyterLab:globalAwareness" :
301
332
self ._emit_awareness_event (self .current_user .username , "leave" )
302
333
303
- def _emit (self , level : LogLevel , action : str | None = None , msg : str | None = None ) -> None :
334
+ def _emit (
335
+ self , level : LogLevel , action : str | None = None , msg : str | None = None
336
+ ) -> None :
304
337
_ , _ , file_id = decode_file_path (self ._room_id )
305
338
path = self ._file_id_manager .get_path (file_id )
306
339
@@ -312,12 +345,16 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No
312
345
313
346
self .event_logger .emit (schema_id = JUPYTER_COLLABORATION_EVENTS_URI , data = data )
314
347
315
- def _emit_awareness_event (self , username : str , action : str , msg : str | None = None ) -> None :
348
+ def _emit_awareness_event (
349
+ self , username : str , action : str , msg : str | None = None
350
+ ) -> None :
316
351
data = {"roomid" : self ._room_id , "username" : username , "action" : action }
317
352
if msg :
318
353
data ["msg" ] = msg
319
354
320
- self .event_logger .emit (schema_id = JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI , data = data )
355
+ self .event_logger .emit (
356
+ schema_id = JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI , data = data
357
+ )
321
358
322
359
async def _clean_room (self ) -> None :
323
360
"""
@@ -387,7 +424,12 @@ async def put(self, path):
387
424
# index already exists
388
425
self .log .info ("Request for Y document '%s' with room ID: %s" , path , idx )
389
426
data = json .dumps (
390
- {"format" : format , "type" : content_type , "fileId" : idx , "sessionId" : SERVER_SESSION }
427
+ {
428
+ "format" : format ,
429
+ "type" : content_type ,
430
+ "fileId" : idx ,
431
+ "sessionId" : SERVER_SESSION ,
432
+ }
391
433
)
392
434
self .set_status (200 )
393
435
return self .finish (data )
@@ -401,7 +443,12 @@ async def put(self, path):
401
443
# index successfully created
402
444
self .log .info ("Request for Y document '%s' with room ID: %s" , path , idx )
403
445
data = json .dumps (
404
- {"format" : format , "type" : content_type , "fileId" : idx , "sessionId" : SERVER_SESSION }
446
+ {
447
+ "format" : format ,
448
+ "type" : content_type ,
449
+ "fileId" : idx ,
450
+ "sessionId" : SERVER_SESSION ,
451
+ }
405
452
)
406
453
self .set_status (201 )
407
454
return self .finish (data )
0 commit comments