Skip to content

Commit b1ef11e

Browse files
committed
Working prototype with Multi-process support for collab endpoint
Depends on y-crdt/pycrdt-websocket#128 Signed-off-by: Alexis Jeandet <alexis.jeandet@member.fsf.org>
1 parent 619c318 commit b1ef11e

File tree

7 files changed

+97
-7
lines changed

7 files changed

+97
-7
lines changed

docker/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ ENV SPEASY_CACHE_PATH=/data \
1212
SPEASY_PROXY_SERVER_URL="" \
1313
SPEASY_PROXY_LOG_LEVEL="WARN"\
1414
SPEASY_CORE_HTTP_REWRITE_RULES=""\
15+
WITH_COLLAB="true" \
1516
PORT=$PORT\
1617
SPEASY_PROXY_LOG_CONFIG_FILE="/home/speasy/speasy_proxy/logging.yaml"
1718

docker/entry_point.sh

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
11
#!/usr/bin/env bash
22

33
#~/.local/bin/gunicorn speasy_proxy:app -w 32 -k uvicorn.workers.UvicornWorker --ws-max-queue=128 --backlog=32 --limit-max-requests=1024 --root-path=$SPEASY_PROXY_PREFIX --host="0.0.0.0" --port=$PORT --proxy-headers --limit-concurrency=10000 --log-config=/home/speasy/speasy_proxy/logging.yaml
4-
~/.local/bin/gunicorn speasy_proxy:app --timeout 600 --max-requests 10000 --backlog 2048 --worker-connections 1000 -w 32 --threads=8 -k speasy_proxy.UvicornWorker.SpeasyUvicornWorker
4+
if [ "$SPEASY_PROXY_COLLAB_ENDPOINT_ENABLE" = "true" ]; then
5+
echo "SPEASY_PROXY_COLLAB_ENDPOINT_ENABLE= is set, starting collab server"
6+
python3 -m speasy_proxy.collab_server &
7+
COLLAB_PID=$!
8+
fi
9+
10+
PYTHON_PATH=$(dirname "$(which python3)")
11+
12+
"$PYTHON_PATH"/gunicorn speasy_proxy:app --timeout 600 --max-requests 10000 --backlog 2048 --worker-connections 1000 -w 32 --threads=8 -k speasy_proxy.UvicornWorker.SpeasyUvicornWorker
13+
14+
if [ "$SPEASY_PROXY_COLLAB_ENDPOINT_ENABLE" = "true" ]; then
15+
echo "killing collab server with PID $COLLAB_PID"
16+
kill $COLLAB_PID
17+
wait $COLLAB_PID
18+
echo "collab server with PID $COLLAB_PID has been killed"
19+
fi

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ classifiers = [
3131
"Programming Language :: Python :: 3.11",
3232
"Programming Language :: Python :: 3.12",
3333
]
34-
dependencies = ['apscheduler < 4.0.0', 'humanize', 'speasy>=1.5.0', 'pycdfpp>=0.7.3', 'diskcache', 'jinja2', 'pyzstd', 'bokeh>=3.0.0', 'matplotlib', 'fastapi[all]', 'uvicorn', 'pycrdt-websocket >=0.15.4,<0.16.0']
34+
dependencies = ['apscheduler < 4.0.0', 'humanize', 'speasy>=1.5.0', 'pycdfpp>=0.7.3', 'diskcache', 'jinja2', 'pyzstd', 'bokeh>=3.0.0', 'matplotlib', 'fastapi[all]', 'uvicorn']
35+
[project.optional-dependencies]
36+
collab = ['pycrdt-websocket >=0.15.4,<0.16.0', 'hypercorn', 'httpx_ws']
37+
3538
[project.urls]
3639
homepage = "https://github.com/SciQLop/speasy_proxy"
3740

speasy_proxy/UvicornWorker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from uvicorn.workers import UvicornWorker
1+
from uvicorn_worker import UvicornWorker
22
import os
33

