|
4 | 4 | from functools import partial
|
5 | 5 | from inspect import isawaitable
|
6 | 6 | from logging import Logger, getLogger
|
7 |
| -from typing import Awaitable, Callable |
| 7 | +from typing import Any, Awaitable, Callable |
8 | 8 |
|
9 | 9 | from anyio import (
|
10 | 10 | TASK_STATUS_IGNORED,
|
|
16 | 16 | from anyio.abc import TaskGroup, TaskStatus
|
17 | 17 | from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
|
18 | 18 | from pycrdt import (
|
| 19 | + Awareness, |
19 | 20 | Doc,
|
20 | 21 | Subscription,
|
21 | 22 | YMessageType,
|
22 | 23 | YSyncMessageType,
|
| 24 | + create_awareness_message, |
23 | 25 | create_sync_message,
|
24 | 26 | create_update_message,
|
25 | 27 | handle_sync_message,
|
26 | 28 | )
|
27 | 29 |
|
28 |
| -from .awareness import Awareness |
29 | 30 | from .websocket import Websocket
|
30 | 31 | from .ystore import BaseYStore
|
31 | 32 | from .yutils import put_updates
|
@@ -77,11 +78,12 @@ def __init__(
|
77 | 78 | ydoc: An optional document for the room (a new one is created otherwise).
|
78 | 79 | """
|
79 | 80 | self.ydoc = Doc() if ydoc is None else ydoc
|
80 |
| - self.awareness = Awareness(self.ydoc) |
81 | 81 | self.ready_event = Event()
|
82 | 82 | self.ready = ready
|
83 | 83 | self.ystore = ystore
|
84 | 84 | self.log = log or getLogger(__name__)
|
| 85 | + self.awareness = Awareness(self.ydoc) |
| 86 | + self.awareness.observe(self.send_server_awareness) |
85 | 87 | self.clients = set()
|
86 | 88 | self._on_message = None
|
87 | 89 | self.exception_handler = exception_handler
|
@@ -304,3 +306,34 @@ async def serve(self, websocket: Websocket):
|
304 | 306 | self.clients.remove(websocket)
|
305 | 307 | except Exception as exception:
|
306 | 308 | self._handle_exception(exception)
|
| 309 | + |
| 310 | + def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: |
| 311 | + """ |
| 312 | + Callback to broadcast the server awareness to clients. |
| 313 | +
|
| 314 | + Arguments: |
| 315 | + type: The change type. |
| 316 | + changes: The awareness changes. |
| 317 | + """ |
| 318 | + if type != "update" or changes[1] != "local": |
| 319 | + return |
| 320 | + |
| 321 | + if self._task_group is not None: |
| 322 | + updated_clients = [v for value in changes[0].values() for v in value] |
| 323 | + state = self.awareness.encode_awareness_update(updated_clients) |
| 324 | + message = create_awareness_message(state) |
| 325 | + self._task_group.start_soon(self._send_server_awareness, message) |
| 326 | + else: |
| 327 | + self.log.error("Cannot broadcast server awareness: YRoom not started") |
| 328 | + |
| 329 | + async def _send_server_awareness(self, state: bytes) -> None: |
| 330 | + try: |
| 331 | + async with create_task_group() as tg: |
| 332 | + for client in self.clients: |
| 333 | + self.log.debug( |
| 334 | + "Sending awareness from server to client with endpoint: %s", |
| 335 | + client.path, |
| 336 | + ) |
| 337 | + tg.start_soon(client.send, state) |
| 338 | + except Exception as e: |
| 339 | + self.log.error("Error while broadcasting awareness changes: %s", e) |
0 commit comments