@@ -35,7 +35,6 @@ class DocumentAwareKernelClient(AsyncKernelClient):
3535 # status messages.
3636 _yrooms : t .Set [YRoom ] = Set (trait = Instance (YRoom ), default_value = set ())
3737
38-
3938 async def start_listening (self ):
4039 """Start listening to messages coming from the kernel.
4140
@@ -70,7 +69,7 @@ async def stop_listening(self):
7069 # Log any exceptions that were raised.
7170 except Exception as err :
7271 self .log .error (err )
73-
72+
7473 _listening_task : t .Optional [t .Awaitable ] = Any (allow_none = True )
7574
7675 def handle_incoming_message (self , channel_name : str , msg : list [bytes ]):
@@ -166,7 +165,7 @@ async def handle_outgoing_message(self, channel_name: str, msg: list[bytes]):
166165 return
167166
168167 # Update the last activity.
169- #self.last_activity = self.session.msg_time
168+ # self.last_activity = self.session.msg_time
170169
171170 await self .send_message_to_listeners (channel_name , msg )
172171
@@ -183,7 +182,7 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
183182 -------
184183 Returns the message if it should be forwarded to listeners. Otherwise,
185184 returns `None` and keeps (i.e. intercepts) the message from going
186- to listenres .
185+ to listeners .
187186 """
188187 # NOTE: Here's where we will inject the kernel state
189188 # into the awareness of a document.
@@ -192,37 +191,34 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
192191 dmsg = self .session .deserialize (msg , content = False )
193192 except Exception as e :
194193 self .log .error (f"Error deserializing message: { e } " )
195- raise ValueError
196-
197- if dmsg ["msg_type" ] == "status" :
198- # Forward to all yrooms.
199- for yroom in self ._yrooms :
200- # NOTE: We need to create a real message here.
201- awareness_update_message = b""
202- self .log .debug (f"Update Awareness here: { dmsg } . YRoom: { yroom } " )
203- #self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
204- #yroom.add_message(awareness_update_message)
205-
206- # TODO: returning message temporarily to not break UI
207- return msg
208-
194+ raise
209195
210196 # NOTE: Inject display data into ydoc.
211197 if dmsg ["msg_type" ] == "display_data" :
212198 # Forward to all yrooms.
213199 for yroom in self ._yrooms :
214200 update_document_message = b""
215- self .log .debug (f"Update Document here: { dmsg } . Yroom: { yroom } " )
216- #self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
217- #yroom.add_message(update_document_message)
218-
219- # TODO: returning message temporarily to not break UI
220- return msg
201+ # yroom.add_message(update_document_message)
221202
203+ # TODO: returning message temporarily to not break UI
222204 # If the message isn't handled above, return it and it will
223205 # be forwarded to all listeners
224206 return msg
225207
208+ def send_kernel_awareness (self , kernel_status : dict ):
209+ """
210+ Send kernel status awareness messages to all yrooms
211+ """
212+ for yroom in self ._yrooms :
213+ awareness = yroom .get_awareness ()
214+ if awareness is None :
215+ self .log .error (f"awareness cannot be None. room_id: { yroom .room_id } " )
216+ continue
217+ self .log .debug (f"current state: { awareness .get_local_state ()} room_id: { yroom .room_id } . kernel status: { kernel_status } " )
218+ awareness .set_local_state_field ("kernel" , kernel_status )
219+ self .log .debug (f"current state: { awareness .get_local_state ()} room_id: { yroom .room_id } " )
220+
221+
226222 async def add_yroom (self , yroom : YRoom ):
227223 """
228224 Register a YRoom with this kernel client. YRooms will
0 commit comments