Skip to content

Commit b43b525

Browse files
authored
Adds restore and ovewrite messages (#170)
* Handle restore and ovewrite messages * Creates a dialog to answer messages * Pre-commit * Revert document_cleanup_delay
1 parent 5190a17 commit b43b525

File tree

7 files changed

+222
-20
lines changed

7 files changed

+222
-20
lines changed

jupyter_collaboration/handlers.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ async def recv(self):
209209
message = await self._message_queue.get()
210210
return message
211211

212-
def on_message(self, message):
212+
async def on_message(self, message):
213213
"""
214214
On message receive.
215215
"""
@@ -240,6 +240,9 @@ def on_message(self, message):
240240
)
241241
return skip
242242

243+
if message_type == MessageType.ROOM:
244+
await self.room.handle_msg(message[1:])
245+
243246
if message_type == MessageType.CHAT:
244247
msg = message[2:].decode("utf-8")
245248

@@ -316,7 +319,7 @@ async def _clean_room(self) -> None:
316319
file = self._file_loaders[file_id]
317320
if file.number_of_subscriptions == 0:
318321
self.log.info("Deleting file %s", file.path)
319-
del self._file_loaders[file_id]
322+
await self._file_loaders.remove(file_id)
320323
self._emit(LogLevel.INFO, "clean", "Loader deleted.")
321324

322325
def check_origin(self, origin):

jupyter_collaboration/loaders.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,26 @@ async def load_content(self, format: str, file_type: str, content: bool) -> dict
118118
)
119119

120120
async def save_content(self, model: dict[str, Any]) -> dict[str, Any]:
121+
"""
122+
Save the content of the file.
123+
124+
Parameters:
125+
model (dict): A dictionary with format, type, last_modified, and content of the file.
126+
127+
Returns:
128+
model (dict): A dictionary with the metadata and content of the file.
129+
"""
130+
async with self._lock:
131+
path = self.path
132+
if model["type"] not in {"directory", "file", "notebook"}:
133+
# fall back to file if unknown type, the content manager only knows
134+
# how to handle these types
135+
model["type"] = "file"
136+
137+
self._log.info("Saving file: %s", path)
138+
return await ensure_async(self._contents_manager.save(model, path))
139+
140+
async def maybe_save_content(self, model: dict[str, Any]) -> dict[str, Any]:
121141
"""
122142
Save the content of the file.
123143

jupyter_collaboration/rooms.py

Lines changed: 102 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,24 @@
44
from __future__ import annotations
55

66
import asyncio
7+
import uuid
78
from logging import Logger
89
from typing import Any
910

1011
from jupyter_events import EventLogger
1112
from jupyter_ydoc import ydocs as YDOCS
1213
from ypy_websocket.websocket_server import YRoom
1314
from ypy_websocket.ystore import BaseYStore, YDocNotFound
15+
from ypy_websocket.yutils import write_var_uint
1416

1517
from .loaders import FileLoader
16-
from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges
18+
from .utils import (
19+
JUPYTER_COLLABORATION_EVENTS_URI,
20+
LogLevel,
21+
MessageType,
22+
OutOfBandChanges,
23+
RoomMessages,
24+
)
1725

1826
YFILE = YDOCS["file"]
1927

@@ -45,9 +53,11 @@ def __init__(
4553
self._save_delay = save_delay
4654

4755
self._update_lock = asyncio.Lock()
56+
self._outofband_lock = asyncio.Lock()
4857
self._initialization_lock = asyncio.Lock()
4958
self._cleaner: asyncio.Task | None = None
5059
self._saving_document: asyncio.Task | None = None
60+
self._messages: dict[str, asyncio.Lock] = {}
5161

5262
# Listen for document changes
5363
self._document.observe(self._on_document_change)
@@ -149,6 +159,41 @@ async def initialize(self) -> None:
149159
self.ready = True
150160
self._emit(LogLevel.INFO, "initialize", "Room initialized")
151161

162+
async def handle_msg(self, data: bytes) -> None:
163+
msg_type = data[0]
164+
msg_id = data[2:].decode()
165+
166+
# Use a lock to prevent handling responses from multiple clients
167+
# at the same time
168+
async with self._messages[msg_id]:
169+
# Check whether the previous client resolved the conflict
170+
if msg_id not in self._messages:
171+
return
172+
173+
try:
174+
ans = None
175+
if msg_type == RoomMessages.RELOAD:
176+
# Restore the room with the content from disk
177+
await self._load_document()
178+
ans = RoomMessages.DOC_OVERWRITTEN
179+
180+
elif msg_type == RoomMessages.OVERWRITE:
181+
# Overwrite the file with content from the room
182+
await self._save_document()
183+
ans = RoomMessages.FILE_OVERWRITTEN
184+
185+
if ans is not None:
186+
# Remove the lock and broadcast the resolution
187+
self._messages.pop(msg_id)
188+
data = msg_id.encode()
189+
self._outofband_lock.release()
190+
await self._broadcast_msg(
191+
bytes([MessageType.ROOM, ans]) + write_var_uint(len(data)) + data
192+
)
193+
194+
except Exception:
195+
return
196+
152197
def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
153198
data = {"level": level.value, "room": self._room_id, "path": self._file.path}
154199
if action:
@@ -187,24 +232,24 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
187232
event (str): Type of change.
188233
args (dict): A dictionary with format, type, last_modified.
189234
"""
235+
if self._outofband_lock.locked():
236+
return
237+
190238
if event == "metadata" and (
191239
self._last_modified is None or self._last_modified < args["last_modified"]
192240
):
193241
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
194242
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")
195243

