Skip to content

Commit 095bb38

Browse files
committed
Work to integrate output processor into kernel client.
1 parent 9fd486e commit 095bb38

File tree

3 files changed

+77
-47
lines changed

3 files changed

+77
-47
lines changed

jupyter_rtc_core/kernels/kernel_client.py

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
"""
22
A new Kernel client that is aware of ydocuments.
33
"""
4+
import anyio
45
import asyncio
56
import json
67
import typing as t
7-
from traitlets import Set
8-
from traitlets import Instance
9-
from traitlets import Any
10-
from .utils import LRUCache
8+
9+
from traitlets import Set, Instance, Any, Type, default
1110
from jupyter_client.asynchronous.client import AsyncKernelClient
12-
import anyio
11+
12+
from .utils import LRUCache
1313
from jupyter_rtc_core.rooms.yroom import YRoom
14+
from jupyter_rtc_core.outputs import OutputProcessor
1415

1516

1617
class DocumentAwareKernelClient(AsyncKernelClient):
@@ -35,6 +36,20 @@ class DocumentAwareKernelClient(AsyncKernelClient):
3536
# status messages.
3637
_yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set())
3738

39+
output_processor = Instance(
40+
OutputProcessor,
41+
allow_none=True
42+
)
43+
44+
output_process_class = Type(
45+
klass=OutputProcessor,
46+
default_value=OutputProcessor
47+
).tag(config=True)
48+
49+
@default("output_processor")
50+
def _default_output_processor(self) -> OutputProcessor:
51+
self.log.info("Creating output processor")
52+
return OutputProcessor(parent=self, config=self.config)
3853

3954
async def start_listening(self):
4055
"""Start listening to messages coming from the kernel.
@@ -78,7 +93,8 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
7893
# Cache the message ID and its socket name so that
7994
# any response message can be mapped back to the
8095
# source channel.
81-
header = header = json.loads(msg[0])
96+
self.output_processor.process_incoming_message(channel=channel_name, msg=msg)
97+
header = header = json.loads(msg[0]) # TODO: use session.unpack
8298
msg_id = header["msg_id"]
8399
self.message_source_cache[msg_id] = channel_name
84100
channel = getattr(self, f"{channel_name}_channel")
@@ -159,10 +175,9 @@ async def handle_outgoing_message(self, channel_name: str, msg: list[bytes]):
159175
# Intercept messages that are IOPub focused.
160176
if channel_name == "iopub":
161177
message_returned = await self.handle_iopub_message(msg)
162-
# TODO: If the message is not returned by the iopub handler, then
178+
# If the message is not returned by the iopub handler, then
163179
# return here and do not forward to listeners.
164180
if not message_returned:
165-
self.log.warn(f"If message is handled donot forward after adding output manager")
166181
return
167182

168183
# Update the last activity.
@@ -182,47 +197,38 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
182197
Returns
183198
-------
184199
Returns the message if it should be forwarded to listeners. Otherwise,
185-
returns `None` and keeps (i.e. intercepts) the message from going
186-
to listenres.
200+
returns `None` and prevents (i.e. intercepts) the message from going
201+
to listeners.
187202
"""
188-
# NOTE: Here's where we will inject the kernel state
189-
# into the awareness of a document.
190203

191204
try:
192205
dmsg = self.session.deserialize(msg, content=False)
193206
except Exception as e:
194207
self.log.error(f"Error deserializing message: {e}")
195208
raise ValueError
196209

197-
if dmsg["msg_type"] == "status":
198-
# Forward to all yrooms.
199-
for yroom in self._yrooms:
200-
# NOTE: We need to create a real message here.
201-
awareness_update_message = b""
202-
self.log.debug(f"Update Awareness here: {dmsg}. YRoom: {yroom}")
203-
#self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
204-
#yroom.add_message(awareness_update_message)
210+
# if dmsg["msg_type"] == "status":
211+
# # Forward to all yrooms.
212+
# for yroom in self._yrooms:
213+
# # NOTE: We need to create a real message here.
214+
# awareness_update_message = b""
215+
# self.log.debug(f"Update Awareness here: {dmsg}. YRoom: {yroom}")
216+
# #self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
217+
# #yroom.add_message(awareness_update_message)
205218

206-
# TODO: returning message temporarily to not break UI
207-
return msg
219+
# # TODO: returning message temporarily to not break UI
220+
# return msg
208221

