Skip to content

Commit 1f38302

Browse files
committed
More intelligent stream handling.
1 parent 44c855c commit 1f38302

File tree

2 files changed

+58
-23
lines changed

2 files changed

+58
-23
lines changed

jupyter_rtc_core/outputs/manager.py

Lines changed: 42 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
from pathlib import Path, PurePath
44
import shutil
55

6+
from pycrdt import Map
67

78
from traitlets.config import LoggingConfigurable
89
from traitlets import (
@@ -14,6 +15,7 @@
1415
TraitError,
1516
Type,
1617
Unicode,
18+
Int,
1719
default,
1820
validate,
1921
)
@@ -22,8 +24,11 @@
2224

2325
class OutputsManager(LoggingConfigurable):
2426

25-
outputs_path = Instance(PurePath, help="The local runtime dir")
2627
_last_output_index = Dict(default_value={})
28+
_stream_count = Dict(default_value={})
29+
30+
outputs_path = Instance(PurePath, help="The local runtime dir")
31+
stream_limit = Int(default_value=10, config=True, allow_none=True)
2732

2833
@default("outputs_path")
2934
def _default_outputs_path(self):
@@ -64,11 +69,10 @@ def get_stream(self, file_id, cell_id):
6469
def write(self, file_id, cell_id, output):
6570
"""Write a new output for file_id and cell_id."""
6671
self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}")
67-
if output["output_type"] == "stream":
68-
url = self.write_stream(file_id, cell_id, output)
69-
else:
70-
url = self.write_output(file_id, cell_id, output)
71-
return url
72+
result = self.write_output(file_id, cell_id, output)
73+
if output["output_type"] == "stream" and self.stream_limit is not None:
74+
result = self.write_stream(file_id, cell_id, output)
75+
return result
7276

7377
def write_output(self, file_id, cell_id, output):
7478
self._ensure_path(file_id, cell_id)
@@ -80,17 +84,46 @@ def write_output(self, file_id, cell_id, output):
8084
with open(path, "w", encoding="utf-8") as f:
8185
f.write(data)
8286
url = f"/api/outputs/{file_id}/{cell_id}/{index}.output"
83-
return url
87+
return Map({})
8488

85-
def write_stream(self, file_id, cell_id, output):
89+
def write_stream(self, file_id, cell_id, output) -> Map:
90+
# How many stream outputs have been written for this cell previously
91+
count = self._stream_count.get(cell_id, 0)
92+
93+
# Go ahead and write the incoming stream
8694
self._ensure_path(file_id, cell_id)
8795
path = self._build_path(file_id, cell_id) / "stream"
8896
text = output["text"]
8997
mode = 'a' if os.path.isfile(path) else 'w'
9098
with open(path, "a", encoding="utf-8") as f:
9199
f.write(text)
92100
url = f"/api/outputs/{file_id}/{cell_id}/stream"
93-
return url
101+
102+
# Increment the count
103+
count = count + 1
104+
self._stream_count[cell_id] = count
105+
106+
# Now create the placeholder output
107+
if count < self.stream_limit:
108+
# Return the original if we haven't reached the limit
109+
placeholder = Map(output)
110+
elif count == self.stream_limit:
111+
# Return a link to the full stream output
112+
placeholder = Map({
113+
"output_type": "display_data",
114+
"data": {
115+
'text/html': f'<a href="{url}">Click this link to see the full stream output</a>'
116+
}
117+
})
118+
elif count > self.stream_limit:
119+
placeholder = None
120+
return placeholder
121+
122+
123+
124+
125+
126+
94127

95128
def clear(self, file_id, cell_id=None):
96129
path = self._build_path(file_id, cell_id)

jupyter_rtc_core/outputs/output_processor.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from pycrdt import Map
55

6-
from traitlets import Dict, Unicode
6+
from traitlets import Dict, Unicode, Bool
77
from traitlets.config import LoggingConfigurable
88

99

@@ -13,6 +13,12 @@ class OutputProcessor(LoggingConfigurable):
1313
_cell_indices = Dict(default_value={}) # a map from cell_id -> cell index in notebook
1414
_file_id = Unicode(default_value=None, allow_none=True)
1515

16+
use_outputs_service = Bool(
17+
default_value=True,
18+
config=True,
19+
help="Should outputs be routed to the outputs service to minimize the in memory ydoc size."
20+
)
21+
1622
@property
1723
def settings(self):
1824
"""A shortcut for the Tornado web app settings."""
@@ -99,7 +105,8 @@ def process_incoming_message(self, channel: str, msg: list[bytes]):
99105
if existing_msg_id != msg_id: # cell is being re-run, clear output state
100106
self.clear(cell_id)
101107
if self._file_id is not None:
102-
self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id)
108+
if self.use_outputs_service:
109+
self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id)
103110
self.log.info(f"Saving (msg_id, cell_id): ({msg_id} {cell_id})")
104111
self.set_cell_id(msg_id, cell_id)
105112

@@ -146,18 +153,13 @@ async def output_task(self, msg_type, cell_id, content):
146153
return
147154

148155
# Convert from the message spec to the nbformat output structure
149-
output = self.transform_output(msg_type, content, ydoc=False)
150-
output_url = self.outputs_manager.write(file_id, cell_id, output)
151-
nb_output = Map({
152-
"output_type": "display_data",
153-
"data": {
154-
'text/html': f'<a href="{output_url}">Output</a>'
155-
},
156-
"metadata": {
157-
"outputs_service": True
158-
}
159-
})
160-
target_cell["outputs"].append(nb_output)
156+
if self.use_outputs_service:
157+
output = self.transform_output(msg_type, content, ydoc=False)
158+
output = self.outputs_manager.write(file_id, cell_id, output)
159+
else:
160+
output = self.transform_output(msg_type, content, ydoc=True)
161+
if output is not None:
162+
target_cell["outputs"].append(output)
161163

162164
def find_cell(self, cell_id, cells):
163165
"""Find a cell with a given cell_id in the list of cells.

0 commit comments

Comments
 (0)