|
8 | 8 | import time
|
9 | 9 | import uuid
|
10 | 10 | from logging import Logger
|
11 |
| -from typing import Any |
| 11 | +from typing import Any, Literal |
12 | 12 | from uuid import uuid4
|
13 | 13 |
|
14 | 14 | from jupyter_server.auth import authorized
|
15 | 15 | from jupyter_server.base.handlers import APIHandler, JupyterHandler
|
16 | 16 | from jupyter_server.utils import ensure_async
|
17 | 17 | from jupyter_ydoc import ydocs as YDOCS
|
18 |
| -from pycrdt import Doc, UndoManager, YMessageType, write_var_uint |
| 18 | +from pycrdt import Doc, UndoManager, write_var_uint |
19 | 19 | from pycrdt_websocket.websocket_server import YRoom
|
20 | 20 | from pycrdt_websocket.ystore import BaseYStore
|
21 | 21 | from tornado import web
|
@@ -142,6 +142,10 @@ def exception_logger(exception: Exception, log: Logger) -> bool:
|
142 | 142 | exception_handler=exception_logger,
|
143 | 143 | )
|
144 | 144 |
|
| 145 | + if self._room_id == "JupyterLab:globalAwareness": |
| 146 | + # Listen for the changes in GlobalAwareness to update users |
| 147 | + self.room.awareness.observe(self._on_global_awareness_event) |
| 148 | + |
145 | 149 | try:
|
146 | 150 | await self._websocket_server.start_room(self.room)
|
147 | 151 | except Exception as e:
|
@@ -293,31 +297,6 @@ async def on_message(self, message):
|
293 | 297 | """
|
294 | 298 | message_type = message[0]
|
295 | 299 |
|
296 |
| - if message_type == YMessageType.AWARENESS: |
297 |
| - # awareness |
298 |
| - skip = False |
299 |
| - changes = self.room.awareness.get_changes(message[1:]) |
300 |
| - added_users = changes["added"] |
301 |
| - removed_users = changes["removed"] |
302 |
| - for i, user in enumerate(added_users): |
303 |
| - u = changes["states"][i] |
304 |
| - if "user" in u: |
305 |
| - name = u["user"]["name"] |
306 |
| - self._websocket_server.connected_users[user] = name |
307 |
| - self.log.debug("Y user joined: %s", name) |
308 |
| - for user in removed_users: |
309 |
| - if user in self._websocket_server.connected_users: |
310 |
| - name = self._websocket_server.connected_users[user] |
311 |
| - del self._websocket_server.connected_users[user] |
312 |
| - self.log.debug("Y user left: %s", name) |
313 |
| - # filter out message depending on changes |
314 |
| - if skip: |
315 |
| - self.log.debug( |
316 |
| - "Filtered out Y message of type: %s", |
317 |
| - YMessageType(message_type).name, |
318 |
| - ) |
319 |
| - return skip |
320 |
| - |
321 | 300 | if message_type == MessageType.CHAT:
|
322 | 301 | msg = message[2:].decode("utf-8")
|
323 | 302 |
|
@@ -408,6 +387,31 @@ async def _clean_room(self) -> None:
|
408 | 387 | self._emit(LogLevel.INFO, "clean", "Loader deleted.")
|
409 | 388 | del self._room_locks[self._room_id]
|
410 | 389 |
|
| 390 | + def _on_global_awareness_event( |
| 391 | + self, topic: Literal["change", "update"], changes: tuple[dict[str, Any], Any] |
| 392 | + ) -> None: |
| 393 | + """ |
| 394 | + Update the users when the global awareness changes. |
| 395 | +
|
| 396 | + Parameters: |
| 397 | + topic (str): `"update"` or `"change"` (`"change"` is triggered only if the states are modified). |
| 398 | + changes (tuple[dict[str, Any], Any]): The changes and the origin of the changes. |
| 399 | + """ |
| 400 | + if topic != "change": |
| 401 | + return |
| 402 | + added_users = changes[0]["added"] |
| 403 | + removed_users = changes[0]["removed"] |
| 404 | + for user in added_users: |
| 405 | + u = self.room.awareness.states[user] |
| 406 | + if "user" in u: |
| 407 | + name = u["user"]["name"] |
| 408 | + self._websocket_server.connected_users[user] = name |
| 409 | + self.log.debug("Y user joined: %s", name) |
| 410 | + for user in removed_users: |
| 411 | + if user in self._websocket_server.connected_users: |
| 412 | + name = self._websocket_server.connected_users.pop(user) |
| 413 | + self.log.debug("Y user left: %s", name) |
| 414 | + |
411 | 415 | def check_origin(self, origin):
|
412 | 416 | """
|
413 | 417 | Check origin
|
|
0 commit comments