209-
210-
# NOTE: Inject display data into ydoc.
211-
if dmsg["msg_type"] == "display_data":
212-
# Forward to all yrooms.
213-
for yroom in self._yrooms:
214-
update_document_message = b""
215-
self.log.debug(f"Update Document here: {dmsg}. Yroom: {yroom}")
216-
#self.log.debug(f"Getting YDoc: {await yroom.get_ydoc()}")
217-
#yroom.add_message(update_document_message)
218-
219-
# TODO: returning message temporarily to not break UI
222+
if self.output_processor is not None and dmsg["msg_type"] in ("stream", "display_data", "execute_result", "error"):
223+
dmsg = self.output_processor.process_outgoing_message(dmsg)
224+
225+
# If process_outgoing_message returns None, return None so the message isn't
226+
# sent to clients, otherwise return the original serialized message.
227+
if dmsg is None:
228+
return None
229+
else:
220230
return msg
221231

222-
# If the message isn't handled above, return it and it will
223-
# be forwarded to all listeners
224-
return msg
225-
226232
async def add_yroom(self, yroom: YRoom):
227233
"""
228234
Register a YRoom with this kernel client. YRooms will

jupyter_rtc_core/outputs/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def write_stream(self, file_id, cell_id, output) -> Map:
9999
# Now create the placeholder output
100100
if count < self.stream_limit:
101101
# Return the original if we haven't reached the limit
102-
placeholder = Map(output)
102+
placeholder = Map({})
103103
elif count == self.stream_limit:
104104
# Return a link to the full stream output
105105
placeholder = Map({

jupyter_rtc_core/outputs/output_processor.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ class OutputProcessor(LoggingConfigurable):
1515

1616
use_outputs_service = Bool(
1717
default_value=True,
18-
config=True,
1918
help="Should outputs be routed to the outputs service to minimize the in memory ydoc size."
20-
)
19+
).tag(config=True)
2120

2221
@property
2322
def settings(self):
@@ -107,35 +106,58 @@ def process_incoming_message(self, channel: str, msg: list[bytes]):
107106
if self._file_id is not None:
108107
if self.use_outputs_service:
109108
self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id)
109+
self.outputs_manager.clear(file_id=self._file_id)
110110
self.log.info(f"Saving (msg_id, cell_id): ({msg_id} {cell_id})")
111111
self.set_cell_id(msg_id, cell_id)
112112

113113
# Outgoing messages
114114

115-
def process_outgoing_message(self, channel: str, msg: list[bytes]):
116-
"""Process outgoing messagers from the kernel."""
117-
dmsg = self.kernel_client.session.deserialize(msg)
115+
def process_outgoing_message(self, dmsg: dict):
116+
"""Process outgoing messages from the kernel.
117+
118+
This returns the input dmsg if the message should be sent to
119+
clients, or None if it should not be sent.
120+
121+
The dmsg is a deserialized message generated by calling:
122+
123+
> self.kernel_client.session.deserialize(dmsg, content=False)
124+
125+
The content has not been deserialized yet as we need to verify we
126+
should process it.
127+
"""
128+
self.log.info("Process outgoing message")
118129
msg_type = dmsg["header"]["msg_type"]
130+
self.log.info(f"msg_type: {msg_type}")
131+
if msg_type not in ("stream", "display_data", "execute_result", "error"):
132+
return dmsg
119133
msg_id = dmsg["parent_header"]["msg_id"]
120-
content = dmsg["content"]
134+
self.log.info(f"msg_type: {msg_type}")
135+
content = self.parent.session.unpack(dmsg["content"])
136+
self.log.info(f"content: {content}")
121137
cell_id = self.get_cell_id(msg_id)
138+
self.log.info(f"cell_id: {cell_id}")
122139
if cell_id is None:
123140
# This is valid as cell_id is optional
124-
return
141+
return dmsg
125142
asyncio.create_task(self.output_task(msg_type, cell_id, content))
126143
return None # Don't allow the original message to propagate to the frontend
127144

128145
async def output_task(self, msg_type, cell_id, content):
129146
"""A coroutine to handle output messages."""
147+
self.log.info(f"output_task: {msg_type} {cell_id}")
130148
try:
131149
kernel_session = await self.session_manager.get_session(kernel_id=self.kernel_id)
132-
except: # TODO: what exception to catch and log?
150+
except Exception as e:
151+
self.log.error(f"An exception occurred when processing output for cell {cell_id}")
152+
self.log.exception(e)
133153
return
134154
else:
135155
path = kernel_session["path"]
136156

137157
file_id = self.file_id_manager.get_id(path)
158+
self.log.info(f"Output for file_id, cell_id: {file_id} {cell_id}")
138159
if file_id is None:
160+
self.log.error(f"Could not find file_id for path: {path}")
139161
return
140162
self._file_id = file_id
141163
try:
@@ -145,7 +167,9 @@ async def output_task(self, msg_type, cell_id, content):
145167
file_format='json',
146168
content_type='notebook'
147169
)
148-
except: # TODO: what exception to catch and log?
170+
except Exception as e:
171+
self.log.error(f"An exception occurred when processing output for cell {cell_id}")
172+
self.log.exception(e)
149173
return
150174
cells = notebook.ycells
151175

0 commit comments

Comments
 (0)