Skip to content

Commit 0dc369b

Browse files
authored
Improves document sessions (#204)
* Improves document sessions * Fix CI * Pre-commit
1 parent ee4d81c commit 0dc369b

File tree

11 files changed

+136
-73
lines changed

11 files changed

+136
-73
lines changed

jupyter_collaboration/app.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,14 @@ def initialize_handlers(self):
107107
"room_manager": self.room_manager,
108108
},
109109
),
110-
(r"/api/collaboration/session/(.*)", DocSessionHandler),
110+
(
111+
r"/api/collaboration/session/(.*)",
112+
DocSessionHandler,
113+
{
114+
"store": self.store,
115+
"room_manager": self.room_manager,
116+
},
117+
),
111118
]
112119
)
113120

jupyter_collaboration/handlers.py

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import asyncio
77
import json
88
import time
9-
import uuid
109
from typing import Any
1110

1211
from jupyter_server.auth import authorized
@@ -28,8 +27,6 @@
2827

2928
YFILE = YDOCS["file"]
3029

31-
SERVER_SESSION = str(uuid.uuid4())
32-
3330

3431
class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
3532
"""`YDocWebSocketHandler` uses the singleton pattern for ``WebsocketServer``,
@@ -62,6 +59,7 @@ def initialize(
6259
room_manager: RoomManager,
6360
document_cleanup_delay: float | None = 60.0,
6461
) -> None:
62+
super().initialize()
6563
# File ID manager cannot be passed as argument as the extension may load after this one
6664
self._file_id_manager = self.settings["file_id_manager"]
6765

@@ -124,13 +122,6 @@ async def open(self, room_id):
124122
"""
125123
self._room_id = self.request.path.split("/")[-1]
126124

127-
# Close the connection if the document session expired
128-
session_id = self.get_query_argument("sessionId", None)
129-
if session_id and SERVER_SESSION != session_id:
130-
self.close(
131-
1003,
132-
f"Document session {session_id} expired. You need to reload this browser tab.",
133-
)
134125
try:
135126
# Get room
136127
self.room = await self._room_manager.get_room(self._room_id)
@@ -154,10 +145,22 @@ async def open(self, room_id):
154145
self._serve_task.cancel()
155146
await self._room_manager.remove_room(self._room_id)
156147

157-
self._emit(LogLevel.INFO, "initialize", "New client connected.")
148+
return
149+
150+
# Close the connection if the document session expired
151+
session_id = self.get_query_argument("sessionId", None)
152+
if session_id and session_id != self.room.session_id:
153+
self.log.error(
154+
f"Client tried to connect to {self._room_id} with an expired session ID {session_id}."
155+
)
156+
self.close(
157+
1003,
158+
f"Document session {session_id} expired. You need to reload this browser tab.",
159+
)
158160

159161
# Start processing messages in the room
160162
self._serve_task = asyncio.create_task(self.room.serve(self))
163+
self._emit(LogLevel.INFO, "initialize", "New client connected.")
161164

