@@ -56,6 +56,12 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
5656
5757 _message_queue : asyncio .Queue [Any ]
5858 _background_tasks : set [asyncio .Task ]
59+ _room_locks : dict [str , asyncio .Lock ] = {}
60+
61+ def _room_lock (self , room_id : str ):
62+ if room_id not in self ._room_locks :
63+ self ._room_locks [room_id ] = asyncio .Lock ()
64+ return self ._room_locks [room_id ]
5965
6066 def create_task (self , aw ):
6167 task = asyncio .create_task (aw )
@@ -70,38 +76,38 @@ async def prepare(self):
7076 # Get room
7177 self ._room_id : str = self .request .path .split ("/" )[- 1 ]
7278
73- if self ._websocket_server .room_exists (self ._room_id ):
74- self .room : YRoom = await self ._websocket_server .get_room (self ._room_id )
75-
76- else :
77- if self ._room_id .count (":" ) >= 2 :
78- # DocumentRoom
79- file_format , file_type , file_id = decode_file_path (self ._room_id )
80- if file_id in self ._file_loaders :
81- self ._emit (
82- LogLevel .WARNING ,
83- None ,
84- "There is another collaborative session accessing the same file.\n The synchronization between rooms is not supported and you might lose some of your changes." ,
79+ async with self ._room_lock (self ._room_id ):
80+ if self ._websocket_server .room_exists (self ._room_id ):
81+ self .room : YRoom = await self ._websocket_server .get_room (self ._room_id )
82+ else :
83+ if self ._room_id .count (":" ) >= 2 :
84+ # DocumentRoom
85+ file_format , file_type , file_id = decode_file_path (self ._room_id )
86+ if file_id in self ._file_loaders :
87+ self ._emit (
88+ LogLevel .WARNING ,
89+ None ,
90+ "There is another collaborative session accessing the same file.\n The synchronization between rooms is not supported and you might lose some of your changes." ,
91+ )
92+
93+ file = self ._file_loaders [file_id ]
94+ updates_file_path = f".{ file_type } :{ file_id } .y"
95+ ystore = self ._ystore_class (path = updates_file_path , log = self .log )
96+ self .room = DocumentRoom (
97+ self ._room_id ,
98+ file_format ,
99+ file_type ,
100+ file ,
101+ self .event_logger ,
102+ ystore ,
103+ self .log ,
104+ self ._document_save_delay ,
85105 )
86106
87- file = self ._file_loaders [file_id ]
88- updates_file_path = f".{ file_type } :{ file_id } .y"
89- ystore = self ._ystore_class (path = updates_file_path , log = self .log )
90- self .room = DocumentRoom (
91- self ._room_id ,
92- file_format ,
93- file_type ,
94- file ,
95- self .event_logger ,
96- ystore ,
97- self .log ,
98- self ._document_save_delay ,
99- )
100-
101- else :
102- # TransientRoom
103- # it is a transient document (e.g. awareness)
104- self .room = TransientRoom (self ._room_id , self .log )
107+ else :
108+ # TransientRoom
109+ # it is a transient document (e.g. awareness)
110+ self .room = TransientRoom (self ._room_id , self .log )
105111
106112 await self ._websocket_server .start_room (self .room )
107113 self ._websocket_server .add_room (self ._room_id , self .room )
@@ -184,7 +190,8 @@ async def open(self, room_id):
184190
185191 try :
186192 # Initialize the room
187- await self .room .initialize ()
193+ async with self ._room_lock (self ._room_id ):
194+ await self .room .initialize ()
188195 self ._emit_awareness_event (self .current_user .username , "join" )
189196 except Exception as e :
190197 _ , _ , file_id = decode_file_path (self ._room_id )
@@ -323,29 +330,31 @@ async def _clean_room(self) -> None:
323330 contains a copy of the document. In addition, we remove the file if there is no rooms
324331 subscribed to it.
325332 """
326- assert isinstance (self .room , DocumentRoom )
327-
328- if self ._cleanup_delay is None :
329- return
330-
331- await asyncio .sleep (self ._cleanup_delay )
332-
333- # Remove the room from the websocket server
334- self .log .info ("Deleting Y document from memory: %s" , self .room .room_id )
335- self ._websocket_server .delete_room (room = self .room )
336-
337- # Clean room
338- del self .room
339- self .log .info ("Room %s deleted" , self ._room_id )
340- self ._emit (LogLevel .INFO , "clean" , "Room deleted." )
341-
342- # Clean the file loader if there are not rooms using it
343- _ , _ , file_id = decode_file_path (self ._room_id )
344- file = self ._file_loaders [file_id ]
345- if file .number_of_subscriptions == 0 :
346- self .log .info ("Deleting file %s" , file .path )
347- await self ._file_loaders .remove (file_id )
348- self ._emit (LogLevel .INFO , "clean" , "Loader deleted." )
333+ async with self ._room_lock (self ._room_id ):
334+ assert isinstance (self .room , DocumentRoom )
335+
336+ if self ._cleanup_delay is None :
337+ return
338+
339+ await asyncio .sleep (self ._cleanup_delay )
340+
341+ # Remove the room from the websocket server
342+ self .log .info ("Deleting Y document from memory: %s" , self .room .room_id )
343+ self ._websocket_server .delete_room (room = self .room )
344+
345+ # Clean room
346+ del self .room
347+ self .log .info ("Room %s deleted" , self ._room_id )
348+ self ._emit (LogLevel .INFO , "clean" , "Room deleted." )
349+
350+ # Clean the file loader if there are not rooms using it
351+ _ , _ , file_id = decode_file_path (self ._room_id )
352+ file = self ._file_loaders [file_id ]
353+ if file .number_of_subscriptions == 0 :
354+ self .log .info ("Deleting file %s" , file .path )
355+ await self ._file_loaders .remove (file_id )
356+ self ._emit (LogLevel .INFO , "clean" , "Loader deleted." )
357+ del self ._room_locks [self ._room_id ]
349358
350359 def check_origin (self , origin ):
351360 """
0 commit comments