196-
try:
197-
model = await self._file.load_content(self._file_format, self._file_type, True)
198-
except Exception as e:
199-
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
200-
self.log.error(msg, exc_info=e)
201-
self._emit(LogLevel.ERROR, None, msg)
202-
return None
203-
204-
async with self._update_lock:
205-
self._document.source = model["content"]
206-
self._last_modified = model["last_modified"]
207-
self._document.dirty = False
244+
msg_id = str(uuid.uuid4())
245+
self._messages[msg_id] = asyncio.Lock()
246+
await self._outofband_lock.acquire()
247+
data = msg_id.encode()
248+
await self._broadcast_msg(
249+
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED])
250+
+ write_var_uint(len(data))
251+
+ data
252+
)
208253

209254
def _on_document_change(self, target: str, event: Any) -> None:
210255
"""
@@ -231,6 +276,45 @@ def _on_document_change(self, target: str, event: Any) -> None:
231276

232277
self._saving_document = asyncio.create_task(self._maybe_save_document())
233278

279+
async def _load_document(self) -> None:
280+
try:
281+
model = await self._file.load_content(self._file_format, self._file_type, True)
282+
except Exception as e:
283+
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
284+
self.log.error(msg, exc_info=e)
285+
self._emit(LogLevel.ERROR, None, msg)
286+
return None
287+
288+
async with self._update_lock:
289+
self._document.source = model["content"]
290+
self._last_modified = model["last_modified"]
291+
self._document.dirty = False
292+
293+
async def _save_document(self) -> None:
294+
"""
295+
Saves the content of the document to disk.
296+
"""
297+
try:
298+
self.log.info("Saving the content from room %s", self._room_id)
299+
model = await self._file.save_content(
300+
{
301+
"format": self._file_format,
302+
"type": self._file_type,
303+
"last_modified": self._last_modified,
304+
"content": self._document.source,
305+
}
306+
)
307+
self._last_modified = model["last_modified"]
308+
async with self._update_lock:
309+
self._document.dirty = False
310+
311+
self._emit(LogLevel.INFO, "save", "Content saved.")
312+
313+
except Exception as e:
314+
msg = f"Error saving file: {self._file.path}\n{e!r}"
315+
self.log.error(msg, exc_info=e)
316+
self._emit(LogLevel.ERROR, None, msg)
317+
234318
async def _maybe_save_document(self) -> None:
235319
"""
236320
Saves the content of the document to disk.
@@ -248,7 +332,7 @@ async def _maybe_save_document(self) -> None:
248332

