diff --git a/jupyter_rtc_core/app.py b/jupyter_rtc_core/app.py index 7097d74..730e4c1 100644 --- a/jupyter_rtc_core/app.py +++ b/jupyter_rtc_core/app.py @@ -6,6 +6,7 @@ from .handlers import RouteHandler, FileIDIndexHandler from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket from .rooms.yroom_manager import YRoomManager +from .outputs import OutputsManager, outputs_handlers class RtcExtensionApp(ExtensionApp): name = "jupyter_rtc_core" @@ -22,9 +23,10 @@ class RtcExtensionApp(ExtensionApp): (r"api/collaboration/room/(.*)", YRoomWebsocket), # # handler that just adds compatibility with Jupyter Collaboration's frontend # (r"api/collaboration/session/(.*)", YRoomSessionHandler), - (r"api/fileid/index", FileIDIndexHandler) + (r"api/fileid/index", FileIDIndexHandler), + *outputs_handlers ] - + yroom_manager_class = Type( klass=YRoomManager, help="""YRoom Manager Class.""", @@ -35,6 +37,18 @@ class RtcExtensionApp(ExtensionApp): def yroom_manager(self) -> YRoomManager | None: return self.settings.get("yroom_manager", None) + outputs_manager_class = Type( + klass=OutputsManager, + help="Outputs manager class.", + default_value=OutputsManager + ).tag(config=True) + + outputs_manager = Instance( + klass=OutputsManager, + help="An instance of the OutputsManager", + allow_none=True + ).tag(config=True) + def initialize(self): super().initialize() @@ -56,8 +70,11 @@ def get_fileid_manager(): loop=loop, log=log ) + + # Initialize OutputsManager + self.outputs_manager = self.outputs_manager_class(config=self.config) + self.settings["outputs_manager"] = self.outputs_manager - def _link_jupyter_server_extension(self, server_app): """Setup custom config needed by this extension.""" c = Config() diff --git a/jupyter_rtc_core/kernels/kernel_client.py b/jupyter_rtc_core/kernels/kernel_client.py index d9e20c7..abf72de 100644 --- a/jupyter_rtc_core/kernels/kernel_client.py +++ b/jupyter_rtc_core/kernels/kernel_client.py @@ -1,16 +1,17 @@ """ A new Kernel client that is 
aware of ydocuments. """ +import anyio import asyncio import json import typing as t -from traitlets import Set -from traitlets import Instance -from traitlets import Any -from .utils import LRUCache + +from traitlets import Set, Instance, Any, Type, default from jupyter_client.asynchronous.client import AsyncKernelClient -import anyio + +from .utils import LRUCache from jupyter_rtc_core.rooms.yroom import YRoom +from jupyter_rtc_core.outputs import OutputProcessor class DocumentAwareKernelClient(AsyncKernelClient): @@ -35,6 +36,21 @@ class DocumentAwareKernelClient(AsyncKernelClient): # status messages. _yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set()) + output_processor = Instance( + OutputProcessor, + allow_none=True + ) + + output_process_class = Type( + klass=OutputProcessor, + default_value=OutputProcessor + ).tag(config=True) + + @default("output_processor") + def _default_output_processor(self) -> OutputProcessor: + self.log.info("Creating output processor") + return OutputProcessor(parent=self, config=self.config) + async def start_listening(self): """Start listening to messages coming from the kernel. @@ -77,7 +93,8 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]): # Cache the message ID and its socket name so that # any response message can be mapped back to the # source channel. - header = header = json.loads(msg[0]) + self.output_processor.process_incoming_message(channel=channel_name, msg=msg) + header = json.loads(msg[0]) # TODO: use session.unpack msg_id = header["msg_id"] self.message_source_cache[msg_id] = channel_name channel = getattr(self, f"{channel_name}_channel") @@ -158,10 +175,9 @@ async def handle_outgoing_message(self, channel_name: str, msg: list[bytes]): # Intercept messages that are IOPub focused. 
if channel_name == "iopub": message_returned = await self.handle_iopub_message(msg) - # TODO: If the message is not returned by the iopub handler, then + # If the message is not returned by the iopub handler, then # return here and do not forward to listeners. if not message_returned: - self.log.warn(f"If message is handled donot forward after adding output manager") return # Update the last activity. @@ -181,11 +197,9 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes] Returns ------- Returns the message if it should be forwarded to listeners. Otherwise, - returns `None` and keeps (i.e. intercepts) the message from going + returns `None` and prevents (i.e. intercepts) the message from going to listeners. """ - # NOTE: Here's where we will inject the kernel state - # into the awareness of a document. try: dmsg = self.session.deserialize(msg, content=False) @@ -193,17 +207,15 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes] self.log.error(f"Error deserializing message: {e}") raise - # NOTE: Inject display data into ydoc. - if dmsg["msg_type"] == "display_data": - # Forward to all yrooms. - for yroom in self._yrooms: - update_document_message = b"" - # yroom.add_message(update_document_message) + if self.output_processor is not None and dmsg["msg_type"] in ("stream", "display_data", "execute_result", "error"): + dmsg = self.output_processor.process_outgoing_message(dmsg) - # TODO: returning message temporarily to not break UI - # If the message isn't handled above, return it and it will - # be forwarded to all listeners - return msg + # If process_outgoing_message returns None, return None so the message isn't + # sent to clients, otherwise return the original serialized message. 
+ if dmsg is None: + return None + else: + return msg def send_kernel_awareness(self, kernel_status: dict): """ @@ -218,7 +230,6 @@ def send_kernel_awareness(self, kernel_status: dict): awareness.set_local_state_field("kernel", kernel_status) self.log.debug(f"current state: {awareness.get_local_state()} room_id: {yroom.room_id}") - async def add_yroom(self, yroom: YRoom): """ Register a YRoom with this kernel client. YRooms will diff --git a/jupyter_rtc_core/outputs/manager.py b/jupyter_rtc_core/outputs/manager.py index 347af95..15aa73a 100644 --- a/jupyter_rtc_core/outputs/manager.py +++ b/jupyter_rtc_core/outputs/manager.py @@ -7,17 +7,10 @@ from traitlets.config import LoggingConfigurable from traitlets import ( - Any, - Bool, Dict, Instance, - List, - TraitError, - Type, - Unicode, Int, - default, - validate, + default ) from jupyter_core.paths import jupyter_runtime_dir @@ -36,7 +29,6 @@ def _default_outputs_path(self): def _ensure_path(self, file_id, cell_id): nested_dir = self.outputs_path / file_id / cell_id - self.log.info(f"Creating directory: {nested_dir}") nested_dir.mkdir(parents=True, exist_ok=True) def _build_path(self, file_id, cell_id=None, output_index=None): @@ -49,7 +41,6 @@ def _build_path(self, file_id, cell_id=None, output_index=None): def get_output(self, file_id, cell_id, output_index): """Get an outputs by file_id, cell_id, and output_index.""" - self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}") path = self._build_path(file_id, cell_id, output_index) if not os.path.isfile(path): raise FileNotFoundError(f"The output file doesn't exist: {path}") @@ -67,12 +58,15 @@ def get_stream(self, file_id, cell_id): return output def write(self, file_id, cell_id, output): - """Write a new output for file_id and cell_id.""" - self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}") - result = self.write_output(file_id, cell_id, output) + """Write a new output for file_id and cell_id. 
+ + Returns a placeholder output (pycrdt.Map) or None if no placeholder + output should be written to the ydoc. + """ + placeholder = self.write_output(file_id, cell_id, output) if output["output_type"] == "stream" and self.stream_limit is not None: - result = self.write_stream(file_id, cell_id, output) - return result + placeholder = self.write_stream(file_id, cell_id, output, placeholder) + return placeholder def write_output(self, file_id, cell_id, output): self._ensure_path(file_id, cell_id) @@ -84,9 +78,10 @@ def write_output(self, file_id, cell_id, output): with open(path, "w", encoding="utf-8") as f: f.write(data) url = f"/api/outputs/{file_id}/{cell_id}/{index}.output" - return Map({}) + self.log.info(f"Wrote output: {url}") + return create_placeholder_output(output["output_type"], url) - def write_stream(self, file_id, cell_id, output) -> Map: + def write_stream(self, file_id, cell_id, output, placeholder) -> Map: # How many stream outputs have been written for this cell previously count = self._stream_count.get(cell_id, 0) @@ -98,6 +93,7 @@ def write_stream(self, file_id, cell_id, output) -> Map: with open(path, "a", encoding="utf-8") as f: f.write(text) url = f"/api/outputs/{file_id}/{cell_id}/stream" + self.log.info(f"Wrote stream: {url}") # Increment the count count = count + 1 @@ -105,8 +101,8 @@ def write_stream(self, file_id, cell_id, output) -> Map: # Now create the placeholder output if count < self.stream_limit: - # Return the original if we haven't reached the limit - placeholder = Map(output) + # Return the original placeholder if we haven't reached the limit + placeholder = placeholder elif count == self.stream_limit: # Return a link to the full stream output placeholder = Map({ @@ -116,6 +112,7 @@ def write_stream(self, file_id, cell_id, output) -> Map: } }) elif count > self.stream_limit: + # Return None to indicate that no placeholder should be written to the ydoc placeholder = None return placeholder @@ -130,4 +127,33 @@ def clear(self, 
file_id, cell_id=None): except KeyError: pass path = self._build_path(file_id, cell_id) - shutil.rmtree(path) + try: + shutil.rmtree(path) + except FileNotFoundError: + pass + + +def create_placeholder_output(output_type: str, url: str): + metadata = dict(url=url) + if output_type == "stream": + output = Map({ + "output_type": "stream", + "text": "", + "metadata": metadata + }) + elif output_type == "display_data": + output = Map({ + "output_type": "display_data", + "metadata": metadata + }) + elif output_type == "execute_result": + output = Map({ + "output_type": "execute_result", + "metadata": metadata + }) + elif output_type == "error": + output = Map({ + "output_type": "error", + "metadata": metadata + }) + return output \ No newline at end of file diff --git a/jupyter_rtc_core/outputs/output_processor.py b/jupyter_rtc_core/outputs/output_processor.py index 9532f98..a9033c1 100644 --- a/jupyter_rtc_core/outputs/output_processor.py +++ b/jupyter_rtc_core/outputs/output_processor.py @@ -15,14 +15,14 @@ class OutputProcessor(LoggingConfigurable): use_outputs_service = Bool( default_value=True, - config=True, help="Should outputs be routed to the outputs service to minimize the in memory ydoc size." - ) + ).tag(config=True) @property def settings(self): """A shortcut for the Tornado web app settings.""" - return self.parent.parent.webapp.settings + # self.KernelClient.KernelManager.AsyncMultiKernelManager.ServerApp + return self.parent.parent.parent.parent.web_app.settings @property def kernel_client(self): @@ -45,9 +45,9 @@ def file_id_manager(self): return self.settings["file_id_manager"] @property - def jupyter_server_ydoc(self): + def yroom_manager(self): """A shortcut for the jupyter server ydoc manager.""" - return self.settings["jupyter_server_ydoc"] + return self.settings["yroom_manager"] def clear(self, cell_id=None): """Clear the state of the output processor. 
@@ -107,52 +107,56 @@ def process_incoming_message(self, channel: str, msg: list[bytes]): if self._file_id is not None: if self.use_outputs_service: self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id) + self.outputs_manager.clear(file_id=self._file_id) self.log.info(f"Saving (msg_id, cell_id): ({msg_id} {cell_id})") self.set_cell_id(msg_id, cell_id) # Outgoing messages - def process_outgoing_message(self, channel: str, msg: list[bytes]): - """Process outgoing messagers from the kernel.""" - dmsg = self.kernel_client.session.deserialize(msg) + def process_outgoing_message(self, dmsg: dict): + """Process outgoing messages from the kernel. + + This returns the input dmsg if the message should be sent to + clients, or None if it should not be sent. + + The dmsg is a deserialized message generated by calling: + + > self.kernel_client.session.deserialize(dmsg, content=False) + + The content has not been deserialized yet as we need to verify we + should process it. + """ msg_type = dmsg["header"]["msg_type"] + if msg_type not in ("stream", "display_data", "execute_result", "error"): + return dmsg msg_id = dmsg["parent_header"]["msg_id"] - content = dmsg["content"] + content = self.parent.session.unpack(dmsg["content"]) cell_id = self.get_cell_id(msg_id) if cell_id is None: # This is valid as cell_id is optional - return + return dmsg asyncio.create_task(self.output_task(msg_type, cell_id, content)) return None # Don't allow the original message to propagate to the frontend async def output_task(self, msg_type, cell_id, content): """A coroutine to handle output messages.""" try: - kernel_session = await self.session_manager.get_session(kernel_id=self.kernel_id) - except: # TODO: what exception to catch and log? + # TODO: The session manager may have multiple notebooks connected to the kernel + # but currently get_session only returns the first. We need to fix this and + # find the notebook with the right cell_id. 
+ kernel_session = await self.session_manager.get_session(kernel_id=self.parent.parent.kernel_id) + except Exception as e: + self.log.error(f"An exception occurred when processing output for cell {cell_id}") + self.log.exception(e) return else: path = kernel_session["path"] file_id = self.file_id_manager.get_id(path) if file_id is None: + self.log.error(f"Could not find file_id for path: {path}") return self._file_id = file_id - try: - notebook = await self.jupyter_server_ydoc.get_document( - path=path, - copy=False, - file_format='json', - content_type='notebook' - ) - except: # TODO: what exception to catch and log? - return - cells = notebook.ycells - - cell_index, target_cell = self.find_cell(cell_id, cells) - if target_cell is None: - # This is valid as cell_id is optional - return # Convert from the message spec to the nbformat output structure if self.use_outputs_service: @@ -160,8 +164,22 @@ async def output_task(self, msg_type, cell_id, content): output = self.outputs_manager.write(file_id, cell_id, output) else: output = self.transform_output(msg_type, content, ydoc=True) - if output is not None: + + # Get the notebook ydoc from the room + room_id = f"json:notebook:{file_id}" + room = self.yroom_manager.get_room(room_id) + if room is None: + self.log.error(f"YRoom not found: {room_id}") + return + notebook = await room.get_jupyter_ydoc() + self.log.info(f"Notebook: {notebook}") + + # Write the outputs to the ydoc cell. + cells = notebook.ycells + cell_index, target_cell = self.find_cell(cell_id, cells) + if target_cell is not None and output is not None: target_cell["outputs"].append(output) + self.log.info(f"Write output to ydoc: {path} {cell_id} {output}") def find_cell(self, cell_id, cells): """Find a cell with a given cell_id in the list of cells.