162165
async def send(self, message):
163166
"""
@@ -207,14 +210,20 @@ def on_close(self) -> None:
207210
if self._serve_task is not None and not self._serve_task.cancelled():
208211
self._serve_task.cancel()
209212

210-
if self.room is not None and self.room.clients == [self]:
211-
# no client in this room after we disconnect
212-
# Remove the room with a delay in case someone reconnects
213-
IOLoop.current().add_callback(
214-
self._room_manager.remove_room, self._room_id, self._cleanup_delay
215-
)
213+
if self.room is not None:
214+
# Remove itself from the list of clients
215+
self.room.clients.remove(self)
216+
if len(self.room.clients) == 0:
217+
# no client in this room after we disconnect
218+
# Remove the room with a delay in case someone reconnects
219+
IOLoop.current().add_callback(
220+
self._room_manager.remove_room, self._room_id, self._cleanup_delay
221+
)
216222

217223
def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
224+
if self._room_id.count(":") < 2:
225+
return
226+
218227
_, _, file_id = decode_file_path(self._room_id)
219228
path = self._file_id_manager.get_path(file_id)
220229

@@ -240,6 +249,22 @@ class DocSessionHandler(APIHandler):
240249

241250
auth_resource = "contents"
242251

252+
def initialize(self, store: BaseYStore, room_manager: RoomManager) -> None:
253+
super().initialize()
254+
self._store = store
255+
self._room_manager = room_manager
256+
257+
async def prepare(self):
258+
# NOTE: Initialize in the ExtensionApp.start_extension once
259+
# https://github.com/jupyter-server/jupyter_server/issues/1329
260+
# is done.
261+
# We are temporarily initializing the store here because the
262+
# initialization is async
263+
if not self._store.initialized:
264+
await self._store.initialize()
265+
266+
return await super().prepare()
267+
243268
@web.authenticated
244269
@authorized
245270
async def put(self, path):
@@ -251,26 +276,35 @@ async def put(self, path):
251276
content_type = body["type"]
252277
file_id_manager = self.settings["file_id_manager"]
253278

279+
status = 200
254280
idx = file_id_manager.get_id(path)
255-
if idx is not None:
256-
# index already exists
257-
self.log.info("Request for Y document '%s' with room ID: %s", path, idx)
258-
data = json.dumps(
259-
{"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION}
260-
)
261-
self.set_status(200)
262-
return self.finish(data)
263-
264-
# try indexing
265-
idx = file_id_manager.index(path)
266281
if idx is None:
267-
# file does not exists
268-
raise web.HTTPError(404, f"File {path!r} does not exist")
282+
# try indexing
283+
status = 201
284+
idx = file_id_manager.index(path)
285+
if idx is None:
286+
# file does not exist
287+
raise web.HTTPError(404, f"File {path!r} does not exist")
288+
289+
session_id = await self._get_session_id(f"{format}:{content_type}:{idx}")
269290

270-
# index successfully created
271291
self.log.info("Request for Y document '%s' with room ID: %s", path, idx)
272292
data = json.dumps(
273-
{"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION}
293+
{"format": format, "type": content_type, "fileId": idx, "sessionId": session_id}
274294
)
275-
self.set_status(201)
295+
self.set_status(status)
276296
return self.finish(data)
297+
298+
async def _get_session_id(self, room_id: str) -> str | None:
299+
# If the room exists and it is ready, return the session_id from the room.
300+
if self._room_manager.has_room(room_id):
301+
room = await self._room_manager.get_room(room_id)
302+
if room.ready:
303+
return room.session_id
304+
305+
if await self._store.exists(room_id):
306+
doc = await self._store.get(room_id)
307+
if doc is not None and "session_id" in doc:
308+
return doc["session_id"]
309+
310+
return None

jupyter_collaboration/rooms/base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
import asyncio
7+
import uuid
78
from logging import Logger
89

910
from ypy_websocket.websocket_server import YRoom
@@ -15,6 +16,7 @@ class BaseRoom(YRoom):
1516
def __init__(self, room_id: str, store: BaseYStore | None = None, log: Logger | None = None):
1617
super().__init__(ready=False, ystore=store, log=log)
1718
self._room_id = room_id
19+
self._session_id: str = str(uuid.uuid4())
1820

1921
@property
2022
def room_id(self) -> str:
@@ -23,6 +25,18 @@ def room_id(self) -> str:
2325
"""
2426
return self._room_id
2527

28+
@property
29+
def session_id(self) -> str:
30+
"""
31+
A unique identifier for the updates.
32+
33+
NOTE: The session id is a unique identifier for the updates
34+
that compose the Y document. If this document is destroyed, every
35+
client connected must replace its content with new updates otherwise
36+
once we initialize a new Y document, the content will be duplicated.
37+
"""
38+
return self._session_id
39+
2640
async def initialize(self) -> None:
2741
return
2842

jupyter_collaboration/rooms/document.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ async def initialize(self) -> None:
8787
# try to apply Y updates from the YStore for this document
8888
if self.ystore is not None and await self.ystore.exists(self._room_id):
8989
# Load the content from the store
90+
doc = await self.ystore.get(self._room_id)
91+
self._session_id = doc["session_id"]
9092
await self.ystore.apply_updates(self._room_id, self.ydoc)
9193
self._emit(
9294
LogLevel.INFO,
@@ -114,14 +116,6 @@ async def initialize(self) -> None:
114116

115117
# Update the content
116118
self._document.source = model["content"]
117-
118-
doc = await self.ystore.get(self._room_id)
119-
await self.ystore.remove(self._room_id)
120-
version = 0
121-
if "version" in doc:
122-
version = doc["version"] + 1
123-
124-
await self.ystore.create(self._room_id, version)
125119
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)
126120