249333
try:
250334
self.log.info("Saving the content from room %s", self._room_id)
251-
model = await self._file.save_content(
335+
model = await self._file.maybe_save_content(
252336
{
253337
"format": self._file_format,
254338
"type": self._file_type,
@@ -284,6 +368,10 @@ async def _maybe_save_document(self) -> None:
284368
self.log.error(msg, exc_info=e)
285369
self._emit(LogLevel.ERROR, None, msg)
286370

371+
async def _broadcast_msg(self, msg: bytes) -> None:
372+
for client in self.clients:
373+
await client.send(msg)
374+
287375

288376
class TransientRoom(YRoom):
289377
"""A Y room for sharing state (e.g. awareness)."""

jupyter_collaboration/utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,18 @@
1212
class MessageType(IntEnum):
1313
SYNC = 0
1414
AWARENESS = 1
15+
ROOM = 124
1516
CHAT = 125
1617

1718

19+
class RoomMessages(IntEnum):
20+
RELOAD = 0
21+
OVERWRITE = 1
22+
FILE_CHANGED = 2
23+
FILE_OVERWRITTEN = 3
24+
DOC_OVERWRITTEN = 4
25+
26+
1827
class LogLevel(Enum):
1928
INFO = "INFO"
2029
DEBUG = "DEBUG"

packages/docprovider/src/awareness.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,9 @@ import * as decoding from 'lib0/decoding';
1414
import * as encoding from 'lib0/encoding';
1515
import { WebsocketProvider } from 'y-websocket';
1616

17+
import { MessageType } from './utils';
1718
import { IAwarenessProvider } from './tokens';
1819

19-
export enum MessageType {
20-
CHAT = 125
21-
}
22-
2320
export interface IContent {
2421
type: string;
2522
body: string;

packages/docprovider/src/utils.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/* -----------------------------------------------------------------------------
2+
| Copyright (c) Jupyter Development Team.
3+
| Distributed under the terms of the Modified BSD License.
4+
|----------------------------------------------------------------------------*/
5+
6+
export enum MessageType {
7+
ROOM = 124,
8+
CHAT = 125
9+
}
10+
11+
export enum RoomMessage {
12+
RELOAD = 0,
13+
OVERWRITE = 1,
14+
FILE_CHANGED = 2,
15+
FILE_OVERWRITTEN = 3,
16+
DOC_OVERWRITTEN = 4
17+
}

packages/docprovider/src/yprovider.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@ import { Signal } from '@lumino/signaling';
1313

1414
import { DocumentChange, YDocument } from '@jupyter/ydoc';
1515

16+
import * as decoding from 'lib0/decoding';
17+
import * as encoding from 'lib0/encoding';
1618
import { Awareness } from 'y-protocols/awareness';
1719
import { WebsocketProvider as YWebsocketProvider } from 'y-websocket';
1820

1921
import { requestDocSession } from './requests';
22+
import { MessageType, RoomMessage } from './utils';
2023

2124
/**
2225
* An interface for a document provider.
@@ -111,6 +114,18 @@ export class WebSocketProvider implements IDocumentProvider {
111114

112115
this._yWebsocketProvider.on('sync', this._onSync);
113116
this._yWebsocketProvider.on('connection-close', this._onConnectionClosed);
117+
118+
this._yWebsocketProvider.messageHandlers[MessageType.ROOM] = (
119+
encoder,
120+
decoder,
121+
provider,
122+
emitSynced,
123+
messageType
124+
) => {
125+
const msgType = decoding.readVarUint(decoder);
126+
const data = decoding.readVarString(decoder);
127+
this._handleRoomMessage(msgType, data);
128+
};
114129
}
115130

116131
private _onUserChanged(user: User.IManager): void {
@@ -138,6 +153,59 @@ export class WebSocketProvider implements IDocumentProvider {
138153
}
139154
};
140155

156+
private _handleRoomMessage(type: number, data: string): void {
157+
switch (type) {
158+
case RoomMessage.FILE_CHANGED:
159+
this._handleFileChanged(data);
160+
break;
161+
162+
case RoomMessage.DOC_OVERWRITTEN:
163+
case RoomMessage.FILE_OVERWRITTEN:
164+
if (this._dialog) {
165+
this._dialog.close();
166+
this._dialog = null;
167+
}
168+
break;
169+
}
170+
}
171+
172+
private _handleFileChanged(data: string): void {
173+
this._dialog = new Dialog({
174+
title: this._trans.__('File changed'),
175+
body: this._trans.__('Do you want to overwrite the file or reload it?'),
176+
buttons: [
177+
Dialog.okButton({ label: 'Reload' }),
178+
Dialog.warnButton({ label: 'Overwrite' })
179+
],
180+
hasClose: false
181+
});
182+
183+
this._dialog.launch().then(resp => {
184+
if (resp.button.label === 'Reload') {
185+
this._sendReloadMsg(data);
186+
} else if (resp.button.label === 'Overwrite') {
187+
this._sendOverwriteMsg(data);
188+
}
189+
});
190+
}
191+
192+
private _sendReloadMsg(data: string): void {
193+
const encoder = encoding.createEncoder();
194+
encoding.writeVarUint(encoder, MessageType.ROOM);
195+
encoding.writeVarUint(encoder, RoomMessage.RELOAD);
196+
encoding.writeVarString(encoder, data);
197+
this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder));
198+
}
199+
200+
private _sendOverwriteMsg(data: string): void {
201+
const encoder = encoding.createEncoder();
202+
encoding.writeVarUint(encoder, MessageType.ROOM);
203+
encoding.writeVarUint(encoder, RoomMessage.OVERWRITE);
204+
encoding.writeVarString(encoder, data);
205+
this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder));
206+
}
207+
208+
private _dialog: Dialog<any> | null = null;
141209
private _awareness: Awareness;
142210
private _contentType: string;
143211
private _format: string;

0 commit comments

Comments
 (0)