Skip to content

Commit 1773ef7

Browse files
committed
Support YRoom forking
1 parent fc46172 commit 1773ef7

File tree

5 files changed

+155
-10
lines changed

5 files changed

+155
-10
lines changed

jupyverse_api/jupyverse_api/yjs/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ async def create_roomid(
3535
):
3636
return await self.create_roomid(path, request, response, user)
3737

38+
@router.put("/api/collaboration/room/{roomid}", status_code=201)
39+
async def fork_room(
40+
roomid,
41+
user: User = Depends(auth.current_user(permissions={"contents": ["read"]})),
42+
):
43+
return await self.fork_room(roomid, user)
44+
3845
self.include_router(router)
3946

4047
@abstractmethod
@@ -55,6 +62,14 @@ async def create_roomid(
5562
):
5663
...
5764

65+
@abstractmethod
66+
async def fork_room(
67+
self,
68+
roomid,
69+
user: User,
70+
):
71+
...
72+
5873
@abstractmethod
5974
def get_document(
6075
self,

plugins/yjs/fps_yjs/routes.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,26 @@ async def create_roomid(
9595
res["fileId"] = idx
9696
return res
9797

98+
async def fork_room(
99+
self,
100+
roomid: str,
101+
user: User,
102+
):
103+
idx = uuid4().hex
104+
105+
root_room = await self.room_manager.websocket_server.get_room(roomid)
106+
update = root_room.ydoc.get_update()
107+
new_ydoc = Doc()
108+
new_ydoc.apply_update(update)
109+
new_room = await self.room_manager.websocket_server.get_room(idx, new_ydoc)
110+
root_room.local_clients.add(new_room)
111+
112+
res = {
113+
"sessionId": SERVER_SESSION,
114+
"roomId": idx,
115+
}
116+
return res
117+
98118
def get_document(self, document_id: str) -> YBaseDoc:
99119
return self.room_manager.documents[document_id]
100120

@@ -223,7 +243,7 @@ async def serve(self, websocket: YWebsocket, permissions) -> None:
223243
await self.websocket_server.started.wait()
224244
await self.websocket_server.serve(websocket)
225245

226-
if is_stored_document and not room.clients:
246+
if is_stored_document and not room.remote_clients:
227247
# no client in this room after we disconnect
228248
self.cleaners[room] = asyncio.create_task(self.maybe_clean_room(room, websocket.path))
229249

plugins/yjs/fps_yjs/ywebsocket/websocket_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ async def _serve(self, websocket: Websocket, tg: TaskGroup):
151151
await self.start_room(room)
152152
await room.serve(websocket)
153153

154-
if self.auto_clean_rooms and not room.clients:
154+
if self.auto_clean_rooms and not room.remote_clients:
155155
self.delete_room(room=room)
156156
tg.cancel_scope.cancel()
157157

plugins/yjs/fps_yjs/ywebsocket/yroom.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929

3030

3131
class YRoom:
32-
clients: list
32+
remote_clients: set
33+
local_clients: set[YRoom]
3334
ydoc: Doc
3435
ystore: BaseYStore | None
3536
_on_message: Callable[[bytes], Awaitable[bool] | bool] | None
@@ -76,7 +77,8 @@ def __init__(
7677
self.ready = ready
7778
self.ystore = ystore
7879
self.log = log or getLogger(__name__)
79-
self.clients = []
80+
self.remote_clients = set()
81+
self.local_clients = set()
8082
self._on_message = None
8183
self._started = None
8284
self._starting = False
@@ -133,10 +135,15 @@ async def _broadcast_updates(self):
133135
return
134136
# broadcast internal ydoc's update to all clients, that includes changes from the
135137
# clients and changes from the backend (out-of-band changes)
136-
for client in self.clients:
137-
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
138+
for client in self.local_clients:
139+
client.ydoc.apply_update(update)
140+
if self.remote_clients:
138141
message = create_update_message(update)
139-
self._task_group.start_soon(client.send, message)
142+
for client in self.remote_clients:
143+
self.log.debug(
144+
"Sending Y update to remote client with endpoint: %s", client.path
145+
)
146+
self._task_group.start_soon(client.send, message)
140147
if self.ystore:
141148
self.log.debug("Writing Y update to YStore")
142149
self._task_group.start_soon(self.ystore.write, update)
@@ -197,7 +204,7 @@ async def serve(self, websocket: Websocket):
197204
websocket: The WebSocket through which to serve the client.
198205
"""
199206
async with create_task_group() as tg:
200-
self.clients.append(websocket)
207+
self.remote_clients.add(websocket)
201208
await sync(self.ydoc, websocket, self.log)
202209
try:
203210
async for message in websocket:
@@ -224,7 +231,7 @@ async def serve(self, websocket: Websocket):
224231
YMessageType.AWARENESS.name,
225232
websocket.path,
226233
)
227-
for client in self.clients:
234+
for client in self.remote_clients:
228235
self.log.debug(
229236
"Sending Y awareness from client with endpoint "
230237
"%s to client with endpoint: %s",
@@ -236,4 +243,4 @@ async def serve(self, websocket: Websocket):
236243
self.log.debug("Error serving endpoint: %s", websocket.path, exc_info=e)
237244

238245
# remove this client
239-
self.clients = [c for c in self.clients if c != websocket]
246+
self.remote_clients.remove(websocket)

tests/test_yjs.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import os
2+
from asyncio import sleep
3+
from pathlib import Path
4+
5+
import pytest
6+
from asphalt.core import Context
7+
from fps_yjs.ywebsocket import WebsocketProvider
8+
from httpx import AsyncClient
9+
from httpx_ws import aconnect_ws
10+
from pycrdt import Doc, Text
11+
12+
from jupyverse_api.main import JupyverseComponent
13+
from jupyverse_api.yjs.models import CreateDocumentSession
14+
15+
16+
@pytest.mark.asyncio
17+
async def test_fork_room(tmp_path, unused_tcp_port):
18+
prev_dir = os.getcwd()
19+
os.chdir(tmp_path)
20+
21+
path = Path("foo.txt")
22+
path.write_text("Hello")
23+
24+
components = {
25+
"app": {"type": "app"},
26+
"auth": {"type": "auth", "test": True, "mode": "noauth"},
27+
"contents": {"type": "contents"},
28+
"frontend": {"type": "frontend"},
29+
"yjs": {"type": "yjs"},
30+
}
31+
async with Context() as ctx, AsyncClient() as http:
32+
await JupyverseComponent(
33+
components=components,
34+
port=unused_tcp_port,
35+
).start(ctx)
36+
37+
create_document_session = CreateDocumentSession(format="text", type="file")
38+
response = await http.put(
39+
f"http://127.0.0.1:{unused_tcp_port}/api/collaboration/session/{path}",
40+
json=create_document_session.model_dump(),
41+
)
42+
r = response.json()
43+
file_id = r["fileId"]
44+
45+
# connect to root room
46+
async with aconnect_ws(
47+
f"http://127.0.0.1:{unused_tcp_port}/api/collaboration/room/text:file:{file_id}"
48+
) as root_ws:
49+
# create a root room client
50+
root_ydoc = Doc()
51+
root_ydoc["source"] = root_ytext = Text()
52+
async with WebsocketProvider(root_ydoc, Websocket(root_ws, file_id)):
53+
await sleep(0.1)
54+
assert str(root_ytext) == "Hello"
55+
# fork room
56+
response = await http.put(
57+
f"http://127.0.0.1:{unused_tcp_port}/api/collaboration/room/text:file:{file_id}"
58+
)
59+
r = response.json()
60+
fork_room_id = r["roomId"]
61+
# connect to forked room
62+
async with aconnect_ws(
63+
f"http://127.0.0.1:{unused_tcp_port}/api/collaboration/room/{fork_room_id}"
64+
) as fork_ws:
65+
# create a forked room client
66+
fork_ydoc = Doc()
67+
fork_ydoc["source"] = fork_ytext = Text()
68+
async with WebsocketProvider(fork_ydoc, Websocket(fork_ws, fork_room_id)):
69+
await sleep(0.1)
70+
assert str(fork_ytext) == "Hello"
71+
root_ytext += ", World!"
72+
await sleep(0.1)
73+
assert str(root_ytext) == "Hello, World!"
74+
assert str(fork_ytext) == "Hello, World!"
75+
76+
os.chdir(prev_dir)
77+
78+
79+
class Websocket:
80+
def __init__(self, websocket, roomid: str):
81+
self.websocket = websocket
82+
self.roomid = roomid
83+
84+
@property
85+
def path(self) -> str:
86+
return self.roomid
87+
88+
def __aiter__(self):
89+
return self
90+
91+
async def __anext__(self) -> bytes:
92+
try:
93+
message = await self.recv()
94+
except BaseException:
95+
raise StopAsyncIteration()
96+
return message
97+
98+
async def send(self, message: bytes):
99+
await self.websocket.send_bytes(message)
100+
101+
async def recv(self) -> bytes:
102+
b = await self.websocket.receive_bytes()
103+
return bytes(b)

0 commit comments

Comments
 (0)