|
8 | 8 | import time
|
9 | 9 | import uuid
|
10 | 10 | from logging import Logger
|
11 |
| -from typing import Any |
| 11 | +from typing import Any, Literal |
12 | 12 | from uuid import uuid4
|
13 | 13 |
|
14 | 14 | from jupyter_server.auth import authorized
|
15 | 15 | from jupyter_server.base.handlers import APIHandler, JupyterHandler
|
16 | 16 | from jupyter_server.utils import ensure_async
|
17 | 17 | from jupyter_ydoc import ydocs as YDOCS
|
18 |
| -from pycrdt import Doc, UndoManager, YMessageType, write_var_uint |
| 18 | +from pycrdt import Doc, UndoManager, write_var_uint |
19 | 19 | from pycrdt_websocket.websocket_server import YRoom
|
20 | 20 | from pycrdt_websocket.ystore import BaseYStore
|
21 | 21 | from tornado import web
|
@@ -137,6 +137,10 @@ def exception_logger(exception: Exception, log: Logger) -> bool:
|
137 | 137 | exception_handler=exception_logger,
|
138 | 138 | )
|
139 | 139 |
|
| 140 | + if self._room_id == "JupyterLab:globalAwareness": |
| 141 | + # Listen for the changes in GlobalAwareness to update users |
| 142 | + self.room.awareness.observe(self._on_global_awareness_event) |
| 143 | + |
140 | 144 | try:
|
141 | 145 | await self._websocket_server.start_room(self.room)
|
142 | 146 | except Exception as e:
|
@@ -286,31 +290,6 @@ async def on_message(self, message):
|
286 | 290 | """
|
287 | 291 | message_type = message[0]
|
288 | 292 |
|
289 |
| - if message_type == YMessageType.AWARENESS: |
290 |
| - # awareness |
291 |
| - skip = False |
292 |
| - changes = self.room.awareness.get_changes(message[1:]) |
293 |
| - added_users = changes["added"] |
294 |
| - removed_users = changes["removed"] |
295 |
| - for i, user in enumerate(added_users): |
296 |
| - u = changes["states"][i] |
297 |
| - if "user" in u: |
298 |
| - name = u["user"]["name"] |
299 |
| - self._websocket_server.connected_users[user] = name |
300 |
| - self.log.debug("Y user joined: %s", name) |
301 |
| - for user in removed_users: |
302 |
| - if user in self._websocket_server.connected_users: |
303 |
| - name = self._websocket_server.connected_users[user] |
304 |
| - del self._websocket_server.connected_users[user] |
305 |
| - self.log.debug("Y user left: %s", name) |
306 |
| - # filter out message depending on changes |
307 |
| - if skip: |
308 |
| - self.log.debug( |
309 |
| - "Filtered out Y message of type: %s", |
310 |
| - YMessageType(message_type).name, |
311 |
| - ) |
312 |
| - return skip |
313 |
| - |
314 | 293 | if message_type == MessageType.CHAT:
|
315 | 294 | msg = message[2:].decode("utf-8")
|
316 | 295 |
|
@@ -405,6 +384,31 @@ async def _clean_room(self) -> None:
|
405 | 384 | self._emit(LogLevel.INFO, "clean", "Loader deleted.")
|
406 | 385 | del self._room_locks[self._room_id]
|
407 | 386 |
|
| 387 | + def _on_global_awareness_event( |
| 388 | + self, topic: Literal["change", "update"], changes: tuple[dict[str, Any], Any] |
| 389 | + ) -> None: |
| 390 | + """ |
| 391 | + Update the users when the global awareness changes. |
| 392 | +
|
| 393 | + Parameters: |
| 394 | + topic (str): `"update"` or `"change"` (`"change"` is triggered only if the states are modified). |
| 395 | + changes (tuple[dict[str, Any], Any]): The changes and the origin of the changes. |
| 396 | + """ |
| 397 | + if topic != "change": |
| 398 | + return |
| 399 | + added_users = changes[0]["added"] |
| 400 | + removed_users = changes[0]["removed"] |
| 401 | + for user in added_users: |
| 402 | + u = self.room.awareness.states[user] |
| 403 | + if "user" in u: |
| 404 | + name = u["user"]["name"] |
| 405 | + self._websocket_server.connected_users[user] = name |
| 406 | + self.log.debug("Y user joined: %s", name) |
| 407 | + for user in removed_users: |
| 408 | + if user in self._websocket_server.connected_users: |
| 409 | + name = self._websocket_server.connected_users.pop(user) |
| 410 | + self.log.debug("Y user left: %s", name) |
| 411 | + |
408 | 412 | def check_origin(self, origin):
|
409 | 413 | """
|
410 | 414 | Check origin
|
|
0 commit comments