44

speasy_proxy/api/v1/ws_collaboration.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,63 @@
55

66
if collab_endpoint.enable():
77
from asyncio import create_task
8+
from anyio import get_cancelled_exc_class, Lock
9+
from contextlib import asynccontextmanager
810
from fastapi import WebSocket
9-
from pycrdt_websocket import WebsocketServer
10-
from pycrdt_websocket.websocket import HttpxWebsocket
11+
from httpx_ws import aconnect_ws
12+
from pycrdt import Array, Doc, Provider
13+
from pycrdt.websocket import WebsocketServer
14+
from pycrdt.websocket.websocket import HttpxWebsocket
1115
from .routes import router
1216

1317

18+
class Websocket:
19+
def __init__(self, websocket, path: str):
20+
self._websocket = websocket
21+
self._path = path
22+
self._send_lock = Lock()
23+
24+
@property
25+
def path(self) -> str:
26+
return self._path
27+
28+
def __aiter__(self):
29+
return self
30+
31+
async def __anext__(self) -> bytes:
32+
try:
33+
message = await self.recv()
34+
except Exception:
35+
raise StopAsyncIteration()
36+
return message
37+
38+
async def send(self, message: bytes):
39+
async with self._send_lock:
40+
await self._websocket.send_bytes(message)
41+
42+
async def recv(self) -> bytes:
43+
b = await self._websocket.receive_bytes()
44+
return bytes(b)
45+
46+
47+
@asynccontextmanager
48+
async def aprovider_factory(port, room_name, ydoc=None, log=None):
49+
ydoc = Doc() if ydoc is None else ydoc
50+
server_websocket = None
51+
connect = aconnect_ws(f"http://localhost:{port}/{room_name}")
52+
try:
53+
async with connect as websocket:
54+
websocket_provider = Provider(ydoc, Websocket(websocket, room_name), log)
55+
async with websocket_provider as websocket_provider:
56+
yield ydoc, server_websocket
57+
except get_cancelled_exc_class():
58+
pass
59+
60+
61+
def provider_factory(path, doc, log):
62+
return aprovider_factory(collab_endpoint.port(), path, ydoc=doc, log=log)
63+
64+
1465
@router.websocket("/collaboration/{path:path}")
1566
async def websocket_endpoint(path: str, websocket: WebSocket):
1667
await websocket.accept()
@@ -21,7 +72,7 @@ async def websocket_endpoint(path: str, websocket: WebSocket):
2172
async def get_websocket_server():
2273
global WEBSOCKET_SERVER
2374
if WEBSOCKET_SERVER is None:
24-
WEBSOCKET_SERVER = WebsocketServer()
75+
WEBSOCKET_SERVER = WebsocketServer(provider_factory=provider_factory)
2576
create_task(WEBSOCKET_SERVER.start())
2677
await WEBSOCKET_SERVER.started.wait()
2778
return WEBSOCKET_SERVER

speasy_proxy/config/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@
44
collab_endpoint = ConfigSection("PROXY_COLLAB_ENDPOINT",
55
enable={"default": False,
66
"type_ctor": lambda x: {'true': True,
7-
'false': False}.get(x.lower(), False)})
7+
'false': False}.get(x.lower(), False)},
8+
port ={"default": 9999, "type_ctor": int},
9+
)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/usr/bin/env python3
2+
import asyncio
3+
from hypercorn import Config
4+
from hypercorn.asyncio import serve
5+
from pycrdt.websocket import ASGIServer, WebsocketServer
6+
from speasy_proxy.config import collab_endpoint
7+
import os
8+
9+
10+
async def main():
11+
websocket_server = WebsocketServer()
12+
app = ASGIServer(websocket_server)
13+
config = Config()
14+
config.bind = [f"localhost:{collab_endpoint.port()}"]
15+
async with websocket_server:
16+
await serve(app, config, mode="asgi")
17+
18+
asyncio.run(main())

0 commit comments

Comments
 (0)