Skip to content

Commit 0fd9b4c

Browse files
committed
refactor DocumentRoom to not rely on asyncio locks
1 parent f675964 commit 0fd9b4c

File tree

1 file changed

+107
-88
lines changed

1 file changed

+107
-88
lines changed

jupyter_collaboration/rooms.py

Lines changed: 107 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,24 @@ def __init__(
4343
self._logger = logger
4444
self._save_delay = save_delay
4545

46-
self._update_lock = asyncio.Lock()
47-
self._initialization_lock = asyncio.Lock()
4846
self._cleaner: asyncio.Task | None = None
49-
self._saving_document: asyncio.Task | None = None
50-
self._messages: dict[str, asyncio.Lock] = {}
47+
48+
# the task currently saving to disk via FileLoader, if any.
49+
self._save_task: asyncio.Task | None = None
50+
51+
# flag that indicates whether a previous call to
52+
# `self._maybe_save_document()` is waiting to save.
53+
#
54+
# if `self._maybe_save_document()` is called while this flag is `True`,
55+
# then the call does nothing, as a previous task will save the Ydoc
56+
# within `self._save_delay` seconds.
57+
self._waiting_to_save = False
58+
59+
# flag that indicates whether a previous call to
60+
# `self._maybe_save_document()` should call itself again after
61+
# `self._save_task` finishes. this is set to `True` when a document
62+
# update occurs while `self._save_task` is running.
63+
self._should_resave = False
5164

5265
# Listen for document changes
5366
self._document.observe(self._on_document_change)
@@ -78,7 +91,8 @@ async def initialize(self) -> None:
7891
"""
7992
Initializes the room.
8093
81-
This method is thread safe so only one client can initialize the room.
94+
This method is not coroutine-safe, i.e. consumers must await this method
95+
before calling any other methods.
8296
8397
To initialize the room, we check if the content was already in the store
8498
as a Y updates and if it is up to date with the content on disk. In this
@@ -89,64 +103,61 @@ async def initialize(self) -> None:
89103
It is important to set the ready property in the parent class (`self.ready = True`),
90104
this setter will subscribe for updates on the shared document.
91105
"""
92-
async with self._initialization_lock:
93-
if self.ready: # type: ignore[has-type]
94-
return
106+
if self.ready: # type: ignore[has-type]
107+
return
95108

96-
self.log.info("Initializing room %s", self._room_id)
109+
self.log.info("Initializing room %s", self._room_id)
110+
model = await self._file.load_content(self._file_format, self._file_type)
97111

98-
model = await self._file.load_content(self._file_format, self._file_type)
112+
# try to apply Y updates from the YStore for this document
113+
read_from_source = True
114+
if self.ystore is not None:
115+
try:
116+
await self.ystore.apply_updates(self.ydoc)
117+
self._emit(
118+
LogLevel.INFO,
119+
"load",
120+
"Content loaded from the store {}".format(
121+
self.ystore.__class__.__qualname__
122+
),
123+
)
124+
self.log.info(
125+
"Content in room %s loaded from the ystore %s",
126+
self._room_id,
127+
self.ystore.__class__.__name__,
128+
)
129+
read_from_source = False
130+
except YDocNotFound:
131+
# YDoc not found in the YStore, create the document from the source file (no change history)
132+
pass
133+
134+
if read_from_source:
135+
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
136+
self.log.info(
137+
"Content in room %s loaded from file %s", self._room_id, self._file.path
138+
)
139+
self._document.source = model["content"]
99140

100-
async with self._update_lock:
101-
# try to apply Y updates from the YStore for this document
141+
if self.ystore:
142+
await self.ystore.encode_state_as_update(self.ydoc)
143+
else:
144+
# if YStore updates and source file are out-of-sync, resync updates with source
145+
if self._document.source != model["content"]:
146+
# TODO: Delete document from the store.
147+
self._emit(
148+
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
149+
)
150+
self.log.info(
151+
"Content in file %s is out-of-sync with the ystore %s",
152+
self._file.path,
153+
self.ystore.__class__.__name__,
154+
)
102155
read_from_source = True
103-
if self.ystore is not None:
104-
try:
105-
await self.ystore.apply_updates(self.ydoc)
106-
self._emit(
107-
LogLevel.INFO,
108-
"load",
109-
"Content loaded from the store {}".format(
110-
self.ystore.__class__.__qualname__
111-
),
112-
)
113-
self.log.info(
114-
"Content in room %s loaded from the ystore %s",
115-
self._room_id,
116-
self.ystore.__class__.__name__,
117-
)
118-
read_from_source = False
119-
except YDocNotFound:
120-
# YDoc not found in the YStore, create the document from the source file (no change history)
121-
pass
122-
123-
if read_from_source:
124-
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
125-
self.log.info(
126-
"Content in room %s loaded from file %s", self._room_id, self._file.path
127-
)
128-
self._document.source = model["content"]
129-
130-
if self.ystore:
131-
await self.ystore.encode_state_as_update(self.ydoc)
132-
else:
133-
# if YStore updates and source file are out-of-sync, resync updates with source
134-
if self._document.source != model["content"]:
135-
# TODO: Delete document from the store.
136-
self._emit(
137-
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
138-
)
139-
self.log.info(
140-
"Content in file %s is out-of-sync with the ystore %s",
141-
self._file.path,
142-
self.ystore.__class__.__name__,
143-
)
144-
read_from_source = True
145-
146-
147-
self._document.dirty = False
148-
self.ready = True
149-
self._emit(LogLevel.INFO, "initialize", "Room initialized")
156+
157+
158+
self._document.dirty = False
159+
self.ready = True
160+
self._emit(LogLevel.INFO, "initialize", "Room initialized")
150161

151162
def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
152163
data = {"level": level.value, "room": self._room_id, "path": self._file.path}
@@ -165,8 +176,8 @@ def stop(self) -> None:
165176
"""
166177
super().stop()
167178
# TODO: Should we cancel or wait ?
168-
if self._saving_document:
169-
self._saving_document.cancel()
179+
if self._save_task:
180+
self._save_task.cancel()
170181

171182
self._document.unobserve()
172183
self._file.unobserve(self.room_id)
@@ -193,9 +204,8 @@ async def _on_outofband_change(self) -> None:
193204
self._emit(LogLevel.ERROR, None, msg)
194205
return
195206

196-
async with self._update_lock:
197-
self._document.source = model["content"]
198-
self._document.dirty = False
207+
self._document.source = model["content"]
208+
self._document.dirty = False
199209

200210
def _on_document_change(self, target: str, event: Any) -> None:
201211
"""
@@ -212,49 +222,60 @@ def _on_document_change(self, target: str, event: Any) -> None:
212222
document. This tasks are debounced (60 seconds by default) so we
213223
need to cancel previous tasks before creating a new one.
214224
"""
215-
if self._update_lock.locked():
216-
return
217-
218-
self._saving_document = asyncio.create_task(
219-
self._maybe_save_document(self._saving_document)
220-
)
225+
asyncio.create_task(self._maybe_save_document())
221226

222-
async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> None:
227+
async def _maybe_save_document(self) -> None:
223228
"""
224229
Saves the content of the document to disk.
225230
226231
### Note:
227-
There is a save delay to debounce the save since we could receive a high
232+
There is a save delay to throttle the save since we could receive a high
228233
amount of changes in a short period of time. This way we can cancel the
229234
previous save.
230235
"""
231236
if self._save_delay is None:
237+
# TODO: fix this; if _save_delay is unset, then this never writes to disk
232238
return
233239

234-
if saving_document is not None and not saving_document.done():
235-
# the document is being saved, cancel that
236-
saving_document.cancel()
240+
if self._waiting_to_save:
241+
# if a previously spawned `self._maybe_save_document()` task is
242+
# waiting to save, then that task will save the Ydoc within
243+
# `self._save_delay` seconds. therefore we can return early.
244+
return
245+
246+
if self._save_task and not self._save_task.done():
247+
# if we are currently saving, then set the `_should_resave`
248+
# flag. this indicates to the currently running `self._save_task`
249+
# that it should re-call this method after it completes.
250+
self._should_resave = True
251+
return
252+
253+
# save after `self._save_delay` seconds of inactivity
254+
self._waiting_to_save = True
255+
await asyncio.sleep(self._save_delay)
256+
self._waiting_to_save = False
237257

238258
# all async code (i.e. await statements) must be part of this try/except block
239259
# because this coroutine is run in a cancellable task and cancellation is handled here
240-
241260
try:
242-
# save after X seconds of inactivity
243-
await asyncio.sleep(self._save_delay)
244-
261+
# do not write to `self._document` in this `try` block, as that will
262+
# trigger the observer and result in a save loop.
245263
self.log.info("Saving the content from room %s", self._room_id)
246-
await self._file.maybe_save_content(
264+
loop = asyncio.get_running_loop()
265+
self._save_task = loop.create_task(self._file.maybe_save_content(
247266
{
248267
"format": self._file_format,
249268
"type": self._file_type,
250269
"content": self._document.source,
251270
}
252-
)
253-
async with self._update_lock:
254-
self._document.dirty = False
255-
271+
))
272+
await self._save_task
256273
self._emit(LogLevel.INFO, "save", "Content saved.")
257274

275+
if self._should_resave:
276+
self._should_resave = False
277+
asyncio.create_task(self._maybe_save_document())
278+
258279
except asyncio.CancelledError:
259280
return
260281

@@ -268,10 +289,8 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No
268289
self._emit(LogLevel.ERROR, None, msg)
269290
return None
270291

271-
async with self._update_lock:
272-
self._document.source = model["content"]
273-
self._document.dirty = False
274-
292+
self._document.source = model["content"]
293+
self._document.dirty = False
275294
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")
276295

277296
except Exception as e:

0 commit comments

Comments
 (0)