Skip to content

Commit 0a361fa

Browse files
authored
Improves autosaving documents (#206)
* Improves autosaving documents * pre-commit
1 parent 3ece53a commit 0a361fa

File tree

6 files changed

+58
-65
lines changed

6 files changed

+58
-65
lines changed

.flake8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ enable-extensions = G
1010
extend-ignore =
1111
G001, G002, G004, G200, G201, G202,
1212
# black adds spaces around ':'
13-
E203,
13+
E203,E231
1414
per-file-ignores =
1515
# B011: Do not call assert False since python -O removes these calls
1616
# F841 local variable 'foo' is assigned to but never used

jupyter_collaboration/handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ async def open(self, room_id):
137137
self.close(1004, f"File {path} not found.")
138138
else:
139139
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
140-
self.close(1003, f"Error initializing: {path}. You need to close the document.")
140+
self.close(1005, f"Error initializing: {path}. You need to close the document.")
141141

142142
# Clean up the room and delete the file loader
143143
if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]:

jupyter_collaboration/rooms/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def handle_msg(self, data: bytes) -> None:
4444

4545
def broadcast_msg(self, msg: bytes) -> None:
4646
for client in self.clients:
47-
self._task_group.start_soon(client.send, msg)
47+
self._task_group.start_soon(client.send, msg) # type: ignore[union-attr]
4848

4949
async def _broadcast_updates(self):
5050
# FIXME should be upstreamed

jupyter_collaboration/rooms/document.py

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async def initialize(self) -> None:
7777
this setter will subscribe for updates on the shared document.
7878
"""
7979
async with self._initialization_lock:
80-
if self.ready: # type: ignore[has-type]
80+
if self.ready:
8181
return
8282

8383
self.log.info("Initializing room %s", self._room_id)
@@ -88,7 +88,9 @@ async def initialize(self) -> None:
8888
if self.ystore is not None and await self.ystore.exists(self._room_id):
8989
# Load the content from the store
9090
doc = await self.ystore.get(self._room_id)
91+
assert doc
9192
self._session_id = doc["session_id"]
93+
9294
await self.ystore.apply_updates(self._room_id, self.ydoc)
9395
self._emit(
9496
LogLevel.INFO,
@@ -207,18 +209,7 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
207209
if event == "metadata" and (
208210
self._last_modified is None or self._last_modified < args["last_modified"]
209211
):
210-
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
211-
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")
212-
213-
msg_id = str(uuid.uuid4())
214-
self._messages[msg_id] = asyncio.Lock()
215-
await self._outofband_lock.acquire()
216-
data = msg_id.encode()
217-
self.broadcast_msg(
218-
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED])
219-
+ write_var_uint(len(data))
220-
+ data
221-
)
212+
await self._send_confict_msg()
222213

223214
def _on_document_change(self, target: str, event: Any) -> None:
224215
"""
@@ -247,34 +238,35 @@ def _on_document_change(self, target: str, event: Any) -> None:
247238

248239
async def _load_document(self) -> None:
249240
try:
250-
model = await self._file.load_content(self._file_format, self._file_type, True)
241+
async with self._update_lock:
242+
model = await self._file.load_content(self._file_format, self._file_type, True)
243+
self._document.source = model["content"]
244+
self._last_modified = model["last_modified"]
245+
self._document.dirty = False
246+
251247
except Exception as e:
252248
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
253249
self.log.error(msg, exc_info=e)
254250
self._emit(LogLevel.ERROR, None, msg)
255251
return None
256252

257-
async with self._update_lock:
258-
self._document.source = model["content"]
259-
self._last_modified = model["last_modified"]
260-
self._document.dirty = False
261-
262253
async def _save_document(self) -> None:
263254
"""
264255
Saves the content of the document to disk.
265256
"""
266257
try:
267258
self.log.info("Saving the content from room %s", self._room_id)
268-
model = await self._file.save_content(
269-
{
270-
"format": self._file_format,
271-
"type": self._file_type,
272-
"last_modified": self._last_modified,
273-
"content": self._document.source,
274-
}
275-
)
276-
self._last_modified = model["last_modified"]
259+
277260
async with self._update_lock:
261+
model = await self._file.save_content(
262+
{
263+
"format": self._file_format,
264+
"type": self._file_type,
265+
"last_modified": self._last_modified,
266+
"content": self._document.source,
267+
}
268+
)
269+
self._last_modified = model["last_modified"]
278270
self._document.dirty = False
279271

280272
self._emit(LogLevel.INFO, "save", "Content saved.")
@@ -299,40 +291,41 @@ async def _maybe_save_document(self) -> None:
299291
# save after X seconds of inactivity
300292
await asyncio.sleep(self._save_delay)
301293

