Skip to content

Commit 6a4c7bd

Browse files
committed
Fixing bugs.
1 parent 13dec9b commit 6a4c7bd

File tree

3 files changed

+69
-45
lines changed

3 files changed

+69
-45
lines changed

jupyter_rtc_core/app.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ class RtcExtensionApp(ExtensionApp):
2323
(r"api/collaboration/room/(.*)", YRoomWebsocket),
2424
# # handler that just adds compatibility with Jupyter Collaboration's frontend
2525
# (r"api/collaboration/session/(.*)", YRoomSessionHandler),
26-
(r"api/fileid/index", FileIDIndexHandler)
26+
(r"api/fileid/index", FileIDIndexHandler),
27+
*outputs_handlers
2728
]
28-
29-
for handler in outputs_handlers:
30-
handlers.append(handler)
3129

3230
yroom_manager_class = Type(
3331
klass=YRoomManager,

jupyter_rtc_core/outputs/manager.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ def _default_outputs_path(self):
2929

3030
def _ensure_path(self, file_id, cell_id):
3131
nested_dir = self.outputs_path / file_id / cell_id
32-
self.log.info(f"Creating directory: {nested_dir}")
3332
nested_dir.mkdir(parents=True, exist_ok=True)
3433

3534
def _build_path(self, file_id, cell_id=None, output_index=None):
@@ -42,7 +41,6 @@ def _build_path(self, file_id, cell_id=None, output_index=None):
4241

4342
def get_output(self, file_id, cell_id, output_index):
4443
"""Get an outputs by file_id, cell_id, and output_index."""
45-
self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}")
4644
path = self._build_path(file_id, cell_id, output_index)
4745
if not os.path.isfile(path):
4846
raise FileNotFoundError(f"The output file doesn't exist: {path}")
@@ -60,12 +58,15 @@ def get_stream(self, file_id, cell_id):
6058
return output
6159

6260
def write(self, file_id, cell_id, output):
63-
"""Write a new output for file_id and cell_id."""
64-
self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}")
65-
result = self.write_output(file_id, cell_id, output)
61+
"""Write a new output for file_id and cell_id.
62+
63+
Returns a placeholder output (pycrdt.Map) or None if no placeholder
64+
output should be written to the ydoc.
65+
"""
66+
placeholder = self.write_output(file_id, cell_id, output)
6667
if output["output_type"] == "stream" and self.stream_limit is not None:
67-
result = self.write_stream(file_id, cell_id, output)
68-
return result
68+
placeholder = self.write_stream(file_id, cell_id, output, placeholder)
69+
return placeholder
6970

7071
def write_output(self, file_id, cell_id, output):
7172
self._ensure_path(file_id, cell_id)
@@ -77,9 +78,10 @@ def write_output(self, file_id, cell_id, output):
7778
with open(path, "w", encoding="utf-8") as f:
7879
f.write(data)
7980
url = f"/api/outputs/{file_id}/{cell_id}/{index}.output"
80-
return Map({})
81+
self.log.info(f"Wrote output: {url}")
82+
return create_placeholder_output(output["output_type"], url)
8183

82-
def write_stream(self, file_id, cell_id, output) -> Map:
84+
def write_stream(self, file_id, cell_id, output, placeholder) -> Map:
8385
# How many stream outputs have been written for this cell previously
8486
count = self._stream_count.get(cell_id, 0)
8587

@@ -91,15 +93,16 @@ def write_stream(self, file_id, cell_id, output) -> Map:
9193
with open(path, "a", encoding="utf-8") as f:
9294
f.write(text)
9395
url = f"/api/outputs/{file_id}/{cell_id}/stream"
96+
self.log.info(f"Wrote stream: {url}")
9497

9598
# Increment the count
9699
count = count + 1
97100
self._stream_count[cell_id] = count
98101

99102
# Now create the placeholder output
100103
if count < self.stream_limit:
101-
# Return the original if we haven't reached the limit
102-
placeholder = Map({})
104+
# Return the original placeholder if we haven't reached the limit
105+
placeholder = placeholder
103106
elif count == self.stream_limit:
104107
# Return a link to the full stream output
105108
placeholder = Map({
@@ -109,6 +112,7 @@ def write_stream(self, file_id, cell_id, output) -> Map:
109112
}
110113
})
111114
elif count > self.stream_limit:
115+
# Return None to indicate that no placeholder should be written to the ydoc
112116
placeholder = None
113117
return placeholder
114118

@@ -123,4 +127,33 @@ def clear(self, file_id, cell_id=None):
123127
except KeyError:
124128
pass
125129
path = self._build_path(file_id, cell_id)
126-
shutil.rmtree(path)
130+
try:
131+
shutil.rmtree(path)
132+
except FileNotFoundError:
133+
pass
134+
135+
136+
def create_placeholder_output(output_type: str, url: str):
137+
metadata = dict(url=url)
138+
if output_type == "stream":
139+
output = Map({
140+
"output_type": "stream",
141+
"text": "",
142+
"metadata": metadata
143+
})
144+
elif output_type == "display_data":
145+
output = Map({
146+
"output_type": "display_data",
147+
"metadata": metadata
148+
})
149+
elif output_type == "execute_result":
150+
output = Map({
151+
"output_type": "execute_result",
152+
"metadata": metadata
153+
})
154+
elif output_type == "error":
155+
output = Map({
156+
"output_type": "error",
157+
"metadata": metadata
158+
})
159+
return output

