diff --git a/docs/reference/Django_Channels.md b/docs/reference/Django_Channels.md new file mode 100644 index 0000000..66131d1 --- /dev/null +++ b/docs/reference/Django_Channels.md @@ -0,0 +1,11 @@ +## Consumer + +::: ypy_websocket.django_channels.yjs_consumer.YjsConsumer + +## Storage + +### BaseYRoomStorage +::: ypy_websocket.django_channels.yroom_storage.BaseYRoomStorage + +### RedisYRoomStorage +::: ypy_websocket.django_channels.yroom_storage.RedisYRoomStorage diff --git a/docs/reference/Django_Channels_consumer.md b/docs/reference/Django_Channels_consumer.md deleted file mode 100644 index 0eef186..0000000 --- a/docs/reference/Django_Channels_consumer.md +++ /dev/null @@ -1 +0,0 @@ -::: ypy_websocket.django_channels_consumer.YjsConsumer diff --git a/mkdocs.yml b/mkdocs.yml index 01c586a..d53d1b9 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -37,7 +37,7 @@ nav: - reference/WebSocket_provider.md - reference/WebSocket_server.md - reference/ASGI_server.md - - reference/Django_Channels_consumer.md + - reference/Django_Channels.md - reference/WebSocket.md - reference/Room.md - reference/Store.md diff --git a/pyproject.toml b/pyproject.toml index 68e568f..4ccb4db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,9 @@ docs = [ django = [ "channels", ] +redis = [ + "redis", +] [project.urls] Homepage = "https://github.com/y-crdt/ypy-websocket" diff --git a/ypy_websocket/django_channels/__init__.py b/ypy_websocket/django_channels/__init__.py new file mode 100644 index 0000000..20aa50e --- /dev/null +++ b/ypy_websocket/django_channels/__init__.py @@ -0,0 +1,2 @@ +from .yjs_consumer import YjsConsumer +from .yroom_storage import BaseYRoomStorage, RedisYRoomStorage diff --git a/ypy_websocket/django_channels_consumer.py b/ypy_websocket/django_channels/yjs_consumer.py similarity index 65% rename from ypy_websocket/django_channels_consumer.py rename to ypy_websocket/django_channels/yjs_consumer.py index bf0daa2..8367463 100644 --- a/ypy_websocket/django_channels_consumer.py +++ b/ypy_websocket/django_channels/yjs_consumer.py @@ -4,10 +4,19 @@ from typing import TypedDict import y_py as Y -from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore +from channels.generic.websocket import AsyncWebsocketConsumer -from .websocket import Websocket -from .yutils import YMessageType, process_sync_message, sync +from ypy_websocket.django_channels.yroom_storage import BaseYRoomStorage + +from ..websocket import Websocket +from ..yutils import ( + EMPTY_UPDATE, + YMessageType, + YSyncMessageType, + process_sync_message, + read_message, + sync, +) logger = getLogger(__name__) @@ -70,14 +79,17 @@ class YjsConsumer(AsyncWebsocketConsumer): In particular, - Override `make_room_name` to customize the room name. - - Override `make_ydoc` to initialize the YDoc. This is useful to initialize it with data - from your database, or to add observers to it). + - Override `make_room_storage` to initialize the room storage. Create your own storage class + by subclassing `BaseYRoomStorage` and implementing the methods. - Override `connect` to do custom validation (like auth) on connect, but be sure to call `await super().connect()` in the end. - Call `group_send_message` to send a message to an entire group/room. - Call `send_message` to send a message to a single client, although this is not recommended. - A full example of a custom consumer showcasing all of these options is: + A full example of a custom consumer showcasing all of these options is below. The example also + includes an example function `propagate_document_update_from_external` that demonstrates how to + send a message to all connected clients from an external source (like a Celery job). + ```py import y_py as Y from asgiref.sync import async_to_sync @@ -87,45 +99,57 @@ class YjsConsumer(AsyncWebsocketConsumer): class DocConsumer(YjsConsumer): + def make_room_storage(self) -> BaseYRoomStorage: + # Modify the room storage here + + return RedisYRoomStorage(self.room_name) + def make_room_name(self) -> str: - # modify the room name here - return self.scope["url_route"]["kwargs"]["room"] + # Modify the room name here - async def make_ydoc(self) -> Y.YDoc: - doc = Y.YDoc() - # fill doc with data from DB here - doc.observe_after_transaction(self.on_update_event) - return doc + return self.scope["url_route"]["kwargs"]["room"] async def connect(self): user = self.scope["user"] + if user is None or user.is_anonymous: await self.close() return - await super().connect() - def on_update_event(self, event): - # process event here - ... + await super().connect() - async def doc_update(self, update_wrapper): + async def propagate_document_update(self, update_wrapper): update = update_wrapper["update"] - Y.apply_update(self.ydoc, update) - await self.group_send_message(create_update_message(update)) + await self.send(create_update_message(update)) - def send_doc_update(room_name, update): - layer = get_channel_layer() - async_to_sync(layer.group_send)(room_name, {"type": "doc_update", "update": update}) - ``` + async def propagate_document_update_from_external(room_name, update): + channel_layer = get_channel_layer() + + await channel_layer.group_send( + room_name, + {"type": "propagate_document_update", "update": update}, + ) + ``` """ def __init__(self): super().__init__() - self.room_name = None - self.ydoc = None - self._websocket_shim = None + self.room_name: str | None = None + self.ydoc: Y.YDoc | None = None + self.room_storage: BaseYRoomStorage | None = None + self._websocket_shim: _WebsocketShim | None = None + + def make_room_storage(self) -> BaseYRoomStorage | None: + """Make the room storage for a new channel to persist the YDoc permanently. + + Defaults to not using any (just broadcast updates between consumers). + + Example: + self.room_storage = RedisYRoomStorage(self.room_name) + """ + return None def make_room_name(self) -> str: """Make the room name for a new channel. @@ -137,15 +161,10 @@ def make_room_name(self) -> str: """ return self.scope["url_route"]["kwargs"]["room"] - async def make_ydoc(self) -> Y.YDoc: - """Make the YDoc for a new channel. - - Override to customize the YDoc when a channel is created - (useful to initialize it with data from your database, or to add observers to it). + async def _make_ydoc(self) -> Y.YDoc: + if self.room_storage: + return await self.room_storage.get_document() - Returns: - The YDoc for a new channel. Defaults to a new empty YDoc. - """ return Y.YDoc() def _make_websocket_shim(self, path: str) -> _WebsocketShim: @@ -153,7 +172,9 @@ def _make_websocket_shim(self, path: str) -> _WebsocketShim: async def connect(self) -> None: self.room_name = self.make_room_name() - self.ydoc = await self.make_ydoc() + self.room_storage = self.make_room_storage() + + self.ydoc = await self._make_ydoc() self._websocket_shim = self._make_websocket_shim(self.scope["path"]) await self.channel_layer.group_add(self.room_name, self.channel_name) @@ -162,14 +183,32 @@ async def connect(self) -> None: await sync(self.ydoc, self._websocket_shim, logger) async def disconnect(self, code) -> None: + if self.room_storage: + await self.room_storage.close() + + if not self.room_name: + return + await self.channel_layer.group_discard(self.room_name, self.channel_name) async def receive(self, text_data=None, bytes_data=None): if bytes_data is None: return + await self.group_send_message(bytes_data) + if bytes_data[0] != YMessageType.SYNC: return + + # If it's an update message, apply it to the storage document + if self.room_storage and bytes_data[1] == YSyncMessageType.SYNC_UPDATE: + update = read_message(bytes_data[2:]) + + if update != EMPTY_UPDATE: + await self.room_storage.update_document(update) + + return + await process_sync_message(bytes_data[1:], self.ydoc, self._websocket_shim, logger) class WrappedMessage(TypedDict): diff --git a/ypy_websocket/django_channels/yroom_storage.py b/ypy_websocket/django_channels/yroom_storage.py new file mode 100644 index 0000000..4a3e5c5 --- /dev/null +++ b/ypy_websocket/django_channels/yroom_storage.py @@ -0,0 +1,211 @@ +import time +from typing import Optional + +import redis.asyncio as redis +import y_py as Y + + +class BaseYRoomStorage: + """Base class for YRoom storage. + + This class is responsible for storing, retrieving, updating and persisting the Ypy document. + + Each Django Channels Consumer should have its own YRoomStorage instance, although all consumers + and rooms with the same room name will be connected to the same document in the end. + + Updates to the document should be sent to the shared storage, instead of each + consumer having its own version of the YDoc. + + A full example of a Redis as temporary storage and Postgres as persistent storage is: + + ```py + from typing import Optional + + from django.db import models + from ypy_websocket.django_channels.yroom_storage import RedisYRoomStorage + + + class YDocSnapshotManager(models.Manager): + async def aget_snapshot(self, name) -> Optional[bytes]: + try: + instance: YDocSnapshot = await self.aget(name=name) + result = instance.data + if not isinstance(result, bytes): + # Postgres on psycopg2 returns memoryview + return bytes(result) + + except YDocSnapshot.DoesNotExist: + return None + else: + return result + + async def asave_snapshot(self, name, data): + return await self.aupdate_or_create(name=name, defaults={"data": data}) + + + class YDocSnapshot(models.Model): + name = models.CharField(max_length=255, primary_key=True) + data = models.BinaryField() + + objects = YDocSnapshotManager() + + + class CustomRoomStorage(RedisYRoomStorage): + async def load_snapshot(self) -> Optional[bytes]: + return await YDocSnapshot.objects.aget_snapshot(self.room_name) + + async def save_snapshot(self): + current_snapshot = await self.redis.get(self.redis_key) + + if not current_snapshot: + return + + await YDocSnapshot.objects.asave_snapshot( + self.room_name, + current_snapshot, + ) + + ``` + """ + + def __init__(self, room_name: str) -> None: + self.room_name = room_name + + self.last_saved_at = time.time() + self.save_throttle_interval = 5 + + async def get_document(self) -> Y.YDoc: + """Gets the document from the storage. + + Ideally it should be retrieved first from temporary storage (e.g. Redis) and then from + persistent storage (e.g. a database). + + Returns: + The document with the latest changes. + """ + + raise NotImplementedError + + async def update_document(self, update: bytes): + """Updates the document in the storage. + + Updates could be received by Yjs client (e.g. from a WebSocket) or from the server + (e.g. from a Django Celery job). + + Args: + update: The update to apply to the document. + """ + + raise NotImplementedError + + async def load_snapshot(self) -> Optional[bytes]: + """Gets the document from the database. Override this method to + implement a persistent storage. + + Defaults to None. + + Returns: + The latest document snapshot. + """ + return None + + async def save_snapshot(self) -> None: + """Saves a snapshot of the document to the storage. + + If you need to persist the document to a database, you should do it here. + + Default implementation does nothing. + """ + + pass + + async def throttled_save_snapshot(self) -> None: + """Saves a snapshot of the document to the storage, debouncing the calls.""" + + if time.time() - self.last_saved_at <= self.save_throttle_interval: + return + + await self.save_snapshot() + + self.last_saved_at = time.time() + + async def close(self): + """Closes the storage. + + Default implementation does nothing. + """ + + pass + + +class RedisYRoomStorage(BaseYRoomStorage): + """A YRoom storage that uses Redis as main storage, without + persistent storage. + + Args: + room_name: The name of the room. + """ + + def __init__(self, room_name: str) -> None: + super().__init__(room_name) + + self.redis_key = f"document:{self.room_name}" + self.redis = self.make_redis() + + def make_redis(self): + """Makes a Redis client. + + Defaults to a local client""" + + return redis.Redis(host="localhost", port=6379, db=0) + + async def get_document(self) -> Y.YDoc: + snapshot = await self.redis.get(self.redis_key) + + if not snapshot: + snapshot = await self.load_snapshot() + + document = Y.YDoc() + + if snapshot: + Y.apply_update(document, snapshot) + + return document + + async def update_document(self, update: bytes): + await self.redis.watch(self.redis_key) + + try: + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_snapshot(current_document, update) + + async with self.redis.pipeline() as pipe: + while True: + try: + pipe.multi() + pipe.set(self.redis_key, updated_snapshot) + + await pipe.execute() + + break + except redis.WatchError: + current_snapshot = await self.get_document() + updated_snapshot = self._apply_update_to_snapshot( + current_snapshot, + update, + ) + + continue + finally: + await self.redis.unwatch() + + await self.throttled_save_snapshot() + + async def close(self): + await self.save_snapshot() + await self.redis.close() + + def _apply_update_to_snapshot(self, document: Y.YDoc, update: bytes) -> bytes: + Y.apply_update(document, update) + + return Y.encode_state_as_update(document) diff --git a/ypy_websocket/yutils.py b/ypy_websocket/yutils.py index 37d9c62..50b5949 100644 --- a/ypy_websocket/yutils.py +++ b/ypy_websocket/yutils.py @@ -19,6 +19,10 @@ class YSyncMessageType(IntEnum): SYNC_UPDATE = 2 +# Empty updates (see https://github.com/y-crdt/ypy/issues/98) +EMPTY_UPDATE = b"\x00\x00" + + def write_var_uint(num: int) -> bytes: res = [] while num > 127: @@ -127,8 +131,7 @@ async def process_sync_message(message: bytes, ydoc: Y.YDoc, websocket, log) -> YSyncMessageType.SYNC_UPDATE, ): update = read_message(msg) - # Ignore empty updates (see https://github.com/y-crdt/ypy/issues/98) - if update != b"\x00\x00": + if update != EMPTY_UPDATE: Y.apply_update(ydoc, update)