4 changes: 1 addition & 3 deletions README.md
@@ -11,11 +11,11 @@ for the frontend extension.
## Try it out

Run JupyterLab with the proper configuration:

```
jupyter lab --config jupyter_config.py
```


## Requirements

- JupyterLab >= 4.0.0
@@ -62,7 +62,6 @@ Activating an environment is required to access any Python packages installed in
that environment. You should activate the environment before developing any
changes to the `jupyter_rtc_core` package locally.


### Development install

After ensuring that the `rtccore` environment is activated, you can install an
@@ -116,7 +115,6 @@ Here is a summary of the commands to run after making changes:
- Finally, refresh the JupyterLab page in the browser to load the new
frontend assets and use the new backend.


### Building on change (frontend only)

You can watch the source directory and run JupyterLab at the same time in
3 changes: 3 additions & 0 deletions jupyter_rtc_core/outputs/__init__.py
@@ -0,0 +1,3 @@
from .handlers import outputs_handlers
from .manager import OutputsManager
from .output_processor import OutputProcessor
94 changes: 94 additions & 0 deletions jupyter_rtc_core/outputs/handlers.py
@@ -0,0 +1,94 @@
import json

from tornado import web

from jupyter_server.auth.decorator import authorized
from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join


class OutputsAPIHandler(APIHandler):
"""An outputs service API handler."""

auth_resource = "outputs"

@property
def outputs(self):
return self.settings["outputs_manager"]

@web.authenticated
@authorized
async def get(self, file_id=None, cell_id=None, output_index=None):
try:
output = self.outputs.get_output(file_id, cell_id, output_index)
except (FileNotFoundError, KeyError):
self.set_status(404)
self.finish({"error": "Output not found."})
else:
self.set_status(200)
self.set_header("Content-Type", "application/json")
self.write(output)


class StreamAPIHandler(APIHandler):
"""An outputs service API handler."""

auth_resource = "outputs"

@property
def outputs(self):
return self.settings["outputs_manager"]

@web.authenticated
@authorized
async def get(self, file_id=None, cell_id=None):
try:
output = self.outputs.get_stream(file_id, cell_id)
except (FileNotFoundError, KeyError):
self.set_status(404)
self.finish({"error": "Stream output not found."})
else:
            self.set_header("Content-Type", "text/plain; charset=utf-8")
            self.set_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
            self.set_header("Pragma", "no-cache")
            self.set_header("Expires", "0")
            self.set_status(200)
            self.write(output)
            self.finish()


# -----------------------------------------------------------------------------
# URL to handler mappings
# -----------------------------------------------------------------------------

# Strict UUID regex (matches standard 8-4-4-4-12 UUIDs)
_uuid_regex = r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"

_file_id_regex = rf"(?P<file_id>{_uuid_regex})"
_cell_id_regex = rf"(?P<cell_id>{_uuid_regex})"

# non-negative integers
_output_index_regex = r"(?P<output_index>0|[1-9]\d*)"

outputs_handlers = [
(rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}.output", OutputsAPIHandler),
(rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler),
]

# def setup_handlers(web_app):
# """Setup the handlers for the outputs service."""

# handlers = [
# (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}.output", OutputsAPIHandler),
# (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler),
# ]

# base_url = web_app.settings["base_url"]
# new_handlers = []
# for handler in handlers:
# pattern = url_path_join(base_url, handler[0])
# new_handler = (pattern, *handler[1:])
# new_handlers.append(new_handler)

# # Add the handler for all hosts
# web_app.add_handlers(".*$", new_handlers)
133 changes: 133 additions & 0 deletions jupyter_rtc_core/outputs/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import json
import os
from pathlib import Path, PurePath
import shutil

from pycrdt import Map

from traitlets.config import LoggingConfigurable
from traitlets import (
    Dict,
    Instance,
    Int,
    default,
)

from jupyter_core.paths import jupyter_runtime_dir

class OutputsManager(LoggingConfigurable):
    """Manages cell outputs stored as files under the Jupyter runtime directory."""

_last_output_index = Dict(default_value={})
_stream_count = Dict(default_value={})

outputs_path = Instance(PurePath, help="The local runtime dir")
stream_limit = Int(default_value=10, config=True, allow_none=True)

@default("outputs_path")
def _default_outputs_path(self):
return Path(jupyter_runtime_dir()) / "outputs"

def _ensure_path(self, file_id, cell_id):
nested_dir = self.outputs_path / file_id / cell_id
self.log.info(f"Creating directory: {nested_dir}")
nested_dir.mkdir(parents=True, exist_ok=True)

def _build_path(self, file_id, cell_id=None, output_index=None):
path = self.outputs_path / file_id
if cell_id is not None:
path = path / cell_id
if output_index is not None:
path = path / f"{output_index}.output"
return path

def get_output(self, file_id, cell_id, output_index):
"""Get an outputs by file_id, cell_id, and output_index."""
self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}")
path = self._build_path(file_id, cell_id, output_index)
if not os.path.isfile(path):
raise FileNotFoundError(f"The output file doesn't exist: {path}")
with open(path, "r", encoding="utf-8") as f:
output = json.loads(f.read())
return output

def get_stream(self, file_id, cell_id):
"Get the stream output for a cell by file_id and cell_id."
path = self._build_path(file_id, cell_id) / "stream"
if not os.path.isfile(path):
raise FileNotFoundError(f"The output file doesn't exist: {path}")
with open(path, "r", encoding="utf-8") as f:
output = f.read()
return output

def write(self, file_id, cell_id, output):
"""Write a new output for file_id and cell_id."""
self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}")
result = self.write_output(file_id, cell_id, output)
if output["output_type"] == "stream" and self.stream_limit is not None:
result = self.write_stream(file_id, cell_id, output)
return result

def write_output(self, file_id, cell_id, output):
self._ensure_path(file_id, cell_id)
last_index = self._last_output_index.get(cell_id, -1)
index = last_index + 1
self._last_output_index[cell_id] = index
path = self._build_path(file_id, cell_id, index)
data = json.dumps(output, ensure_ascii=False)
with open(path, "w", encoding="utf-8") as f:
f.write(data)
url = f"/api/outputs/{file_id}/{cell_id}/{index}.output"
return Map({})

def write_stream(self, file_id, cell_id, output) -> Map:
# How many stream outputs have been written for this cell previously
count = self._stream_count.get(cell_id, 0)

# Go ahead and write the incoming stream
self._ensure_path(file_id, cell_id)
path = self._build_path(file_id, cell_id) / "stream"
text = output["text"]
        mode = "a" if os.path.isfile(path) else "w"
        with open(path, mode, encoding="utf-8") as f:
f.write(text)
url = f"/api/outputs/{file_id}/{cell_id}/stream"

# Increment the count
count = count + 1
self._stream_count[cell_id] = count

# Now create the placeholder output
if count < self.stream_limit:
# Return the original if we haven't reached the limit
placeholder = Map(output)
elif count == self.stream_limit:
# Return a link to the full stream output
placeholder = Map({
"output_type": "display_data",
"data": {
'text/html': f'<a href="{url}">Click this link to see the full stream output</a>'
}
})
elif count > self.stream_limit:
placeholder = None
return placeholder

    def clear(self, file_id, cell_id=None):
        """Clear the state of the manager for a file, or for a single cell if cell_id is given."""
        if cell_id is None:
            self._stream_count = {}
            self._last_output_index = {}
            path = self._build_path(file_id)
        else:
            self._stream_count.pop(cell_id, None)
            self._last_output_index.pop(cell_id, None)
            path = self._build_path(file_id, cell_id)
        shutil.rmtree(path, ignore_errors=True)
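
For orientation, here is a small usage sketch of `OutputsManager` (not part of the diff); the `stream_limit` value and the file/cell UUIDs are placeholders chosen only for illustration, and outputs land under the Jupyter runtime directory's `outputs/` folder:

```python
# Hypothetical sketch exercising OutputsManager directly; the UUIDs and
# stream_limit below are illustrative placeholders.
from jupyter_rtc_core.outputs import OutputsManager

manager = OutputsManager(stream_limit=2)
file_id = "11111111-2222-3333-4444-555555555555"
cell_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"

# Each stream output is written to an indexed .output file and appended to
# the cell's "stream" file; once stream_limit is reached, write() returns a
# placeholder linking to the full stream instead of the raw text.
for i in range(3):
    placeholder = manager.write(
        file_id, cell_id,
        {"output_type": "stream", "name": "stdout", "text": f"line {i}\n"},
    )

print(manager.get_output(file_id, cell_id, 0))  # first stored output as JSON
print(manager.get_stream(file_id, cell_id))     # full concatenated stream text
manager.clear(file_id)                          # remove this file's outputs on disk
```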