Skip to content

Commit d4ec54a

Browse files
Handle last_modified only in FileLoader (#232)
1 parent 0f98e92 commit d4ec54a

File tree

3 files changed

+77
-91
lines changed

3 files changed

+77
-91
lines changed

jupyter_collaboration/loaders.py

Lines changed: 37 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ def __init__(
3838
self._contents_manager = contents_manager
3939

4040
self._log = log or getLogger(__name__)
41-
self._subscriptions: dict[
42-
str, Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
43-
] = {}
41+
self._subscriptions: dict[str, Callable[[], Coroutine[Any, Any, None]]] = {}
4442

4543
self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None
4644
self.last_modified = None
@@ -78,11 +76,9 @@ async def clean(self) -> None:
7876
self._watcher.cancel()
7977
await self._watcher
8078

81-
def observe(
82-
self, id: str, callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
83-
) -> None:
79+
def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
8480
"""
85-
Subscribe to the file to get notified on file changes.
81+
Subscribe to the file to get notified about out-of-band file changes.
8682
8783
Parameters:
8884
id (str): Room ID
@@ -99,7 +95,7 @@ def unobserve(self, id: str) -> None:
9995
"""
10096
del self._subscriptions[id]
10197

102-
async def load_content(self, format: str, file_type: str, content: bool) -> dict[str, Any]:
98+
async def load_content(self, format: str, file_type: str) -> dict[str, Any]:
10399
"""
104100
Load the content of the file.
105101
@@ -112,31 +108,11 @@ async def load_content(self, format: str, file_type: str, content: bool) -> dict
112108
model (dict): A dictionary with the metadata and content of the file.
113109
"""
114110
async with self._lock:
115-
return await ensure_async(
116-
self._contents_manager.get(
117-
self.path, format=format, type=file_type, content=content
118-
)
111+
model = await ensure_async(
112+
self._contents_manager.get(self.path, format=format, type=file_type, content=True)
119113
)
120-
121-
async def save_content(self, model: dict[str, Any]) -> dict[str, Any]:
122-
"""
123-
Save the content of the file.
124-
125-
Parameters:
126-
model (dict): A dictionary with format, type, last_modified, and content of the file.
127-
128-
Returns:
129-
model (dict): A dictionary with the metadata and content of the file.
130-
"""
131-
async with self._lock:
132-
path = self.path
133-
if model["type"] not in {"directory", "file", "notebook"}:
134-
# fall back to file if unknown type, the content manager only knows
135-
# how to handle these types
136-
model["type"] = "file"
137-
138-
self._log.info("Saving file: %s", path)
139-
return await ensure_async(self._contents_manager.save(model, path))
114+
self.last_modified = model["last_modified"]
115+
return model
140116

141117
async def maybe_save_content(self, model: dict[str, Any]) -> None:
142118
"""
@@ -168,16 +144,24 @@ async def maybe_save_content(self, model: dict[str, Any]) -> None:
168144
self._log.info("Saving file: %s", path)
169145
# saving is shielded so that it cannot be cancelled
170146
# otherwise it could corrupt the file
171-
task = asyncio.create_task(self._save_content(model))
172-
await asyncio.shield(task)
173-
147+
done_saving = asyncio.Event()
148+
task = asyncio.create_task(self._save_content(model, done_saving))
149+
try:
150+
await asyncio.shield(task)
151+
except asyncio.CancelledError:
152+
pass
153+
await done_saving.wait()
174154
else:
175155
# file changed on disk, raise an error
156+
self.last_modified = m["last_modified"]
176157
raise OutOfBandChanges
177158

178-
async def _save_content(self, model: dict[str, Any]) -> None:
179-
m = await ensure_async(self._contents_manager.save(model, self.path))
180-
self.last_modified = m["last_modified"]
159+
async def _save_content(self, model: dict[str, Any], done_saving: asyncio.Event) -> None:
160+
try:
161+
m = await ensure_async(self._contents_manager.save(model, self.path))
162+
self.last_modified = m["last_modified"]
163+
finally:
164+
done_saving.set()
181165

182166
async def _watch_file(self) -> None:
183167
"""
@@ -192,24 +176,31 @@ async def _watch_file(self) -> None:
192176
try:
193177
await asyncio.sleep(self._poll_interval)
194178
try:
195-
await self.notify()
179+
await self.maybe_notify()
196180
except Exception as e:
197181
self._log.error(f"Error watching file: {self.path}\n{e!r}", exc_info=e)
198182
except asyncio.CancelledError:
199183
break
200184

201-
async def notify(self) -> None:
185+
async def maybe_notify(self) -> None:
202186
"""
203-
Notifies subscribed rooms about changes on the content of the file.
187+
Notifies subscribed rooms about out-of-band file changes.
204188
"""
189+
do_notify = False
205190
async with self._lock:
206-
path = self.path
207191
# Get model metadata; format and type are not needed
208-
model = await ensure_async(self._contents_manager.get(path, content=False))
192+
model = await ensure_async(self._contents_manager.get(self.path, content=False))
193+
194+
if self.last_modified is not None and self.last_modified < model["last_modified"]:
195+
do_notify = True
196+
197+
self.last_modified = model["last_modified"]
209198

210-
# Notify that the content changed on disk
211-
for callback in self._subscriptions.values():
212-
await callback("metadata", model)
199+
if do_notify:
200+
# Notify out-of-band change
201+
# callbacks will load the file content, so the lock must be released before calling them
202+
for callback in self._subscriptions.values():
203+
await callback()
213204

214205

215206
class FileLoaderMapping:

jupyter_collaboration/rooms.py

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def __init__(
5151

5252
# Listen for document changes
5353
self._document.observe(self._on_document_change)
54-
self._file.observe(self.room_id, self._on_content_change)
54+
self._file.observe(self.room_id, self._on_outofband_change)
5555

5656
@property
5757
def room_id(self) -> str:
@@ -95,7 +95,7 @@ async def initialize(self) -> None:
9595

9696
self.log.info("Initializing room %s", self._room_id)
9797

98-
model = await self._file.load_content(self._file_format, self._file_type, True)
98+
model = await self._file.load_content(self._file_format, self._file_type)
9999

100100
async with self._update_lock:
101101
# try to apply Y updates from the YStore for this document
@@ -144,7 +144,6 @@ async def initialize(self) -> None:
144144
if self.ystore:
145145
await self.ystore.encode_state_as_update(self.ydoc)
146146

147-
self._file.last_modified = model["last_modified"]
148147
self._document.dirty = False
149148
self.ready = True
150149
self._emit(LogLevel.INFO, "initialize", "Room initialized")
@@ -179,32 +178,24 @@ async def _broadcast_updates(self):
179178
except asyncio.CancelledError:
180179
pass
181180

182-
async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
181+
async def _on_outofband_change(self) -> None:
183182
"""
184-
Called when the file changes.
185-
186-
Parameters:
187-
event (str): Type of change.
188-
args (dict): A dictionary with format, type, last_modified.
183+
Called when the file has been changed out-of-band.
189184
"""
190-
if event == "metadata" and (
191-
self._file.last_modified is None or self._file.last_modified < args["last_modified"]
192-
):
193-
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
194-
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")
185+
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
186+
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")
195187

196-
try:
197-
model = await self._file.load_content(self._file_format, self._file_type, True)
198-
except Exception as e:
199-
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
200-
self.log.error(msg, exc_info=e)
201-
self._emit(LogLevel.ERROR, None, msg)
202-
return None
188+
try:
189+
model = await self._file.load_content(self._file_format, self._file_type)
190+
except Exception as e:
191+
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
192+
self.log.error(msg, exc_info=e)
193+
self._emit(LogLevel.ERROR, None, msg)
194+
return
203195

204-
async with self._update_lock:
205-
self._document.source = model["content"]
206-
self._file.last_modified = model["last_modified"]
207-
self._document.dirty = False
196+
async with self._update_lock:
197+
self._document.source = model["content"]
198+
self._document.dirty = False
208199

209200
def _on_document_change(self, target: str, event: Any) -> None:
210201
"""
@@ -224,14 +215,11 @@ def _on_document_change(self, target: str, event: Any) -> None:
224215
if self._update_lock.locked():
225216
return
226217

227-
if self._saving_document is not None and not self._saving_document.done():
228-
# the document is being saved, cancel that
229-
self._saving_document.cancel()
230-
self._saving_document = None
231-
232-
self._saving_document = asyncio.create_task(self._maybe_save_document())
218+
self._saving_document = asyncio.create_task(
219+
self._maybe_save_document(self._saving_document)
220+
)
233221

234-
async def _maybe_save_document(self) -> None:
222+
async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> None:
235223
"""
236224
Saves the content of the document to disk.
237225
@@ -243,6 +231,11 @@ async def _maybe_save_document(self) -> None:
243231
if self._save_delay is None:
244232
return
245233

234+
if saving_document is not None and not saving_document.done():
235+
# the document is being saved, cancel that
236+
saving_document.cancel()
237+
await saving_document
238+
246239
# save after X seconds of inactivity
247240
await asyncio.sleep(self._save_delay)
248241

@@ -263,7 +256,7 @@ async def _maybe_save_document(self) -> None:
263256
except OutOfBandChanges:
264257
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
265258
try:
266-
model = await self._file.load_content(self._file_format, self._file_type, True)
259+
model = await self._file.load_content(self._file_format, self._file_type)
267260
except Exception as e:
268261
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
269262
self.log.error(msg, exc_info=e)
@@ -272,7 +265,6 @@ async def _maybe_save_document(self) -> None:
272265

273266
async with self._update_lock:
274267
self._document.source = model["content"]
275-
self._file.last_modified = model["last_modified"]
276268
self._document.dirty = False
277269

278270
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")

tests/test_loaders.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from __future__ import annotations
55

66
import asyncio
7-
from datetime import datetime
7+
from datetime import datetime, timedelta, timezone
88

99
from jupyter_collaboration.loaders import FileLoader, FileLoaderMapping
1010

@@ -17,23 +17,24 @@ async def test_FileLoader_with_watcher():
1717
paths = {}
1818
paths[id] = path
1919

20-
cm = FakeContentsManager({"last_modified": datetime.now()})
20+
cm = FakeContentsManager({"last_modified": datetime.now(timezone.utc)})
2121
loader = FileLoader(
2222
id,
2323
FakeFileIDManager(paths),
2424
cm,
2525
poll_interval=0.1,
2626
)
27+
await loader.load_content("text", "file")
2728

2829
triggered = False
2930

30-
async def trigger(*args):
31+
async def trigger():
3132
nonlocal triggered
3233
triggered = True
3334

3435
loader.observe("test", trigger)
3536

36-
cm.model["last_modified"] = datetime.now()
37+
cm.model["last_modified"] = datetime.now(timezone.utc) + timedelta(seconds=1)
3738

3839
await asyncio.sleep(0.15)
3940

@@ -49,24 +50,25 @@ async def test_FileLoader_without_watcher():
4950
paths = {}
5051
paths[id] = path
5152

52-
cm = FakeContentsManager({"last_modified": datetime.now()})
53+
cm = FakeContentsManager({"last_modified": datetime.now(timezone.utc)})
5354
loader = FileLoader(
5455
id,
5556
FakeFileIDManager(paths),
5657
cm,
5758
)
59+
await loader.load_content("text", "file")
5860

5961
triggered = False
6062

61-
async def trigger(*args):
63+
async def trigger():
6264
nonlocal triggered
6365
triggered = True
6466

6567
loader.observe("test", trigger)
6668

67-
cm.model["last_modified"] = datetime.now()
69+
cm.model["last_modified"] = datetime.now(timezone.utc) + timedelta(seconds=1)
6870

69-
await loader.notify()
71+
await loader.maybe_notify()
7072

7173
try:
7274
assert triggered
@@ -80,26 +82,27 @@ async def test_FileLoaderMapping_with_watcher():
8082
paths = {}
8183
paths[id] = path
8284

83-
cm = FakeContentsManager({"last_modified": datetime.now()})
85+
cm = FakeContentsManager({"last_modified": datetime.now(timezone.utc)})
8486

8587
map = FileLoaderMapping(
8688
{"contents_manager": cm, "file_id_manager": FakeFileIDManager(paths)},
8789
file_poll_interval=1.0,
8890
)
8991

9092
loader = map[id]
93+
await loader.load_content("text", "file")
9194

9295
triggered = False
9396

94-
async def trigger(*args):
97+
async def trigger():
9598
nonlocal triggered
9699
triggered = True
97100

98101
loader.observe("test", trigger)
99102

100103
# Clear map (and its loader) before updating => triggered should be False
101104
await map.clear()
102-
cm.model["last_modified"] = datetime.now()
105+
cm.model["last_modified"] = datetime.now(timezone.utc)
103106

104107
await asyncio.sleep(0.15)
105108

0 commit comments

Comments
 (0)