Commit 94e7a41

Integrate outputs manager into extension app (#48)
* Integrate outputs manager into extension app.
* Work to integrate output processor into kernel client.
* Fix parent references.
* Fixing bugs.
* Fixing double assignment
1 parent a46ae91 commit 94e7a41

File tree

4 files changed: +145 -73 lines changed

jupyter_rtc_core/app.py

Lines changed: 20 additions & 3 deletions
@@ -6,6 +6,7 @@
 from .handlers import RouteHandler, FileIDIndexHandler
 from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket
 from .rooms.yroom_manager import YRoomManager
+from .outputs import OutputsManager, outputs_handlers
 
 class RtcExtensionApp(ExtensionApp):
     name = "jupyter_rtc_core"
@@ -22,9 +23,10 @@ class RtcExtensionApp(ExtensionApp):
         (r"api/collaboration/room/(.*)", YRoomWebsocket),
         # # handler that just adds compatibility with Jupyter Collaboration's frontend
         # (r"api/collaboration/session/(.*)", YRoomSessionHandler),
-        (r"api/fileid/index", FileIDIndexHandler)
+        (r"api/fileid/index", FileIDIndexHandler),
+        *outputs_handlers
     ]
-
+
     yroom_manager_class = Type(
         klass=YRoomManager,
         help="""YRoom Manager Class.""",
@@ -35,6 +37,18 @@ class RtcExtensionApp(ExtensionApp):
     def yroom_manager(self) -> YRoomManager | None:
         return self.settings.get("yroom_manager", None)
 
+    outputs_manager_class = Type(
+        klass=OutputsManager,
+        help="Outputs manager class.",
+        default_value=OutputsManager
+    ).tag(config=True)
+
+    outputs_manager = Instance(
+        klass=OutputsManager,
+        help="An instance of the OutputsManager",
+        allow_none=True
+    ).tag(config=True)
+
     def initialize(self):
         super().initialize()
 
@@ -56,8 +70,11 @@ def get_fileid_manager():
             loop=loop,
             log=log
         )
+
+        # Initialize OutputsManager
+        self.outputs_manager = self.outputs_manager_class(config=self.config)
+        self.settings["outputs_manager"] = self.outputs_manager
 
-
     def _link_jupyter_server_extension(self, server_app):
         """Setup custom config needed by this extension."""
         c = Config()

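Because outputs_manager_class is tagged config=True, a deployment should be able to swap in its own manager through standard traitlets configuration instead of patching the app. A minimal sketch, assuming a hypothetical CustomOutputsManager subclass and a jupyter_server_config.py file:

    # jupyter_server_config.py (hypothetical config file)
    from jupyter_rtc_core.outputs import OutputsManager

    c = get_config()  # noqa: provided by the Jupyter config loader

    class CustomOutputsManager(OutputsManager):
        """Hypothetical subclass, e.g. to change where outputs are stored."""
        pass

    # Type traits accept a class object or an import string.
    c.RtcExtensionApp.outputs_manager_class = CustomOutputsManager

initialize() then instantiates whatever class the trait resolves to and publishes the instance in self.settings["outputs_manager"] for the outputs handlers to look up.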
jupyter_rtc_core/kernels/kernel_client.py

Lines changed: 33 additions & 22 deletions
@@ -1,16 +1,17 @@
 """
 A new Kernel client that is aware of ydocuments.
 """
+import anyio
 import asyncio
 import json
 import typing as t
-from traitlets import Set
-from traitlets import Instance
-from traitlets import Any
-from .utils import LRUCache
+
+from traitlets import Set, Instance, Any, Type, default
 from jupyter_client.asynchronous.client import AsyncKernelClient
-import anyio
+
+from .utils import LRUCache
 from jupyter_rtc_core.rooms.yroom import YRoom
+from jupyter_rtc_core.outputs import OutputProcessor
 
 
 class DocumentAwareKernelClient(AsyncKernelClient):
@@ -35,6 +36,21 @@ class DocumentAwareKernelClient(AsyncKernelClient):
     # status messages.
     _yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set())
 
+    output_processor = Instance(
+        OutputProcessor,
+        allow_none=True
+    )
+
+    output_process_class = Type(
+        klass=OutputProcessor,
+        default_value=OutputProcessor
+    ).tag(config=True)
+
+    @default("output_processor")
+    def _default_output_processor(self) -> OutputProcessor:
+        self.log.info("Creating output processor")
+        return OutputProcessor(parent=self, config=self.config)
+
     async def start_listening(self):
         """Start listening to messages coming from the kernel.
 
@@ -77,7 +93,8 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
         # Cache the message ID and its socket name so that
         # any response message can be mapped back to the
         # source channel.
-        header = header = json.loads(msg[0])
+        self.output_processor.process_incoming_message(channel=channel_name, msg=msg)
+        header = json.loads(msg[0])  # TODO: use session.unpack
         msg_id = header["msg_id"]
         self.message_source_cache[msg_id] = channel_name
         channel = getattr(self, f"{channel_name}_channel")
@@ -158,10 +175,9 @@ async def handle_outgoing_message(self, channel_name: str, msg: list[bytes]):
         # Intercept messages that are IOPub focused.
         if channel_name == "iopub":
            message_returned = await self.handle_iopub_message(msg)
-            # TODO: If the message is not returned by the iopub handler, then
+            # If the message is not returned by the iopub handler, then
             # return here and do not forward to listeners.
             if not message_returned:
-                self.log.warn(f"If message is handled donot forward after adding output manager")
                 return
 
         # Update the last activity.
@@ -181,29 +197,25 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
         Returns
         -------
         Returns the message if it should be forwarded to listeners. Otherwise,
-        returns `None` and keeps (i.e. intercepts) the message from going
+        returns `None` and prevents (i.e. intercepts) the message from going
         to listeners.
         """
-        # NOTE: Here's where we will inject the kernel state
-        # into the awareness of a document.
 
         try:
             dmsg = self.session.deserialize(msg, content=False)
         except Exception as e:
             self.log.error(f"Error deserializing message: {e}")
             raise
 
-        # NOTE: Inject display data into ydoc.
-        if dmsg["msg_type"] == "display_data":
-            # Forward to all yrooms.
-            for yroom in self._yrooms:
-                update_document_message = b""
-                # yroom.add_message(update_document_message)
+        if self.output_processor is not None and dmsg["msg_type"] in ("stream", "display_data", "execute_result", "error"):
+            dmsg = self.output_processor.process_outgoing_message(dmsg)
 
-        # TODO: returning message temporarily to not break UI
-        # If the message isn't handled above, return it and it will
-        # be forwarded to all listeners
-        return msg
+        # If process_outgoing_message returns None, return None so the message isn't
+        # sent to clients, otherwise return the original serialized message.
+        if dmsg is None:
+            return None
+        else:
+            return msg
 
     def send_kernel_awareness(self, kernel_status: dict):
         """
@@ -218,7 +230,6 @@ def send_kernel_awareness(self, kernel_status: dict):
             awareness.set_local_state_field("kernel", kernel_status)
             self.log.debug(f"current state: {awareness.get_local_state()} room_id: {yroom.room_id}")
 
-
     async def add_yroom(self, yroom: YRoom):
         """
         Register a YRoom with this kernel client. YRooms will

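After this change, the iopub path deserializes each message with the content left packed, hands output-type messages to the output processor, and only forwards the original serialized bytes if the processor hands a message back. A condensed sketch of that control flow, using a hypothetical free function route_iopub in place of the method (session, output_processor, and msg mirror the names in the diff; this is illustrative, not the full class):

    OUTPUT_MSG_TYPES = ("stream", "display_data", "execute_result", "error")

    async def route_iopub(session, output_processor, msg):
        # Deserialize with content left packed, as the kernel client does.
        dmsg = session.deserialize(msg, content=False)
        if output_processor is not None and dmsg["msg_type"] in OUTPUT_MSG_TYPES:
            # The processor may persist the output elsewhere and return None,
            # which suppresses forwarding to listeners.
            dmsg = output_processor.process_outgoing_message(dmsg)
        # None means the message was intercepted; otherwise return the raw bytes.
        return None if dmsg is None else msg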
jupyter_rtc_core/outputs/manager.py

Lines changed: 46 additions & 20 deletions
@@ -7,17 +7,10 @@
 
 from traitlets.config import LoggingConfigurable
 from traitlets import (
-    Any,
-    Bool,
     Dict,
     Instance,
-    List,
-    TraitError,
-    Type,
-    Unicode,
     Int,
-    default,
-    validate,
+    default
 )
 
 from jupyter_core.paths import jupyter_runtime_dir
@@ -36,7 +29,6 @@ def _default_outputs_path(self):
 
     def _ensure_path(self, file_id, cell_id):
         nested_dir = self.outputs_path / file_id / cell_id
-        self.log.info(f"Creating directory: {nested_dir}")
         nested_dir.mkdir(parents=True, exist_ok=True)
 
     def _build_path(self, file_id, cell_id=None, output_index=None):
@@ -49,7 +41,6 @@ def _build_path(self, file_id, cell_id=None, output_index=None):
 
     def get_output(self, file_id, cell_id, output_index):
         """Get an outputs by file_id, cell_id, and output_index."""
-        self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}")
         path = self._build_path(file_id, cell_id, output_index)
         if not os.path.isfile(path):
             raise FileNotFoundError(f"The output file doesn't exist: {path}")
@@ -67,12 +58,15 @@ def get_stream(self, file_id, cell_id):
         return output
 
     def write(self, file_id, cell_id, output):
-        """Write a new output for file_id and cell_id."""
-        self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}")
-        result = self.write_output(file_id, cell_id, output)
+        """Write a new output for file_id and cell_id.
+
+        Returns a placeholder output (pycrdt.Map) or None if no placeholder
+        output should be written to the ydoc.
+        """
+        placeholder = self.write_output(file_id, cell_id, output)
         if output["output_type"] == "stream" and self.stream_limit is not None:
-            result = self.write_stream(file_id, cell_id, output)
-        return result
+            placeholder = self.write_stream(file_id, cell_id, output, placeholder)
+        return placeholder
 
     def write_output(self, file_id, cell_id, output):
         self._ensure_path(file_id, cell_id)
@@ -84,9 +78,10 @@ def write_output(self, file_id, cell_id, output):
         with open(path, "w", encoding="utf-8") as f:
             f.write(data)
         url = f"/api/outputs/{file_id}/{cell_id}/{index}.output"
-        return Map({})
+        self.log.info(f"Wrote output: {url}")
+        return create_placeholder_output(output["output_type"], url)
 
-    def write_stream(self, file_id, cell_id, output) -> Map:
+    def write_stream(self, file_id, cell_id, output, placeholder) -> Map:
         # How many stream outputs have been written for this cell previously
         count = self._stream_count.get(cell_id, 0)
 
@@ -98,15 +93,16 @@ def write_stream(self, file_id, cell_id, output) -> Map:
         with open(path, "a", encoding="utf-8") as f:
             f.write(text)
         url = f"/api/outputs/{file_id}/{cell_id}/stream"
+        self.log.info(f"Wrote stream: {url}")
 
         # Increment the count
         count = count + 1
         self._stream_count[cell_id] = count
 
         # Now create the placeholder output
         if count < self.stream_limit:
-            # Return the original if we haven't reached the limit
-            placeholder = Map(output)
+            # Return the original placeholder if we haven't reached the limit
+            placeholder = placeholder
         elif count == self.stream_limit:
             # Return a link to the full stream output
             placeholder = Map({
@@ -116,6 +112,7 @@ def write_stream(self, file_id, cell_id, output) -> Map:
                 }
             })
         elif count > self.stream_limit:
+            # Return None to indicate that no placeholder should be written to the ydoc
            placeholder = None
         return placeholder
 
@@ -130,4 +127,33 @@ def clear(self, file_id, cell_id=None):
         except KeyError:
             pass
         path = self._build_path(file_id, cell_id)
-        shutil.rmtree(path)
+        try:
+            shutil.rmtree(path)
+        except FileNotFoundError:
+            pass
+
+
+def create_placeholder_output(output_type: str, url: str):
+    metadata = dict(url=url)
+    if output_type == "stream":
+        output = Map({
+            "output_type": "stream",
+            "text": "",
+            "metadata": metadata
+        })
+    elif output_type == "display_data":
+        output = Map({
+            "output_type": "display_data",
+            "metadata": metadata
+        })
+    elif output_type == "execute_result":
+        output = Map({
+            "output_type": "execute_result",
+            "metadata": metadata
+        })
+    elif output_type == "error":
+        output = Map({
+            "output_type": "error",
+            "metadata": metadata
+        })
+    return output

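Taken together, OutputsManager.write now returns a pycrdt.Map placeholder pointing at the output stored on disk, and for stream outputs stream_limit caps what reaches the ydoc: below the limit the per-output placeholder is kept, exactly at the limit a single link to the full stream file is returned, and past it None is returned so nothing more is added. A rough usage sketch under those assumptions (the file_id and cell_id values are made up, and stream_limit is assumed to be a configurable trait):

    from jupyter_rtc_core.outputs import OutputsManager

    manager = OutputsManager()  # default config; stream_limit may be unset by default
    for i in range(20):
        output = {"output_type": "stream", "name": "stdout", "text": f"line {i}\n"}
        placeholder = manager.write("file-abc123", "cell-42", output)
        if placeholder is None:
            # Past stream_limit: the text is still appended to the stream file on disk,
            # but no additional placeholder should be written to the shared document.
            continue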
0 commit comments
