|
8 | 8 | import time
|
9 | 9 | import uuid
|
10 | 10 | from logging import Logger
|
11 |
| -from typing import Any |
| 11 | +from typing import Any, Literal |
12 | 12 |
|
13 | 13 | from jupyter_server.auth import authorized
|
14 | 14 | from jupyter_server.base.handlers import APIHandler, JupyterHandler
|
15 | 15 | from jupyter_server.utils import ensure_async
|
16 | 16 | from jupyter_ydoc import ydocs as YDOCS
|
17 |
| -from pycrdt import YMessageType, write_var_uint |
| 17 | +from pycrdt import write_var_uint |
18 | 18 | from pycrdt_websocket.websocket_server import YRoom
|
19 | 19 | from pycrdt_websocket.ystore import BaseYStore
|
20 | 20 | from tornado import web
|
@@ -132,6 +132,10 @@ def exception_logger(exception: Exception, log: Logger) -> bool:
|
132 | 132 | exception_handler=exception_logger,
|
133 | 133 | )
|
134 | 134 |
|
| 135 | + if self._room_id == "JupyterLab:globalAwareness": |
| 136 | + # Listen for the changes in GlobalAwareness to update users |
| 137 | + self.room.awareness.observe(self._on_global_awareness_event) |
| 138 | + |
135 | 139 | try:
|
136 | 140 | await self._websocket_server.start_room(self.room)
|
137 | 141 | except Exception as e:
|
@@ -280,31 +284,6 @@ async def on_message(self, message):
|
280 | 284 | """
|
281 | 285 | message_type = message[0]
|
282 | 286 |
|
283 |
| - if message_type == YMessageType.AWARENESS: |
284 |
| - # awareness |
285 |
| - skip = False |
286 |
| - changes = self.room.awareness.get_changes(message[1:]) |
287 |
| - added_users = changes["added"] |
288 |
| - removed_users = changes["removed"] |
289 |
| - for i, user in enumerate(added_users): |
290 |
| - u = changes["states"][i] |
291 |
| - if "user" in u: |
292 |
| - name = u["user"]["name"] |
293 |
| - self._websocket_server.connected_users[user] = name |
294 |
| - self.log.debug("Y user joined: %s", name) |
295 |
| - for user in removed_users: |
296 |
| - if user in self._websocket_server.connected_users: |
297 |
| - name = self._websocket_server.connected_users[user] |
298 |
| - del self._websocket_server.connected_users[user] |
299 |
| - self.log.debug("Y user left: %s", name) |
300 |
| - # filter out message depending on changes |
301 |
| - if skip: |
302 |
| - self.log.debug( |
303 |
| - "Filtered out Y message of type: %s", |
304 |
| - YMessageType(message_type).name, |
305 |
| - ) |
306 |
| - return skip |
307 |
| - |
308 | 287 | if message_type == MessageType.CHAT:
|
309 | 288 | msg = message[2:].decode("utf-8")
|
310 | 289 |
|
@@ -395,6 +374,31 @@ async def _clean_room(self) -> None:
|
395 | 374 | self._emit(LogLevel.INFO, "clean", "Loader deleted.")
|
396 | 375 | del self._room_locks[self._room_id]
|
397 | 376 |
|
| 377 | + def _on_global_awareness_event( |
| 378 | + self, topic: Literal["change", "update"], changes: tuple[dict[str, Any], Any] |
| 379 | + ) -> None: |
| 380 | + """ |
| 381 | + Update the users when the global awareness changes. |
| 382 | +
|
| 383 | + Parameters: |
| 384 | + topic (str): `"update"` or `"change"` (`"change"` is triggered only if the states are modified). |
| 385 | + changes (tuple[dict[str, Any], Any]): The changes and the origin of the changes. |
| 386 | + """ |
| 387 | + if topic != "change": |
| 388 | + return |
| 389 | + added_users = changes[0]["added"] |
| 390 | + removed_users = changes[0]["removed"] |
| 391 | + for user in added_users: |
| 392 | + u = self.room.awareness.states[user] |
| 393 | + if "user" in u: |
| 394 | + name = u["user"]["name"] |
| 395 | + self._websocket_server.connected_users[user] = name |
| 396 | + self.log.debug("Y user joined: %s", name) |
| 397 | + for user in removed_users: |
| 398 | + if user in self._websocket_server.connected_users: |
| 399 | + name = self._websocket_server.connected_users.pop(user) |
| 400 | + self.log.debug("Y user left: %s", name) |
| 401 | + |
398 | 402 | def check_origin(self, origin):
|
399 | 403 | """
|
400 | 404 | Check origin
|
|
0 commit comments