Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions jupyter_server_documents/kernels/kernel_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,11 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
metadata = self.session.unpack(msg[2])
cell_id = metadata.get("cellId")

# Clear output processor if this cell already has
# an existing request.
# Clear cell outputs if cell is re-executed
if cell_id:
existing = self.message_cache.get(cell_id=cell_id)
if existing and existing['msg_id'] != msg_id:
self.output_processor.clear(cell_id)
self.output_processor.clear_cell_outputs(cell_id)

self.message_cache.add({
"msg_id": msg_id,
Expand Down Expand Up @@ -218,7 +217,7 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
except Exception as e:
self.log.error(f"Error deserializing message: {e}")
raise

parent_msg_id = dmsg["parent_header"]["msg_id"]
parent_msg_data = self.message_cache.get(parent_msg_id)
cell_id = parent_msg_data.get('cell_id')
Expand Down Expand Up @@ -273,13 +272,14 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
target_cell["execution_count"] = execution_count
break

case "stream" | "display_data" | "execute_result" | "error" | "update_display_data":
case "stream" | "display_data" | "execute_result" | "error" | "update_display_data" | "clear_output":
if cell_id:
# Process specific output messages through an optional processor
if self.output_processor and cell_id:
cell_id = parent_msg_data.get('cell_id')
content = self.session.unpack(dmsg["content"])
dmsg = self.output_processor.process_output(dmsg['msg_type'], cell_id, content)
self.output_processor.process_output(dmsg['msg_type'], cell_id, content)

# Suppress forwarding of processed messages by returning None
return None

Expand Down
79 changes: 59 additions & 20 deletions jupyter_server_documents/outputs/output_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from pycrdt import Map

from traitlets import Unicode, Bool
from traitlets import Unicode, Bool, Dict
from traitlets.config import LoggingConfigurable
from jupyter_server_documents.kernels.message_cache import KernelMessageCache

class OutputProcessor(LoggingConfigurable):

_file_id = Unicode(default_value=None, allow_none=True)
_pending_clear_output_cells = Dict(default_value={})

use_outputs_service = Bool(
default_value=True,
Expand Down Expand Up @@ -46,29 +47,54 @@ def yroom_manager(self):
"""A shortcut for the jupyter server ydoc manager."""
return self.settings["yroom_manager"]

async def clear_cell_outputs(self, cell_id, room_id):
"""Clears the outputs of a cell in ydoc"""
async def get_jupyter_ydoc(self, file_id):
room_id = f"json:notebook:{file_id}"
room = self.yroom_manager.get_room(room_id)
if room is None:
self.log.error(f"YRoom not found: {room_id}")
return
notebook = await room.get_jupyter_ydoc()
self.log.info(f"Notebook: {notebook}")

ydoc = await room.get_jupyter_ydoc()

return ydoc

async def _clear_ydoc_outputs(self, cell_id):
"""Clears the outputs of a cell in ydoc"""

if not self._file_id:
return

notebook = await self.get_jupyter_ydoc(self._file_id)
cell_index, target_cell = notebook.find_cell(cell_id)
if target_cell is not None:
target_cell["outputs"].clear()
self.log.info(f"Cleared outputs for ydoc: {room_id} {cell_index}")
self.log.info(f"Cleared outputs for {self._file_id=}, {cell_index=}")
Copy link
Collaborator

@dlqqq dlqqq Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def clear_cell_outputs(self, cell_id) -> asyncio.Task | None:
"""
Clears all outputs for a cell on disk and in ydoc. Returns an
`asyncio.Task` that clears outputs for the cell in ydoc. Callers
should wait for this task to complete, if they expect to update
the ydoc.

```
clear_outputs_task = clear_cell_outputs("cellid")
await clear_outputs_task
```
"""

clear_outputs_task = None

def clear(self, cell_id):
"""Clear all outputs for a given cell Id."""
if self._file_id is not None:
if self.use_outputs_service:
room_id = f"json:notebook:{self._file_id}"
asyncio.create_task(self.clear_cell_outputs(cell_id, room_id))
clear_outputs_task = asyncio.create_task(
self._clear_ydoc_outputs(cell_id)
)
self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id)

self._pending_clear_output_cells.pop(cell_id, None)

return clear_outputs_task

# Outgoing messages
def process_output(self, msg_type: str, cell_id: str, content: dict):
"""Process outgoing messages from the kernel.

Expand All @@ -87,6 +113,24 @@ def process_output(self, msg_type: str, cell_id: str, content: dict):

async def output_task(self, msg_type, cell_id, content):
"""A coroutine to handle output messages."""

if msg_type == "clear_output":
wait = content.get("wait", False)
if not wait:
clear_task = self.clear_cell_outputs(cell_id)
if clear_task:
await clear_task
else:
self._pending_clear_output_cells[cell_id] = True

return

# Check for pending clear_output before processing output
if cell_id in self._pending_clear_output_cells and (
clear_task := self.clear_cell_outputs(cell_id)
):
await clear_task

try:
# TODO: The session manager may have multiple notebooks connected to the kernel
# but currently get_session only returns the first. We need to fix this and
Expand Down Expand Up @@ -117,15 +161,10 @@ async def output_task(self, msg_type, cell_id, content):
else:
output = self.transform_output(msg_type, content, ydoc=True)

# Get the notebook ydoc from the room
room_id = f"json:notebook:{file_id}"
room = self.yroom_manager.get_room(room_id)
if room is None:
self.log.error(f"YRoom not found: {room_id}")
notebook = await self.get_jupyter_ydoc(file_id)
if not notebook:
return
notebook = await room.get_jupyter_ydoc()
self.log.info(f"Notebook: {notebook}")


# Write the outputs to the ydoc cell.
_, target_cell = notebook.find_cell(cell_id)
if target_cell is not None and output is not None:
Expand Down
Loading