294+
if self._outofband_lock.locked():
295+
return
296+
302297
try:
303298
self.log.info("Saving the content from room %s", self._room_id)
304-
model = await self._file.maybe_save_content(
305-
{
306-
"format": self._file_format,
307-
"type": self._file_type,
308-
"last_modified": self._last_modified,
309-
"content": self._document.source,
310-
}
311-
)
312-
self._last_modified = model["last_modified"]
313299
async with self._update_lock:
300+
model = await self._file.maybe_save_content(
301+
{
302+
"format": self._file_format,
303+
"type": self._file_type,
304+
"last_modified": self._last_modified,
305+
"content": self._document.source,
306+
}
307+
)
308+
self._last_modified = model["last_modified"]
314309
self._document.dirty = False
315310

316311
self._emit(LogLevel.INFO, "save", "Content saved.")
317312

318313
except OutOfBandChanges:
319-
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
320-
try:
321-
model = await self._file.load_content(self._file_format, self._file_type, True)
322-
except Exception as e:
323-
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
324-
self.log.error(msg, exc_info=e)
325-
self._emit(LogLevel.ERROR, None, msg)
326-
return None
327-
328-
async with self._update_lock:
329-
self._document.source = model["content"]
330-
self._last_modified = model["last_modified"]
331-
self._document.dirty = False
332-
333-
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")
314+
await self._send_confict_msg()
334315

335316
except Exception as e:
336317
msg = f"Error saving file: {self._file.path}\n{e!r}"
337318
self.log.error(msg, exc_info=e)
338319
self._emit(LogLevel.ERROR, None, msg)
320+
321+
async def _send_confict_msg(self) -> None:
322+
self.log.info("Out-of-band changes in room %s", self._room_id)
323+
self._emit(LogLevel.INFO, "overwrite", f"Out-of-band changes in room {self._room_id}")
324+
325+
msg_id = str(uuid.uuid4())
326+
self._messages[msg_id] = asyncio.Lock()
327+
await self._outofband_lock.acquire()
328+
data = msg_id.encode()
329+
self.broadcast_msg(
330+
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) + write_var_uint(len(data)) + data
331+
)

jupyter_collaboration/rooms/yroom.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def on_message(self) -> Callable[[bytes], Awaitable[bool] | bool] | None:
115115
return self._on_message
116116

117117
@on_message.setter
118-
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
118+
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None) -> None:
119119
"""
120120
Arguments:
121121
value: An optional callback to call when a message is received. If the callback returns True, the message is skipped.
@@ -125,17 +125,17 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
125125
async def _broadcast_updates(self):
126126
async with self._update_receive_stream:
127127
async for update in self._update_receive_stream:
128-
if self._task_group.cancel_scope.cancel_called:
128+
if self._task_group.cancel_scope.cancel_called: # type: ignore[union-attr]
129129
return
130130
# broadcast internal ydoc's update to all clients, that includes changes from the
131131
# clients and changes from the backend (out-of-band changes)
132132
for client in self.clients:
133133
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
134134
message = create_update_message(update)
135-
self._task_group.start_soon(client.send, message)
135+
self._task_group.start_soon(client.send, message) # type: ignore[union-attr]
136136
if self.ystore:
137137
self.log.debug("Writing Y update to YStore")
138-
self._task_group.start_soon(self.ystore.write, client.path, update)
138+
self._task_group.start_soon(self.ystore.write, client.path, update) # type: ignore[union-attr]
139139

140140
async def __aenter__(self) -> YRoom:
141141
if self._task_group is not None:
@@ -158,7 +158,7 @@ async def __aexit__(self, exc_type, exc_value, exc_tb):
158158
self._task_group = None
159159
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)
160160

161-
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
161+
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
162162
"""Start the room.
163163
164164
Arguments:
@@ -178,15 +178,15 @@ async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
178178
self._starting = False
179179
task_status.started()
180180

181-
def stop(self):
181+
def stop(self) -> None:
182182
"""Stop the room."""
183183
if self._task_group is None:
184184
raise RuntimeError("YRoom not running")
185185

186186
self._task_group.cancel_scope.cancel()
187187
self._task_group = None
188188

189-
async def serve(self, websocket: Websocket):
189+
async def serve(self, websocket: Websocket) -> None:
190190
"""Serve a client.
191191
192192
Arguments:

packages/docprovider/src/yprovider.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ export class WebSocketProvider implements IDocumentProvider {
136136
}
137137

138138
private _onConnectionClosed = (event: any): void => {
139-
if (event.code === 1003) {
140-
console.error('Document provider closed:', event.reason);
139+
if (event.code >= 1003 && event.code < 1006) {
140+
console.error('Document provider closed:', event.code, event.reason);
141141

142-
showErrorMessage(this._trans.__('Document session error'), event.reason, [
142+
showErrorMessage(this._trans.__('Document error'), event.reason, [
143143
Dialog.okButton()
144144
]);
145145

0 commit comments

Comments
 (0)