Commit 04c7852

Add server side outputs handling (#30)
* Adding the output processor.
* Adding the outputs manager.
* Adding the outputs handlers.
* Setup test files for outputs.
* Init file for outputs.
* Added test_stream.
* Finish tests for outputs manager.
* Minor cleanup on outputs handlers.
* Work on outputs processor tests.
* Update tests for output processor.
* More intelligent stream handling.
* Fix clear.
* Adding comments about exceptional cases.
* Running linter.
1 parent 3ee45ea commit 04c7852

File tree

9 files changed (+629, -6 lines)


README.md

Lines changed: 1 addition & 3 deletions

@@ -11,11 +11,11 @@ for the frontend extension.
 ## Try it out
 
 Run with the proper configuration
+
 ```
 jupyter lab --config jupyter_config.py
 ```
 
-
 ## Requirements
 
 - JupyterLab >= 4.0.0

@@ -62,7 +62,6 @@ Activating an environment is required to access any Python packages installed in
 that environment. You should activate the environment before developing any
 changes to the `jupyter_rtc_core` package locally.
 
-
 ### Development install
 
 After ensuring that the `rtccore` environment is activated, you can install an

@@ -116,7 +115,6 @@ Here is a summary of the commands to run after making changes:
 - Finally, refresh the JupyterLab page in the browser to load the new
   frontend assets and use the new backend.
 
-
 ### Building on change (frontend only)
 
 You can watch the source directory and run JupyterLab at the same time in
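The `jupyter lab --config jupyter_config.py` command above refers to a config file shipped with the repository. The sketch below is only a hypothetical minimal config showing the general shape; the extension name is assumed from the package name and is not taken from this commit.

```
# Hypothetical minimal jupyter_config.py -- illustrative only; the
# repository's real config file may enable additional settings.
c = get_config()  # noqa
c.ServerApp.jpserver_extensions = {"jupyter_rtc_core": True}
```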
Lines changed: 3 additions & 0 deletions

from .handlers import outputs_handlers
from .manager import OutputsManager
from .output_processor import OutputProcessor
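These three names are what the rest of the server imports. The handlers in the next file read the manager from `self.settings["outputs_manager"]`, so some server extension has to register the routes and put an `OutputsManager` instance into the Tornado settings; that wiring is still commented out in this commit. A rough, hypothetical sketch of what it could look like (function and variable names here are illustrative, not from the commit):

```
# Hypothetical wiring sketch -- not part of this commit.
from jupyter_server.utils import url_path_join

from .handlers import outputs_handlers
from .manager import OutputsManager


def _load_jupyter_server_extension(serverapp):
    """Register the outputs routes and share a single OutputsManager."""
    web_app = serverapp.web_app
    web_app.settings["outputs_manager"] = OutputsManager(parent=serverapp)

    base_url = web_app.settings["base_url"]
    handlers = [
        (url_path_join(base_url, pattern), handler)
        for pattern, handler in outputs_handlers
    ]
    web_app.add_handlers(".*$", handlers)
```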
Lines changed: 94 additions & 0 deletions

import json

from tornado import web

from jupyter_server.auth.decorator import authorized
from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join


class OutputsAPIHandler(APIHandler):
    """An outputs service API handler."""

    auth_resource = "outputs"

    @property
    def outputs(self):
        return self.settings["outputs_manager"]

    @web.authenticated
    @authorized
    async def get(self, file_id=None, cell_id=None, output_index=None):
        try:
            output = self.outputs.get_output(file_id, cell_id, output_index)
        except (FileNotFoundError, KeyError):
            self.set_status(404)
            self.finish({"error": "Output not found."})
        else:
            self.set_status(200)
            self.set_header("Content-Type", "application/json")
            self.write(output)


class StreamAPIHandler(APIHandler):
    """A stream outputs service API handler."""

    auth_resource = "outputs"

    @property
    def outputs(self):
        return self.settings["outputs_manager"]

    @web.authenticated
    @authorized
    async def get(self, file_id=None, cell_id=None):
        try:
            output = self.outputs.get_stream(file_id, cell_id)
        except (FileNotFoundError, KeyError):
            self.set_status(404)
            self.finish({"error": "Stream output not found."})
        else:
            # self.set_header("Content-Type", "text/plain; charset=utf-8")
            self.set_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
            self.set_header("Pragma", "no-cache")
            self.set_header("Expires", "0")
            self.set_status(200)
            self.write(output)
            self.finish(set_content_type="text/plain; charset=utf-8")


# -----------------------------------------------------------------------------
# URL to handler mappings
# -----------------------------------------------------------------------------

# Strict UUID regex (matches standard 8-4-4-4-12 UUIDs)
_uuid_regex = r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"

_file_id_regex = rf"(?P<file_id>{_uuid_regex})"
_cell_id_regex = rf"(?P<cell_id>{_uuid_regex})"

# non-negative integers
_output_index_regex = r"(?P<output_index>0|[1-9]\d*)"

outputs_handlers = [
    (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}\.output", OutputsAPIHandler),
    (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler),
]

# def setup_handlers(web_app):
#     """Set up the handlers for the outputs service."""
#
#     handlers = [
#         (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}\.output", OutputsAPIHandler),
#         (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler),
#     ]
#
#     base_url = web_app.settings["base_url"]
#     new_handlers = []
#     for handler in handlers:
#         pattern = url_path_join(base_url, handler[0])
#         new_handler = (pattern, *handler[1:])
#         new_handlers.append(new_handler)
#
#     # Add the handler for all hosts
#     web_app.add_handlers(".*$", new_handlers)
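Given the URL patterns above, a client can fetch an individual output as JSON or a cell's full stream as plain text. A small sketch using `requests`; the host, token, and IDs below are placeholders, not values from this commit.

```
# Placeholder values -- substitute a real server URL, token, and IDs.
import requests

BASE = "http://localhost:8888"
HEADERS = {"Authorization": "token <your-server-token>"}
FILE_ID = "11111111-2222-3333-4444-555555555555"
CELL_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"

# Full JSON for a single output (index 0) of the cell.
r = requests.get(f"{BASE}/api/outputs/{FILE_ID}/{CELL_ID}/0.output", headers=HEADERS)
print(r.status_code, r.json() if r.ok else r.text)

# Concatenated stream text for the cell, served as text/plain.
r = requests.get(f"{BASE}/api/outputs/{FILE_ID}/{CELL_ID}/stream", headers=HEADERS)
print(r.status_code, r.text[:200])
```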
Lines changed: 133 additions & 0 deletions

import json
import os
from pathlib import Path, PurePath
import shutil

from pycrdt import Map

from traitlets.config import LoggingConfigurable
from traitlets import (
    Any,
    Bool,
    Dict,
    Instance,
    List,
    TraitError,
    Type,
    Unicode,
    Int,
    default,
    validate,
)

from jupyter_core.paths import jupyter_runtime_dir


class OutputsManager(LoggingConfigurable):
    """Manages cell outputs stored on disk, keyed by file_id and cell_id."""

    _last_output_index = Dict(default_value={})
    _stream_count = Dict(default_value={})

    outputs_path = Instance(PurePath, help="The local runtime dir")
    stream_limit = Int(default_value=10, config=True, allow_none=True)

    @default("outputs_path")
    def _default_outputs_path(self):
        return Path(jupyter_runtime_dir()) / "outputs"

    def _ensure_path(self, file_id, cell_id):
        nested_dir = self.outputs_path / file_id / cell_id
        self.log.info(f"Creating directory: {nested_dir}")
        nested_dir.mkdir(parents=True, exist_ok=True)

    def _build_path(self, file_id, cell_id=None, output_index=None):
        path = self.outputs_path / file_id
        if cell_id is not None:
            path = path / cell_id
        if output_index is not None:
            path = path / f"{output_index}.output"
        return path

    def get_output(self, file_id, cell_id, output_index):
        """Get an output by file_id, cell_id, and output_index."""
        self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}")
        path = self._build_path(file_id, cell_id, output_index)
        if not os.path.isfile(path):
            raise FileNotFoundError(f"The output file doesn't exist: {path}")
        with open(path, "r", encoding="utf-8") as f:
            output = json.loads(f.read())
        return output

    def get_stream(self, file_id, cell_id):
        """Get the stream output for a cell by file_id and cell_id."""
        path = self._build_path(file_id, cell_id) / "stream"
        if not os.path.isfile(path):
            raise FileNotFoundError(f"The output file doesn't exist: {path}")
        with open(path, "r", encoding="utf-8") as f:
            output = f.read()
        return output

    def write(self, file_id, cell_id, output):
        """Write a new output for file_id and cell_id."""
        self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}")
        result = self.write_output(file_id, cell_id, output)
        if output["output_type"] == "stream" and self.stream_limit is not None:
            result = self.write_stream(file_id, cell_id, output)
        return result

    def write_output(self, file_id, cell_id, output):
        self._ensure_path(file_id, cell_id)
        last_index = self._last_output_index.get(cell_id, -1)
        index = last_index + 1
        self._last_output_index[cell_id] = index
        path = self._build_path(file_id, cell_id, index)
        data = json.dumps(output, ensure_ascii=False)
        with open(path, "w", encoding="utf-8") as f:
            f.write(data)
        # URL where this output can be fetched; currently unused by the caller.
        url = f"/api/outputs/{file_id}/{cell_id}/{index}.output"
        return Map({})

    def write_stream(self, file_id, cell_id, output) -> Map:
        # How many stream outputs have been written for this cell previously
        count = self._stream_count.get(cell_id, 0)

        # Go ahead and write the incoming stream
        self._ensure_path(file_id, cell_id)
        path = self._build_path(file_id, cell_id) / "stream"
        text = output["text"]
        mode = "a" if os.path.isfile(path) else "w"
        with open(path, mode, encoding="utf-8") as f:
            f.write(text)
        url = f"/api/outputs/{file_id}/{cell_id}/stream"

        # Increment the count
        count = count + 1
        self._stream_count[cell_id] = count

        # Now create the placeholder output
        if count < self.stream_limit:
            # Return the original if we haven't reached the limit
            placeholder = Map(output)
        elif count == self.stream_limit:
            # Return a link to the full stream output
            placeholder = Map({
                "output_type": "display_data",
                "data": {
                    "text/html": f'<a href="{url}">Click this link to see the full stream output</a>'
                }
            })
        elif count > self.stream_limit:
            placeholder = None
        return placeholder

    def clear(self, file_id, cell_id=None):
        """Clear the state of the manager."""
        if cell_id is None:
            self._stream_count = {}
            path = self._build_path(file_id)
        else:
            try:
                del self._stream_count[cell_id]
            except KeyError:
                pass
            path = self._build_path(file_id, cell_id)
        shutil.rmtree(path)
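To make the `stream_limit` placeholder logic concrete, here is a rough usage sketch (not from the commit) that writes five stream outputs for one cell into a temporary directory; the file and cell IDs are made up.

```
# Rough usage sketch with made-up IDs -- not part of this commit.
import tempfile
from pathlib import Path

manager = OutputsManager(stream_limit=3, outputs_path=Path(tempfile.mkdtemp()))

file_id = "11111111-2222-3333-4444-555555555555"
cell_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"

for i in range(5):
    output = {"output_type": "stream", "name": "stdout", "text": f"line {i}\n"}
    placeholder = manager.write(file_id, cell_id, output)
    # Writes 1-2 return the original output as a pycrdt Map, write 3 returns
    # an HTML-link placeholder pointing at .../stream, writes 4-5 return None.
    print(i, type(placeholder).__name__ if placeholder is not None else None)

print(manager.get_stream(file_id, cell_id))     # "line 0\n" through "line 4\n"
print(manager.get_output(file_id, cell_id, 0))  # first full output as a dict
manager.clear(file_id)                          # remove the on-disk state for this file
```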
