21
21
class DocumentRoom (BaseRoom ):
22
22
"""A Y room for a possibly stored document (e.g. a notebook)."""
23
23
24
+ _background_tasks : set [asyncio .Task ]
25
+
24
26
def __init__ (
25
27
self ,
26
28
room_id : str ,
@@ -47,6 +49,7 @@ def __init__(
47
49
self ._cleaner : asyncio .Task | None = None
48
50
self ._saving_document : asyncio .Task | None = None
49
51
self ._messages : dict [str , asyncio .Lock ] = {}
52
+ self ._background_tasks = set ()
50
53
51
54
# Listen for document changes
52
55
self ._document .observe (self ._on_document_change )
@@ -78,6 +81,10 @@ async def initialize(self) -> None:
78
81
# try to apply Y updates from the YStore for this document
79
82
read_from_source = True
80
83
if self .ystore is not None :
84
+ async with self .ystore .start_lock :
85
+ if not self .ystore .started .is_set ():
86
+ self .create_task (self .ystore .start ())
87
+ await self .ystore .started .wait ()
81
88
try :
82
89
await self .ystore .apply_updates (self .ydoc )
83
90
self ._emit (
@@ -152,7 +159,20 @@ async def stop(self) -> None:
152
159
if self ._saving_document :
153
160
self ._saving_document .cancel ()
154
161
155
- return super ().stop ()
162
+ self ._document .unobserve ()
163
+ self ._file .unobserve (self .room_id )
164
+
165
+ def create_task (self , aw ):
166
+ task = asyncio .create_task (aw )
167
+ self ._background_tasks .add (task )
168
+ task .add_done_callback (self ._background_tasks .discard )
169
+
170
+ async def _broadcast_updates (self ):
171
+ # FIXME should be upstreamed
172
+ try :
173
+ await super ()._broadcast_updates ()
174
+ except asyncio .CancelledError :
175
+ pass
156
176
157
177
async def _on_outofband_change (self ) -> None :
158
178
"""
0 commit comments