Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 20 additions & 24 deletions jupyter_rtc_core/kernels/kernel_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class DocumentAwareKernelClient(AsyncKernelClient):
# status messages.
_yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set())


async def start_listening(self):
"""Start listening to messages coming from the kernel.

Expand Down Expand Up @@ -70,7 +69,7 @@ async def stop_listening(self):
# Log any exceptions that were raised.
except Exception as err:
self.log.error(err)

_listening_task: t.Optional[t.Awaitable] = Any(allow_none=True)

def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
Expand Down Expand Up @@ -166,7 +165,7 @@ async def handle_outgoing_message(self, channel_name: str, msg: list[bytes]):
return

# Update the last activity.
#self.last_activity = self.session.msg_time
# self.last_activity = self.session.msg_time

await self.send_message_to_listeners(channel_name, msg)

Expand All @@ -183,7 +182,7 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
-------
Returns the message if it should be forwarded to listeners. Otherwise,
returns `None` and keeps (i.e. intercepts) the message from going
to listenres.
to listeners.
"""
# NOTE: Here's where we will inject the kernel state
# into the awareness of a document.
Expand All @@ -192,37 +191,34 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
dmsg = self.session.deserialize(msg, content=False)
except Exception as e:
self.log.error(f"Error deserializing message: {e}")
raise ValueError

if dmsg["msg_type"] == "status":
# Forward to all yrooms.
for yroom in self._yrooms:
# NOTE: We need to create a real message here.
awareness_update_message = b""
self.log.debug(f"Update Awareness here: {dmsg}. YRoom: {yroom}")
#self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
#yroom.add_message(awareness_update_message)

# TODO: returning message temporarily to not break UI
return msg

raise

# NOTE: Inject display data into ydoc.
if dmsg["msg_type"] == "display_data":
# Forward to all yrooms.
for yroom in self._yrooms:
update_document_message = b""
self.log.debug(f"Update Document here: {dmsg}. Yroom: {yroom}")
#self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
#yroom.add_message(update_document_message)

# TODO: returning message temporarily to not break UI
return msg
# yroom.add_message(update_document_message)

# TODO: returning message temporarily to not break UI
# If the message isn't handled above, return it and it will
# be forwarded to all listeners
return msg

def send_kernel_awareness(self, kernel_status: dict):
"""
Send kernel status awareness messages to all yrooms
"""
for yroom in self._yrooms:
awareness = yroom.get_awareness()
if awareness is None:
self.log.error(f"awareness cannot be None. room_id: {yroom.room_id}")
continue
self.log.debug(f"current state: {awareness.get_local_state()} room_id: {yroom.room_id}. kernel status: {kernel_status}")
awareness.set_local_state_field("kernel", kernel_status)
self.log.debug(f"current state: {awareness.get_local_state()} room_id: {yroom.room_id}")


async def add_yroom(self, yroom: YRoom):
"""
Register a YRoom with this kernel client. YRooms will
Expand Down
11 changes: 10 additions & 1 deletion jupyter_rtc_core/kernels/kernel_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def set_state(

if broadcast:
# Broadcast this state change to all listeners
# Turn off state broadcasting temporarily to avoid
self._state_observers = None
self.broadcast_state()

Expand Down Expand Up @@ -202,4 +201,14 @@ def execution_state_listener(self, channel_name: str, msg: list[bytes]):
if parent_channel and parent_channel == "shell":
# Don't broadcast, since this message is already going out.
self.set_state(LifecycleStates.CONNECTED, ExecutionStates(execution_state), broadcast=False)

kernel_status = {
"execution_state": self.execution_state,
"lifecycle_state": self.lifecycle_state
}
self.log.debug(f"Sending kernel status awareness {kernel_status}")
self.main_client.send_kernel_awareness(kernel_status)
self.log.debug(f"Sent kernel status awareness {kernel_status}")



2 changes: 1 addition & 1 deletion jupyter_rtc_core/session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class YDocSessionManager(SessionManager):
"""A Jupyter Server Session Manager that's connects YDocuments
"""A Jupyter Server Session Manager that connects YDocuments
to Kernel Clients.
"""

Expand Down
Loading