From 49062c5e9ec5842f06470f369b1c7be0defee5fc Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 15:39:35 -0700 Subject: [PATCH 01/14] Adding the output processor. --- jupyter_rtc_core/outputs/output_processor.py | 233 +++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 jupyter_rtc_core/outputs/output_processor.py diff --git a/jupyter_rtc_core/outputs/output_processor.py b/jupyter_rtc_core/outputs/output_processor.py new file mode 100644 index 0000000..dede3a6 --- /dev/null +++ b/jupyter_rtc_core/outputs/output_processor.py @@ -0,0 +1,233 @@ +import asyncio +import json + +from pycrdt import Map + +from traitlets import LoggingConfigurable, Dict, Unicode + + +class OutputProcessor(LoggingConfigurable): + + _cell_ids = Dict(default_value={}) # a map from msg_id -> cell_id + _cell_indices = Dict(default_value={}) # a map from cell_id -> cell index in notebook + _file_id = Unicode(default_value=None, allow_None=True) + + @property + def settings(self): + """A shortcut for the Tornado web app settings.""" + return self.parent.parent.webapp.settings + + @property + def kernel_client(self): + """A shortcut to the kernel client this output processor is attached to.""" + return self.parent + + @property + def outputs_manager(self): + """A shortcut for the OutputsManager instance.""" + return self.settings["outputs_manager"] + + @property + def session_manager(self): + """A shortcut for the kernel session manager.""" + return self.settings["session_manager"] + + @property + def file_id_manager(self): + """A shortcut for the file id manager.""" + return self.settings["file_id_manager"] + + @property + def jupyter_server_ydoc(self): + """A shortcut for the jupyter server ydoc manager.""" + return self.settings["jupyter_server_ydoc"] + + def clear(self, cell_id=None): + """Clear the state of the output processor. + + This clears the state (saved msg_ids, cell_ids, cell indices) for the output + processor. If cell_id is provided, only the state for that cell is cleared. + """ + if cell_id is None: + self._cell_ids = {} + self._cell_indices = {} + else: + msg_id = self.get_msg_id(cell_id) + if (msg_id is not None) and (msg_id in self._cell_ids): del self._cell_ids[msg_id] + if cell_id in self._cell_indices: del self._cell_indices[cell_id] + + def set_cell_id(self, msg_id, cell_id): + """Set the cell_id for a msg_id.""" + self._cell_ids[msg_id] = cell_id + + def get_cell_id(self, msg_id): + """Retrieve a cell_id from a parent msg_id.""" + return self._cell_ids.get(msg_id) + + def get_msg_id(self, cell_id): + """Retrieve a msg_id from a cell_id.""" + return {v: k for k, v in self._cell_ids.items()}.get(cell_id) + + # Incoming messages + + def process_incoming_message(self, channel: str, msg: list[bytes]): + """Process incoming messages from the frontend. + + Save the cell_id <-> msg_id mapping + + msg = [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] + + This method is used to create a map between cell_id and msg_id. + Incoming execute_request messages have both a cell_id and msg_id. + When output messages are send back to the frontend, this map is used + to find the cell_id for a given parent msg_id. + """ + header = json.loads(msg[0]) + msg_id = header["msg_id"] + msg_type = header["msg_type"] + metadata = json.loads(msg[2]) + cell_id = metadata.get("cellId") + if cell_id is None: + return + + existing_msg_id = self.get_msg_id(cell_id) + if existing_msg_id != msg_id: # cell is being re-run, clear output state + if self._file_id is not None: + self.log.info(f"Cell has been rerun, removing old outputs: {self._file_id} {cell_id}") + self.clear(cell_id) + self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id) + self.log.info(f"Saving (msg_id, cell_id): ({msg_id} {cell_id})") + self.set_cell_id(msg_id, cell_id) + + # Outgoing messages + + def process_outgoing_message(self, channel: str, msg: list[bytes]): + """Process outgoing messagers from the kernel.""" + dmsg = self.kernel_client.session.deserialize(msg) + msg_type = dmsg["header"]["msg_type"] + msg_id = dmsg["parent_header"]["msg_id"] + content = dmsg["content"] + cell_id = self.get_cell_id(msg_id) + if cell_id is None: + return + asyncio.create_task(self.output_task(msg_type, cell_id, content)) + return None # Don't allow the original message to propagate to the frontend + + async def output_task(self, msg_type, cell_id, content): + """A coroutine to handle output messages.""" + try: + kernel_session = await self.session_manager.get_session(kernel_id=self.kernel_id) + except: # what exception to catch? + return + else: + path = kernel_session["path"] + + file_id = self.file_id_manager.get_id(path) + if file_id is None: + return + self._file_id = file_id + try: + notebook = await self.jupyter_server_ydoc.get_document( + path=path, + copy=False, + file_format='json + content_type='notebook' + ) + except: # what exception to catch? + return + cells = notebook.ycells + + cell_index, target_cell = self.find_cell(cell_id, cells) + if target_cell is None: + return + + # Convert from the message spec to the nbformat output structure + output = self.transform_output(msg_type, content, ydoc=False) + output_url = self.outputs_manager.write(file_id, cell_id, output) + nb_output = Map({ + "output_type": "display_data", + "data": { + 'text/html': f'Output' + }, + "metadata": { + "outputs_service": True + } + }) + target_cell["outputs"].append(nb_output) + + def find_cell(self, cell_id, cells): + """Find a cell with a given cell_id in the list of cells. + + This uses caching if we have seen the cell previously. + """ + # Find the target_cell and its cell_index and cache + target_cell = None + cell_index = None + try: + # See if we have a cached value for the cell_index + cell_index = self._cell_indices[cell_id] + target_cell = cells[cell_index] + except KeyError: + # Do a linear scan to find the cell + self.log.info(f"Linear scan: {cell_id}") + cell_index, target_cell = self.scan_cells(cell_id, cells) + else: + # Verify that the cached value still matches + if target_cell["id"] != cell_id: + self.log.info(f"Invalid cache hit: {cell_id}") + cell_index, target_cell = self.scan_cells(cell_id, cells) + else: + self.log.info(f"Validated cache hit: {cell_id}") + return cell_index, target_cell + + def scan_cells(self, cell_id, cells): + """Find the cell with a given cell_id in the list of cells. + + This does a simple linear scan of the cells, but in reverse order because + we believe that users are more often running cells towards the end of the + notebook. + """ + target_cell = None + cell_index = None + for i in reversed(range(0, len(cells))): + cell = cells[i] + if cell["id"] == cell_id: + target_cell = cell + cell_index = i + self._cell_indices[cell_id] = cell_index + break + return cell_index, target_cell + + def transform_output(self, msg_type, content, ydoc=False): + """Transform output from IOPub messages to the nbformat specification.""" + if ydoc: + factory = Map + else: + factory = lambda x: x + if msg_type == "stream": + output = factory({ + "output_type": "stream", + "text": content["text"], + "name": content["name"] + }) + elif msg_type == "display_data": + output = factory({ + "output_type": "display_data", + "data": content["data"], + "metadata": content["metadata"] + }) + elif msg_type == "execute_result": + output = factory({ + "output_type": "execute_result", + "data": content["data"], + "metadata": content["metadata"], + "execution_count": content["execution_count"] + }) + elif msg_type == "error": + output = factory({ + "output_type": "error", + "traceback": content["traceback"], + "ename": content["ename"], + "evalue": content["evalue"] + }) + return output From 3c22892af7094b467a4271dd8df030cb5a880383 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 15:39:43 -0700 Subject: [PATCH 02/14] Adding the outputs manager. --- jupyter_rtc_core/outputs/manager.py | 100 ++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 jupyter_rtc_core/outputs/manager.py diff --git a/jupyter_rtc_core/outputs/manager.py b/jupyter_rtc_core/outputs/manager.py new file mode 100644 index 0000000..f4ccfdf --- /dev/null +++ b/jupyter_rtc_core/outputs/manager.py @@ -0,0 +1,100 @@ +import json +import os +from pathlib import Path, PurePath +import shutil + + +from traitlets.config.configurable import LoggingConfigurable +from traitlets import ( + Any, + Bool, + Dict, + Instance, + List, + TraitError, + Type, + Unicode, + default, + validate, +) + +from jupyter_core.paths import jupyter_runtime_dir + +class OutputsManager(LoggingConfigurable): + + outputs_path = Instance(PurePath, help="The local runtime dir") + _last_output_index = Dict(default_value={}) + + @default("outputs_path") + def _default_outputs_path(self): + return Path(jupyter_runtime_dir()) / "outputs" + + def _ensure_path(self, file_id, cell_id): + nested_dir = self.outputs_path / file_id / cell_id + self.log.info(f"Creating directory: {nested_dir}") + nested_dir.mkdir(parents=True, exist_ok=True) + + def _build_path(self, file_id, cell_id=None, output_index=None): + path = self.outputs_path / file_id + if cell_id is not None: + path = path / cell_id + if output_index is not None: + path = path / f"{output_index}.output" + return path + + def get(self, file_id, cell_id, output_index): + """Get an outputs by file_id, cell_id, and output_index.""" + self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}") + path = self._build_path(file_id, cell_id, output_index) + if not os.path.isfile(path): + raise FileNotFoundError(f"The output file doesn't exist: {path}") + with open(path, "r", encoding="utf-8") as f: + output = json.loads(f.read()) + return output + + def get_stream(self, file_id, cell_id): + "Get the stream output for a cell by file_id and cell_id." + path = self._build_path(file_id, cell_id) / "stream" + if not os.path.isfile(path): + raise FileNotFoundError(f"The output file doesn't exist: {path}") + with open(path, "r", encoding="utf-8") as f: + output = f.read() + return output + + def write(self, file_id, cell_id, output): + """Write a new output for file_id and cell_id.""" + self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}") + if output["output_type"] == "stream": + url = self.write_stream(file_id, cell_id, output) + else: + url = self.write_output(file_id, cell_id, output) + return url + + def write_output(self, file_id, cell_id, output): + self._ensure_path(file_id, cell_id) + last_index = self._last_output_index.get(cell_id, -1) + index = last_index + 1 + self._last_output_index[cell_id] = index + path = self._build_path(file_id, cell_id, index) + data = json.dumps(output, ensure_ascii=False) + with open(path, "w", encoding="utf-8") as f: + f.write(data) + url = f"/api/outputs/{file_id}/{cell_id}/{index}.output" + return url + + def write_stream(self, file_id, cell_id, output): + self._ensure_path(file_id, cell_id) + path = self._build_path(file_id, cell_id) / "stream" + text = output["text"] + mode = 'a' if os.path.isfile(path) else 'w' + with open(path, "a", encoding="utf-8") as f: + f.write(text) + url = f"/api/outputs/{file_id}/{cell_id}/stream" + return url + + def clear(self, file_id, cell_id=None): + path = self._build_path(file_id, cell_id) + shutil.rmtree(path) + + + From 79e3526834d2bbb94aa67099710424a2a8c0e9c6 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 15:40:00 -0700 Subject: [PATCH 03/14] Adding the outputs handlers. --- jupyter_rtc_core/outputs/handlers.py | 97 ++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 jupyter_rtc_core/outputs/handlers.py diff --git a/jupyter_rtc_core/outputs/handlers.py b/jupyter_rtc_core/outputs/handlers.py new file mode 100644 index 0000000..08aff20 --- /dev/null +++ b/jupyter_rtc_core/outputs/handlers.py @@ -0,0 +1,97 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +import json + +from tornado import web + +from jupyter_server.auth.decorator import authorized +from jupyter_server.base.handlers import APIHandler +from jupyter_server.utils import url_path_join + + +class OutputsAPIHandler(APIHandler): + """An outputs service API handler.""" + + auth_resource = "outputs" + + @property + def outputs(self): + return self.settings["outputs_manager"] + + @web.authenticated + @authorized + async def get(self, file_id=None, cell_id=None, output_index=None): + try: + output = self.outputs.get(file_id, cell_id, output_index) + except (FileNotFoundError, KeyError): + self.set_status(404) + self.finish({"error": "Output not found."}) + else: + self.set_status(200) + self.set_header("Content-Type", "application/json") + self.write(output) + + +class StreamAPIHandler(APIHandler): + """An outputs service API handler.""" + + auth_resource = "outputs" + + @property + def outputs(self): + return self.settings["outputs_manager"] + + @web.authenticated + @authorized + async def get(self, file_id=None, cell_id=None): + try: + output = self.outputs.get_stream(file_id, cell_id) + except (FileNotFoundError, KeyError): + self.set_status(404) + self.finish({"error": "Stream output not found."}) + else: + # self.set_header("Content-Type", "text/plain; charset=uft-8") + self.set_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") + self.set_header("Pragma", "no-cache") + self.set_header("Expires", "0") + self.set_status(200) + self.write(output) + self.finish(set_content_type="text/plain; charset=utf-8") + + +# ----------------------------------------------------------------------------- +# URL to handler mappings +# ----------------------------------------------------------------------------- + +# Strict UUID regex (matches standard 8-4-4-4-12 UUIDs) +_uuid_regex = r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}" + +_file_id_regex = rf"(?P{_uuid_regex})" +_cell_id_regex = rf"(?P{_uuid_regex})" + +# non-negative integers +_output_index_regex = r"(?P0|[1-9]\d*)" + +handlers = [ + (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}.output", OutputsAPIHandler), + (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler), +] + +# def setup_handlers(web_app): +# """Setup the handlers for the outputs service.""" + +# handlers = [ +# (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}.output", OutputsAPIHandler), +# (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler), +# ] + +# base_url = web_app.settings["base_url"] +# new_handlers = [] +# for handler in handlers: +# pattern = url_path_join(base_url, handler[0]) +# new_handler = (pattern, *handler[1:]) +# new_handlers.append(new_handler) + +# # Add the handler for all hosts +# web_app.add_handlers(".*$", new_handlers) From 39c8afc2fa75a584fd7c6e087e7ba243b7da2095 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 16:02:56 -0700 Subject: [PATCH 04/14] Setup test files for outputs. --- jupyter_rtc_core/outputs/handlers.py | 2 +- jupyter_rtc_core/outputs/manager.py | 2 +- jupyter_rtc_core/outputs/output_processor.py | 5 +++-- jupyter_rtc_core/tests/test_output_processor.py | 4 ++++ jupyter_rtc_core/tests/test_outputs_manager.py | 5 +++++ 5 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 jupyter_rtc_core/tests/test_output_processor.py create mode 100644 jupyter_rtc_core/tests/test_outputs_manager.py diff --git a/jupyter_rtc_core/outputs/handlers.py b/jupyter_rtc_core/outputs/handlers.py index 08aff20..d31d4e6 100644 --- a/jupyter_rtc_core/outputs/handlers.py +++ b/jupyter_rtc_core/outputs/handlers.py @@ -73,7 +73,7 @@ async def get(self, file_id=None, cell_id=None): # non-negative integers _output_index_regex = r"(?P0|[1-9]\d*)" -handlers = [ +outputs_handlers = [ (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/{_output_index_regex}.output", OutputsAPIHandler), (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler), ] diff --git a/jupyter_rtc_core/outputs/manager.py b/jupyter_rtc_core/outputs/manager.py index f4ccfdf..3b67d61 100644 --- a/jupyter_rtc_core/outputs/manager.py +++ b/jupyter_rtc_core/outputs/manager.py @@ -4,7 +4,7 @@ import shutil -from traitlets.config.configurable import LoggingConfigurable +from traitlets.config import LoggingConfigurable from traitlets import ( Any, Bool, diff --git a/jupyter_rtc_core/outputs/output_processor.py b/jupyter_rtc_core/outputs/output_processor.py index dede3a6..02184d8 100644 --- a/jupyter_rtc_core/outputs/output_processor.py +++ b/jupyter_rtc_core/outputs/output_processor.py @@ -3,7 +3,8 @@ from pycrdt import Map -from traitlets import LoggingConfigurable, Dict, Unicode +from traitlets import Dict, Unicode +from traitlets.config import LoggingConfigurable class OutputProcessor(LoggingConfigurable): @@ -130,7 +131,7 @@ async def output_task(self, msg_type, cell_id, content): notebook = await self.jupyter_server_ydoc.get_document( path=path, copy=False, - file_format='json + file_format='json', content_type='notebook' ) except: # what exception to catch? diff --git a/jupyter_rtc_core/tests/test_output_processor.py b/jupyter_rtc_core/tests/test_output_processor.py new file mode 100644 index 0000000..0f753ba --- /dev/null +++ b/jupyter_rtc_core/tests/test_output_processor.py @@ -0,0 +1,4 @@ +from ..outputs import OutputProcessor + +def test_instantiation(): + op = OutputProcessor() \ No newline at end of file diff --git a/jupyter_rtc_core/tests/test_outputs_manager.py b/jupyter_rtc_core/tests/test_outputs_manager.py new file mode 100644 index 0000000..68d9d6f --- /dev/null +++ b/jupyter_rtc_core/tests/test_outputs_manager.py @@ -0,0 +1,5 @@ +from ..outputs import OutputsManager + +def test_instantiation(): + op = OutputsManager() + From 3471224b5f8ad1503aae1ba3d555a3de9c46d86d Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 16:03:07 -0700 Subject: [PATCH 05/14] Init file for outputs. --- jupyter_rtc_core/outputs/__init__.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 jupyter_rtc_core/outputs/__init__.py diff --git a/jupyter_rtc_core/outputs/__init__.py b/jupyter_rtc_core/outputs/__init__.py new file mode 100644 index 0000000..04c7607 --- /dev/null +++ b/jupyter_rtc_core/outputs/__init__.py @@ -0,0 +1,3 @@ +from .handlers import outputs_handlers +from .manager import OutputsManager +from .output_processor import OutputProcessor From fa5e71813456b1f14ae4751c00c54eb6c6933fcd Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 17:18:48 -0700 Subject: [PATCH 06/14] Added test_stream. --- .../tests/test_outputs_manager.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/jupyter_rtc_core/tests/test_outputs_manager.py b/jupyter_rtc_core/tests/test_outputs_manager.py index 68d9d6f..bb9ade6 100644 --- a/jupyter_rtc_core/tests/test_outputs_manager.py +++ b/jupyter_rtc_core/tests/test_outputs_manager.py @@ -1,5 +1,35 @@ +from tempfile import TemporaryDirectory +from pathlib import Path +from uuid import uuid4 + from ..outputs import OutputsManager +text = ''.join([str(i) for i in range(10)]) + +def so(i): + return { + "output_type": "stream", + "name": "stdout", + "text": str(i), + } + +stream_outputs = list([so(i) for i in range(10)]) + + def test_instantiation(): op = OutputsManager() + assert isinstance(op, OutputsManager) + +def test_stream(): + with TemporaryDirectory() as td: + op = OutputsManager() + op.outputs_path = Path(td) / "outputs" + file_id = str(uuid4()) + cell_id = str(uuid4()) + output_index = 0 + assert op._build_path(file_id, cell_id, output_index) == \ + op.outputs_path / file_id / cell_id / f"{output_index}.output" + for stream in stream_outputs: + op.write_stream(file_id, cell_id, stream) + assert op.get_stream(file_id, cell_id) == text From 8c2a0b3755be7e1c2dbc1fab54555c5dfed42796 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 19:39:36 -0700 Subject: [PATCH 07/14] Finish tests for outputs manager. --- jupyter_rtc_core/outputs/manager.py | 2 +- .../tests/test_outputs_manager.py | 78 ++++++++++++++++--- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/jupyter_rtc_core/outputs/manager.py b/jupyter_rtc_core/outputs/manager.py index 3b67d61..3bd349c 100644 --- a/jupyter_rtc_core/outputs/manager.py +++ b/jupyter_rtc_core/outputs/manager.py @@ -42,7 +42,7 @@ def _build_path(self, file_id, cell_id=None, output_index=None): path = path / f"{output_index}.output" return path - def get(self, file_id, cell_id, output_index): + def get_output(self, file_id, cell_id, output_index): """Get an outputs by file_id, cell_id, and output_index.""" self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}") path = self._build_path(file_id, cell_id, output_index) diff --git a/jupyter_rtc_core/tests/test_outputs_manager.py b/jupyter_rtc_core/tests/test_outputs_manager.py index bb9ade6..e172f7f 100644 --- a/jupyter_rtc_core/tests/test_outputs_manager.py +++ b/jupyter_rtc_core/tests/test_outputs_manager.py @@ -2,34 +2,92 @@ from pathlib import Path from uuid import uuid4 +import pytest + from ..outputs import OutputsManager -text = ''.join([str(i) for i in range(10)]) -def so(i): +def stream(text: str): return { "output_type": "stream", "name": "stdout", - "text": str(i), + "text": text } -stream_outputs = list([so(i) for i in range(10)]) - +def display_data_text(text: str): + return { + "output_type": "display_data", + "data": { + "text/plain": text + } +} def test_instantiation(): op = OutputsManager() assert isinstance(op, OutputsManager) +def test_paths(): + """Verify that the paths are working properly.""" + op = OutputsManager() + file_id = str(uuid4()) + cell_id = str(uuid4()) + with TemporaryDirectory() as td: + op.outputs_path = Path(td) / "outputs" + output_index = 0 + assert op._build_path(file_id, cell_id, output_index) == \ + op.outputs_path / file_id / cell_id / f"{output_index}.output" + def test_stream(): + """Test stream outputs.""" + text = "0123456789" + streams = list([stream(c) for c in text]) with TemporaryDirectory() as td: op = OutputsManager() op.outputs_path = Path(td) / "outputs" file_id = str(uuid4()) cell_id = str(uuid4()) - output_index = 0 - assert op._build_path(file_id, cell_id, output_index) == \ - op.outputs_path / file_id / cell_id / f"{output_index}.output" - for stream in stream_outputs: - op.write_stream(file_id, cell_id, stream) + for s in streams: + op.write_stream(file_id, cell_id, s) assert op.get_stream(file_id, cell_id) == text +def test_display_data(): + """Test display data.""" + texts = [ + "Hello World!", + "Hola Mundo!", + "Bonjour le monde!" + ] + outputs = list([display_data_text(t) for t in texts]) + with TemporaryDirectory() as td: + op = OutputsManager() + op.outputs_path = Path(td) / "outputs" + file_id = str(uuid4()) + cell_id = str(uuid4()) + for (i, output) in enumerate(outputs): + op.write_output(file_id, cell_id, output) + for (i, output) in enumerate(outputs): + assert op.get_output(file_id, cell_id, i) == outputs[i] + +def test_clear(): + """Test the clearing of outputs for a file_id.""" + output = display_data_text("Hello World!") + with TemporaryDirectory() as td: + op = OutputsManager() + op.outputs_path = Path(td) / "outputs" + file_id = str(uuid4()) + cell_id = str(uuid4()) + op.write_output(file_id, cell_id, output) + path = op._build_path(file_id, cell_id, output_index=0) + assert path.exists() + op.clear(file_id) + assert not path.exists() + +def file_not_found(): + """Test to ensure FileNotFoundError is raised.""" + with TemporaryDirectory() as td: + op = OutputsManager() + op.outputs_path = Path(td) / "outputs" + with pytest.raises(FileNotFoundError): + op.get_output('a','b',0) + with pytest.raises(FileNotFoundError): + op.get_stream('a','b') From 17f16d87e7eb4c16d0b21b82103d672843b14275 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 19:43:41 -0700 Subject: [PATCH 08/14] Minor cleanup on outputs handlers. --- jupyter_rtc_core/outputs/handlers.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/jupyter_rtc_core/outputs/handlers.py b/jupyter_rtc_core/outputs/handlers.py index d31d4e6..a25623f 100644 --- a/jupyter_rtc_core/outputs/handlers.py +++ b/jupyter_rtc_core/outputs/handlers.py @@ -1,6 +1,3 @@ -# Copyright (c) Jupyter Development Team. -# Distributed under the terms of the Modified BSD License. - import json from tornado import web @@ -23,7 +20,7 @@ def outputs(self): @authorized async def get(self, file_id=None, cell_id=None, output_index=None): try: - output = self.outputs.get(file_id, cell_id, output_index) + output = self.outputs.get_output(file_id, cell_id, output_index) except (FileNotFoundError, KeyError): self.set_status(404) self.finish({"error": "Output not found."}) From 84b17cf06a51874fb6273940237adbf021ad6180 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Mon, 12 May 2025 20:20:28 -0700 Subject: [PATCH 09/14] Work on outputs processor tests. --- jupyter_rtc_core/outputs/output_processor.py | 17 ++++--- .../tests/test_output_processor.py | 51 ++++++++++++++++++- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/jupyter_rtc_core/outputs/output_processor.py b/jupyter_rtc_core/outputs/output_processor.py index 02184d8..4c39fee 100644 --- a/jupyter_rtc_core/outputs/output_processor.py +++ b/jupyter_rtc_core/outputs/output_processor.py @@ -11,7 +11,7 @@ class OutputProcessor(LoggingConfigurable): _cell_ids = Dict(default_value={}) # a map from msg_id -> cell_id _cell_indices = Dict(default_value={}) # a map from cell_id -> cell index in notebook - _file_id = Unicode(default_value=None, allow_None=True) + _file_id = Unicode(default_value=None, allow_none=True) @property def settings(self): @@ -83,19 +83,22 @@ def process_incoming_message(self, channel: str, msg: list[bytes]): When output messages are send back to the frontend, this map is used to find the cell_id for a given parent msg_id. """ - header = json.loads(msg[0]) - msg_id = header["msg_id"] - msg_type = header["msg_type"] - metadata = json.loads(msg[2]) + if channel != "shell": + return + header = json.loads(msg[0]) # TODO use session unpack + msg_type = header.get("msg_type") + if msg_type != "execute_request": + return + msg_id = header.get("msg_id") + metadata = json.loads(msg[2]) # TODO use session unpack cell_id = metadata.get("cellId") if cell_id is None: return existing_msg_id = self.get_msg_id(cell_id) if existing_msg_id != msg_id: # cell is being re-run, clear output state + self.clear(cell_id) if self._file_id is not None: - self.log.info(f"Cell has been rerun, removing old outputs: {self._file_id} {cell_id}") - self.clear(cell_id) self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id) self.log.info(f"Saving (msg_id, cell_id): ({msg_id} {cell_id})") self.set_cell_id(msg_id, cell_id) diff --git a/jupyter_rtc_core/tests/test_output_processor.py b/jupyter_rtc_core/tests/test_output_processor.py index 0f753ba..38b0cc8 100644 --- a/jupyter_rtc_core/tests/test_output_processor.py +++ b/jupyter_rtc_core/tests/test_output_processor.py @@ -1,4 +1,51 @@ -from ..outputs import OutputProcessor +import json +from pathlib import Path +from tempfile import TemporaryDirectory +from uuid import uuid4 + +from ..outputs import OutputProcessor, OutputsManager + +class TestOutputProcessor(OutputProcessor): + + settings = {} + def test_instantiation(): - op = OutputProcessor() \ No newline at end of file + op = OutputProcessor() + +def create_incoming_message(cell_id): + msg_id = str(uuid4()) + header = {"msg_id": msg_id, "msg_type": "shell"} + parent_header = {} + metadata = {"cellId": cell_id} + msg = [json.dumps(item) for item in [header, parent_header, metadata]] + return msg_id, msg + +def test_incoming_message(): + + with TemporaryDirectory() as td: + op = TestOutputProcessor() + om = OutputsManager() + op.settings["outputs_manager"] = om + op.outputs_path = Path(td) / "outputs" + # Simulate the running of a cell + cell_id = str(uuid4()) + msg_id, msg = create_incoming_message(cell_id) + op.process_incoming_message('shell', msg) + assert op.get_cell_id(msg_id) == cell_id + assert op.get_msg_id(cell_id) == msg_id + # # Clear the cell_id + # op.clear(cell_id) + # assert op.get_cell_id(msg_id) is None + # assert op.get_msg_id(cell_id) is None + # # Simulate the running of a cell + # cell_id = str(uuid4()) + # msg_id, msg = create_incoming_message(cell_id) + # op.process_incoming_message('shell', msg) + # assert op.get_cell_id(msg_id) == cell_id + # assert op.get_msg_id(cell_id) == msg_id + # # Run it again without clearing to ensure it self clears + # cell_id = str(uuid4()) + # msg_id, msg = create_incoming_message(cell_id) + # assert op.get_cell_id(msg_id) == cell_id + # assert op.get_msg_id(cell_id) == msg_id From d18d9afdd89478888e801031c743a8172799291a Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Tue, 13 May 2025 11:09:35 -0700 Subject: [PATCH 10/14] Update tests for output processor. --- .../tests/test_output_processor.py | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/jupyter_rtc_core/tests/test_output_processor.py b/jupyter_rtc_core/tests/test_output_processor.py index 38b0cc8..0b94a0b 100644 --- a/jupyter_rtc_core/tests/test_output_processor.py +++ b/jupyter_rtc_core/tests/test_output_processor.py @@ -9,20 +9,21 @@ class TestOutputProcessor(OutputProcessor): settings = {} - -def test_instantiation(): - op = OutputProcessor() - def create_incoming_message(cell_id): msg_id = str(uuid4()) - header = {"msg_id": msg_id, "msg_type": "shell"} + header = {"msg_id": msg_id, "msg_type": "execute_request"} parent_header = {} metadata = {"cellId": cell_id} msg = [json.dumps(item) for item in [header, parent_header, metadata]] return msg_id, msg -def test_incoming_message(): +def test_instantiation(): + """Test instantiation of the output processor.""" + op = OutputProcessor() + assert isinstance(op, OutputProcessor) +def test_incoming_message(): + """Test incoming message processing.""" with TemporaryDirectory() as td: op = TestOutputProcessor() om = OutputsManager() @@ -34,18 +35,19 @@ def test_incoming_message(): op.process_incoming_message('shell', msg) assert op.get_cell_id(msg_id) == cell_id assert op.get_msg_id(cell_id) == msg_id - # # Clear the cell_id - # op.clear(cell_id) - # assert op.get_cell_id(msg_id) is None - # assert op.get_msg_id(cell_id) is None - # # Simulate the running of a cell - # cell_id = str(uuid4()) - # msg_id, msg = create_incoming_message(cell_id) - # op.process_incoming_message('shell', msg) - # assert op.get_cell_id(msg_id) == cell_id - # assert op.get_msg_id(cell_id) == msg_id + # Clear the cell_id + op.clear(cell_id) + assert op.get_cell_id(msg_id) is None + assert op.get_msg_id(cell_id) is None + # Simulate the running of a cell + cell_id = str(uuid4()) + msg_id, msg = create_incoming_message(cell_id) + op.process_incoming_message('shell', msg) + assert op.get_cell_id(msg_id) == cell_id + assert op.get_msg_id(cell_id) == msg_id # # Run it again without clearing to ensure it self clears - # cell_id = str(uuid4()) - # msg_id, msg = create_incoming_message(cell_id) - # assert op.get_cell_id(msg_id) == cell_id - # assert op.get_msg_id(cell_id) == msg_id + cell_id = str(uuid4()) + msg_id, msg = create_incoming_message(cell_id) + op.process_incoming_message('shell', msg) + assert op.get_cell_id(msg_id) == cell_id + assert op.get_msg_id(cell_id) == msg_id From c21aa4553631ec4f0c26cd54c70acbdaa2d2d537 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Tue, 13 May 2025 13:37:58 -0700 Subject: [PATCH 11/14] More intelligent stream handling. --- jupyter_rtc_core/outputs/manager.py | 51 ++++++++++++++++---- jupyter_rtc_core/outputs/output_processor.py | 30 ++++++------ 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/jupyter_rtc_core/outputs/manager.py b/jupyter_rtc_core/outputs/manager.py index 3bd349c..548d9fd 100644 --- a/jupyter_rtc_core/outputs/manager.py +++ b/jupyter_rtc_core/outputs/manager.py @@ -3,6 +3,7 @@ from pathlib import Path, PurePath import shutil +from pycrdt import Map from traitlets.config import LoggingConfigurable from traitlets import ( @@ -14,6 +15,7 @@ TraitError, Type, Unicode, + Int, default, validate, ) @@ -22,8 +24,11 @@ class OutputsManager(LoggingConfigurable): - outputs_path = Instance(PurePath, help="The local runtime dir") _last_output_index = Dict(default_value={}) + _stream_count = Dict(default_value={}) + + outputs_path = Instance(PurePath, help="The local runtime dir") + stream_limit = Int(default_value=10, config=True, allow_none=True) @default("outputs_path") def _default_outputs_path(self): @@ -64,11 +69,10 @@ def get_stream(self, file_id, cell_id): def write(self, file_id, cell_id, output): """Write a new output for file_id and cell_id.""" self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}") - if output["output_type"] == "stream": - url = self.write_stream(file_id, cell_id, output) - else: - url = self.write_output(file_id, cell_id, output) - return url + result = self.write_output(file_id, cell_id, output) + if output["output_type"] == "stream" and self.stream_limit is not None: + result = self.write_stream(file_id, cell_id, output) + return result def write_output(self, file_id, cell_id, output): self._ensure_path(file_id, cell_id) @@ -80,9 +84,13 @@ def write_output(self, file_id, cell_id, output): with open(path, "w", encoding="utf-8") as f: f.write(data) url = f"/api/outputs/{file_id}/{cell_id}/{index}.output" - return url + return Map({}) - def write_stream(self, file_id, cell_id, output): + def write_stream(self, file_id, cell_id, output) -> Map: + # How many stream outputs have been written for this cell previously + count = self._stream_count.get(cell_id, 0) + + # Go ahead and write the incoming stream self._ensure_path(file_id, cell_id) path = self._build_path(file_id, cell_id) / "stream" text = output["text"] @@ -90,7 +98,32 @@ def write_stream(self, file_id, cell_id, output): with open(path, "a", encoding="utf-8") as f: f.write(text) url = f"/api/outputs/{file_id}/{cell_id}/stream" - return url + + # Increment the count + count = count + 1 + self._stream_count[cell_id] = count + + # Now create the placeholder output + if count < self.stream_limit: + # Return the original if we haven't reached the limit + placeholder = Map(output) + elif count == self.stream_limit: + # Return a link to the full stream output + placeholder = Map({ + "output_type": "display_data", + "data": { + 'text/html': f'Click this link to see the full stream output' + } + }) + elif count > self.stream_limit: + placeholder = None + return placeholder + + + + + + def clear(self, file_id, cell_id=None): path = self._build_path(file_id, cell_id) diff --git a/jupyter_rtc_core/outputs/output_processor.py b/jupyter_rtc_core/outputs/output_processor.py index 4c39fee..912913b 100644 --- a/jupyter_rtc_core/outputs/output_processor.py +++ b/jupyter_rtc_core/outputs/output_processor.py @@ -3,7 +3,7 @@ from pycrdt import Map -from traitlets import Dict, Unicode +from traitlets import Dict, Unicode, Bool from traitlets.config import LoggingConfigurable @@ -13,6 +13,12 @@ class OutputProcessor(LoggingConfigurable): _cell_indices = Dict(default_value={}) # a map from cell_id -> cell index in notebook _file_id = Unicode(default_value=None, allow_none=True) + use_outputs_service = Bool( + default_value=True, + config=True, + help="Should outputs be routed to the outputs service to minimize the in memory ydoc size." + ) + @property def settings(self): """A shortcut for the Tornado web app settings.""" @@ -99,7 +105,8 @@ def process_incoming_message(self, channel: str, msg: list[bytes]): if existing_msg_id != msg_id: # cell is being re-run, clear output state self.clear(cell_id) if self._file_id is not None: - self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id) + if self.use_outputs_service: + self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id) self.log.info(f"Saving (msg_id, cell_id): ({msg_id} {cell_id})") self.set_cell_id(msg_id, cell_id) @@ -146,18 +153,13 @@ async def output_task(self, msg_type, cell_id, content): return # Convert from the message spec to the nbformat output structure - output = self.transform_output(msg_type, content, ydoc=False) - output_url = self.outputs_manager.write(file_id, cell_id, output) - nb_output = Map({ - "output_type": "display_data", - "data": { - 'text/html': f'Output' - }, - "metadata": { - "outputs_service": True - } - }) - target_cell["outputs"].append(nb_output) + if self.use_outputs_service: + output = self.transform_output(msg_type, content, ydoc=False) + output = self.outputs_manager.write(file_id, cell_id, output) + else: + output = self.transform_output(msg_type, content, ydoc=True) + if output is not None: + target_cell["outputs"].append(output) def find_cell(self, cell_id, cells): """Find a cell with a given cell_id in the list of cells. From e3505b17bfa6cedc270f81628209e2799c3f49b2 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Tue, 13 May 2025 13:45:04 -0700 Subject: [PATCH 12/14] Fix clear. --- jupyter_rtc_core/outputs/manager.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/jupyter_rtc_core/outputs/manager.py b/jupyter_rtc_core/outputs/manager.py index 548d9fd..347af95 100644 --- a/jupyter_rtc_core/outputs/manager.py +++ b/jupyter_rtc_core/outputs/manager.py @@ -119,15 +119,15 @@ def write_stream(self, file_id, cell_id, output) -> Map: placeholder = None return placeholder - - - - - - def clear(self, file_id, cell_id=None): - path = self._build_path(file_id, cell_id) + """Clear the state of the manager.""" + if cell_id is None: + self._stream_count = {} + path = self._build_path(file_id) + else: + try: + del self._stream_count[cell_id] + except KeyError: + pass + path = self._build_path(file_id, cell_id) shutil.rmtree(path) - - - From cd1024fa031e3d9a7ce159293ba8aa42c23b3a5f Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Tue, 13 May 2025 16:33:13 -0700 Subject: [PATCH 13/14] Adding comments about exceptional cases. --- jupyter_rtc_core/outputs/output_processor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/jupyter_rtc_core/outputs/output_processor.py b/jupyter_rtc_core/outputs/output_processor.py index 912913b..9532f98 100644 --- a/jupyter_rtc_core/outputs/output_processor.py +++ b/jupyter_rtc_core/outputs/output_processor.py @@ -99,7 +99,7 @@ def process_incoming_message(self, channel: str, msg: list[bytes]): metadata = json.loads(msg[2]) # TODO use session unpack cell_id = metadata.get("cellId") if cell_id is None: - return + return # cellId is optional, so this is valid existing_msg_id = self.get_msg_id(cell_id) if existing_msg_id != msg_id: # cell is being re-run, clear output state @@ -120,6 +120,7 @@ def process_outgoing_message(self, channel: str, msg: list[bytes]): content = dmsg["content"] cell_id = self.get_cell_id(msg_id) if cell_id is None: + # This is valid as cell_id is optional return asyncio.create_task(self.output_task(msg_type, cell_id, content)) return None # Don't allow the original message to propagate to the frontend @@ -128,7 +129,7 @@ async def output_task(self, msg_type, cell_id, content): """A coroutine to handle output messages.""" try: kernel_session = await self.session_manager.get_session(kernel_id=self.kernel_id) - except: # what exception to catch? + except: # TODO: what exception to catch and log? return else: path = kernel_session["path"] @@ -144,12 +145,13 @@ async def output_task(self, msg_type, cell_id, content): file_format='json', content_type='notebook' ) - except: # what exception to catch? + except: # TODO: what exception to catch and log? return cells = notebook.ycells cell_index, target_cell = self.find_cell(cell_id, cells) if target_cell is None: + # This is valid as cell_id is optional return # Convert from the message spec to the nbformat output structure From 16863fda59cdd02c83566c57834539bacb5f9df8 Mon Sep 17 00:00:00 2001 From: "Brian E. Granger" Date: Tue, 13 May 2025 16:34:08 -0700 Subject: [PATCH 14/14] Running linter. --- README.md | 4 +--- src/index.ts | 10 ++++++++-- ui-tests/tests/jupyter_rtc_core.spec.ts | 4 +++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 61a5e66..f34e5c1 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,11 @@ for the frontend extension. ## Try it out Run with the proper configuration + ``` jupyter lab --config jupyter_config.py ``` - ## Requirements - JupyterLab >= 4.0.0 @@ -62,7 +62,6 @@ Activating an environment is required to access any Python packages installed in that environment. You should activate the environment before developing any changes to the `jupyter_rtc_core` package locally. - ### Development install After ensuring that the `rtccore` environment is activated, you can install an @@ -116,7 +115,6 @@ Here is a summary of the commands to run after making changes: - Finally, refresh the JupyterLab page in the browser to load the new frontend assets and use the new backend. - ### Building on change (frontend only) You can watch the source directory and run JupyterLab at the same time in diff --git a/src/index.ts b/src/index.ts index ea982ca..cc3318a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,7 +15,10 @@ const plugin: JupyterFrontEndPlugin = { description: 'A JupyterLab extension that provides RTC capabilities.', autoStart: true, optional: [ISettingRegistry], - activate: (app: JupyterFrontEnd, settingRegistry: ISettingRegistry | null) => { + activate: ( + app: JupyterFrontEnd, + settingRegistry: ISettingRegistry | null + ) => { console.log('JupyterLab extension @jupyter/rtc-core is activated!'); if (settingRegistry) { @@ -25,7 +28,10 @@ const plugin: JupyterFrontEndPlugin = { console.log('@jupyter/rtc-core settings loaded:', settings.composite); }) .catch(reason => { - console.error('Failed to load settings for @jupyter/rtc-core.', reason); + console.error( + 'Failed to load settings for @jupyter/rtc-core.', + reason + ); }); } diff --git a/ui-tests/tests/jupyter_rtc_core.spec.ts b/ui-tests/tests/jupyter_rtc_core.spec.ts index 97f2ee7..746b601 100644 --- a/ui-tests/tests/jupyter_rtc_core.spec.ts +++ b/ui-tests/tests/jupyter_rtc_core.spec.ts @@ -16,6 +16,8 @@ test('should emit an activation console message', async ({ page }) => { await page.goto(); expect( - logs.filter(s => s === 'JupyterLab extension @jupyter/rtc-core is activated!') + logs.filter( + s => s === 'JupyterLab extension @jupyter/rtc-core is activated!' + ) ).toHaveLength(1); });