jupyter_rtc_core/outputs/output_processor.py

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class OutputProcessor(LoggingConfigurable):
2222
def settings(self):
2323
"""A shortcut for the Tornado web app settings."""
2424
# self.KernelClient.KernelManager.AsyncMultiKernelManager.ServerApp
25-
return self.parent.parent.parent.web_app.settings
25+
return self.parent.parent.parent.parent.web_app.settings
2626

2727
@property
2828
def kernel_client(self):
@@ -45,9 +45,9 @@ def file_id_manager(self):
4545
return self.settings["file_id_manager"]
4646

4747
@property
48-
def jupyter_server_ydoc(self):
48+
def yroom_manager(self):
4949
"""A shortcut for the jupyter server ydoc manager."""
50-
return self.settings["jupyter_server_ydoc"]
50+
return self.settings["yroom_manager"]
5151

5252
def clear(self, cell_id=None):
5353
"""Clear the state of the output processor.
@@ -126,17 +126,12 @@ def process_outgoing_message(self, dmsg: dict):
126126
The content has not been deserialized yet as we need to verify we
127127
should process it.
128128
"""
129-
self.log.info("Process outgoing message")
130129
msg_type = dmsg["header"]["msg_type"]
131-
self.log.info(f"msg_type: {msg_type}")
132130
if msg_type not in ("stream", "display_data", "execute_result", "error"):
133131
return dmsg
134132
msg_id = dmsg["parent_header"]["msg_id"]
135-
self.log.info(f"msg_type: {msg_type}")
136133
content = self.parent.session.unpack(dmsg["content"])
137-
self.log.info(f"content: {content}")
138134
cell_id = self.get_cell_id(msg_id)
139-
self.log.info(f"cell_id: {cell_id}")
140135
if cell_id is None:
141136
# This is valid as cell_id is optional
142137
return dmsg
@@ -145,9 +140,11 @@ def process_outgoing_message(self, dmsg: dict):
145140

146141
async def output_task(self, msg_type, cell_id, content):
147142
"""A coroutine to handle output messages."""
148-
self.log.info(f"output_task: {msg_type} {cell_id}")
149143
try:
150-
kernel_session = await self.session_manager.get_session(kernel_id=self.kernel_id)
144+
# TODO: The session manager may have multiple notebooks connected to the kernel
145+
# but currently get_session only returns the first. We need to fix this and
146+
# find the notebook with the right cell_id.
147+
kernel_session = await self.session_manager.get_session(kernel_id=self.parent.parent.kernel_id)
151148
except Exception as e:
152149
self.log.error(f"An exception occurred when processing output for cell {cell_id}")
153150
self.log.exception(e)
@@ -156,37 +153,33 @@ async def output_task(self, msg_type, cell_id, content):
156153
path = kernel_session["path"]
157154

158155
file_id = self.file_id_manager.get_id(path)
159-
self.log.info(f"Output for file_id, cell_id: {file_id} {cell_id}")
160156
if file_id is None:
161157
self.log.error(f"Could not find file_id for path: {path}")
162158
return
163159
self._file_id = file_id
164-
try:
165-
notebook = await self.jupyter_server_ydoc.get_document(
166-
path=path,
167-
copy=False,
168-
file_format='json',
169-
content_type='notebook'
170-
)
171-
except Exception as e:
172-
self.log.error(f"An exception occurred when processing output for cell {cell_id}")
173-
self.log.exception(e)
174-
return
175-
cells = notebook.ycells
176-
177-
cell_index, target_cell = self.find_cell(cell_id, cells)
178-
if target_cell is None:
179-
# This is valid as cell_id is optional
180-
return
181160

182161
# Convert from the message spec to the nbformat output structure
183162
if self.use_outputs_service:
184163
output = self.transform_output(msg_type, content, ydoc=False)
185164
output = self.outputs_manager.write(file_id, cell_id, output)
186165
else:
187166
output = self.transform_output(msg_type, content, ydoc=True)
188-
if output is not None:
167+
168+
# Get the notebook ydoc from the room
169+
room_id = f"json:notebook:{file_id}"
170+
room = self.yroom_manager.get_room(room_id)
171+
if room is None:
172+
self.log.error(f"YRoom not found: {room_id}")
173+
return
174+
notebook = await room.get_jupyter_ydoc()
175+
self.log.info(f"Notebook: {notebook}")
176+
177+
# Write the outputs to the ydoc cell.
178+
cells = notebook.ycells
179+
cell_index, target_cell = self.find_cell(cell_id, cells)
180+
if target_cell is not None and output is not None:
189181
target_cell["outputs"].append(output)
182+
self.log.info(f"Write output to ydoc: {path} {cell_id} {output}")
190183

191184
def find_cell(self, cell_id, cells):
192185
"""Find a cell with a given cell_id in the list of cells.

0 commit comments

Comments
 (0)