127121
else:
@@ -132,7 +126,7 @@ async def initialize(self) -> None:
132126
self._document.source = model["content"]
133127

134128
if self.ystore is not None:
135-
await self.ystore.create(self._room_id, 0)
129+
await self.ystore.create(self._room_id, self.session_id)
136130
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)
137131

138132
self._last_modified = model["last_modified"]

jupyter_collaboration/rooms/manager.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,18 +150,23 @@ async def _clean_up_room(self, room_id: str, delay: float) -> None:
150150
self._emit(room_id, LogLevel.INFO, "clean", "Room deleted.")
151151

152152
# Clean the file loader if there are no rooms using it
153-
_, _, file_id = decode_file_path(room_id)
154-
file = self._file_loaders[file_id]
155-
if file.number_of_subscriptions == 0:
156-
await self._file_loaders.remove(file_id)
157-
self.log.info("Loader %s deleted", file.path)
158-
self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.")
153+
if room_id.count(":") >= 2:
154+
_, _, file_id = decode_file_path(room_id)
155+
file = self._file_loaders[file_id]
156+
if file.number_of_subscriptions == 0:
157+
await self._file_loaders.remove(file_id)
158+
self.log.info("Loader %s deleted", file.path)
159+
self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.")
159160

160-
del self._clean_up_tasks[room_id]
161+
if room_id in self._clean_up_tasks:
162+
del self._clean_up_tasks[room_id]
161163

162164
def _emit(
163165
self, room_id: str, level: LogLevel, action: str | None = None, msg: str | None = None
164166
) -> None:
167+
if room_id.count(":") < 2:
168+
return
169+
165170
_, _, file_id = decode_file_path(room_id)
166171
path = self._file_loaders.file_id_manager.get_path(file_id)
167172

jupyter_collaboration/stores/base_store.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@ async def get(self, path: str, updates: bool = False) -> dict | None:
7272
...
7373

7474
@abstractmethod
75-
async def create(self, path: str, version: int) -> None:
75+
async def create(self, path: str, session_id: str) -> None:
7676
"""
7777
Creates a new document.
7878
7979
Arguments:
8080
path: The document name/path.
81-
version: Document version.
81+
session_id: A unique identifier for the updates.
8282
"""
8383
...
8484

jupyter_collaboration/stores/file_store.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,27 +121,27 @@ async def get(self, path: str, updates: bool = False) -> dict | None:
121121
if not await anyio.Path(file_path).exists():
122122
return None
123123
else:
124-
version = None
124+
session_id = None
125125
async with await anyio.open_file(file_path, "rb") as f:
126126
header = await f.read(8)
127127
if header == b"VERSION:":
128-
version = int(await f.readline())
128+
session_id = str(await f.readline())
129129

130130
list_updates: list[tuple[bytes, bytes, float]] = []
131131
if updates:
132132
data = await f.read()
133133
async for update, metadata, timestamp in self._decode_data(data):
134134
list_updates.append((update, metadata, timestamp))
135135

136-
return dict(path=path, version=version, updates=list_updates)
136+
return dict(path=path, session_id=session_id, updates=list_updates)
137137

138-
async def create(self, path: str, version: int) -> None:
138+
async def create(self, path: str, session_id: str) -> None:
139139
"""
140140
Creates a new document.
141141
142142
Arguments:
143143
path: The document name/path.
144-
version: Document version.
144+
session_id: A unique identifier for the updates.
145145
"""
146146
if self._initialized is None:
147147
raise Exception("The store was not initialized.")
@@ -154,7 +154,7 @@ async def create(self, path: str, version: int) -> None:
154154
else:
155155
await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True)
156156
async with await anyio.open_file(file_path, "wb") as f:
157-
version_bytes = f"VERSION:{version}\n".encode()
157+
version_bytes = f"VERSION:{session_id}\n".encode()
158158
await f.write(version_bytes)
159159

160160
async def remove(self, path: str) -> None:

0 commit comments

Comments
 (0)