Skip to content

Commit dd042c6

Browse files
authored
Handle clear output msg (#113)
* WIP: Working version of clear_output * Refactored * Fixed clear output in ydoc * Update clear output state to set * Simplified clear task based on feedback
1 parent c08de64 commit dd042c6

File tree

2 files changed

+51
-27
lines changed

2 files changed

+51
-27
lines changed

jupyter_server_documents/kernels/kernel_client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,11 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
106106
metadata = self.session.unpack(msg[2])
107107
cell_id = metadata.get("cellId")
108108

109-
# Clear output processor if this cell already has
110-
# an existing request.
109+
# Clear cell outputs if cell is re-executed
111110
if cell_id:
112111
existing = self.message_cache.get(cell_id=cell_id)
113112
if existing and existing['msg_id'] != msg_id:
114-
self.output_processor.clear(cell_id)
113+
asyncio.create_task(self.output_processor.clear_cell_outputs(cell_id))
115114

116115
self.message_cache.add({
117116
"msg_id": msg_id,
@@ -218,7 +217,7 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
218217
except Exception as e:
219218
self.log.error(f"Error deserializing message: {e}")
220219
raise
221-
220+
222221
parent_msg_id = dmsg["parent_header"]["msg_id"]
223222
parent_msg_data = self.message_cache.get(parent_msg_id)
224223
cell_id = parent_msg_data.get('cell_id')
@@ -273,13 +272,14 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
273272
target_cell["execution_count"] = execution_count
274273
break
275274

276-
case "stream" | "display_data" | "execute_result" | "error" | "update_display_data":
275+
case "stream" | "display_data" | "execute_result" | "error" | "update_display_data" | "clear_output":
277276
if cell_id:
278277
# Process specific output messages through an optional processor
279278
if self.output_processor and cell_id:
280279
cell_id = parent_msg_data.get('cell_id')
281280
content = self.session.unpack(dmsg["content"])
282-
dmsg = self.output_processor.process_output(dmsg['msg_type'], cell_id, content)
281+
self.output_processor.process_output(dmsg['msg_type'], cell_id, content)
282+
283283
# Suppress forwarding of processed messages by returning None
284284
return None
285285

jupyter_server_documents/outputs/output_processor.py

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
from pycrdt import Map
44

5-
from traitlets import Unicode, Bool
5+
from traitlets import Unicode, Bool, Set
66
from traitlets.config import LoggingConfigurable
77
from jupyter_server_documents.kernels.message_cache import KernelMessageCache
88

99
class OutputProcessor(LoggingConfigurable):
1010

1111
_file_id = Unicode(default_value=None, allow_none=True)
12+
_pending_clear_output_cells = Set(default_value=set())
1213

1314
use_outputs_service = Bool(
1415
default_value=True,
@@ -46,29 +47,39 @@ def yroom_manager(self):
4647
"""A shortcut for the jupyter server ydoc manager."""
4748
return self.settings["yroom_manager"]
4849

49-
async def clear_cell_outputs(self, cell_id, room_id):
50-
"""Clears the outputs of a cell in ydoc"""
50+
async def get_jupyter_ydoc(self, file_id):
51+
room_id = f"json:notebook:{file_id}"
5152
room = self.yroom_manager.get_room(room_id)
5253
if room is None:
5354
self.log.error(f"YRoom not found: {room_id}")
5455
return
55-
notebook = await room.get_jupyter_ydoc()
56-
self.log.info(f"Notebook: {notebook}")
57-
56+
ydoc = await room.get_jupyter_ydoc()
57+
58+
return ydoc
59+
60+
async def _clear_ydoc_outputs(self, cell_id):
61+
"""Clears the outputs of a cell in ydoc"""
62+
63+
if not self._file_id:
64+
return
65+
66+
notebook = await self.get_jupyter_ydoc(self._file_id)
5867
cell_index, target_cell = notebook.find_cell(cell_id)
5968
if target_cell is not None:
6069
target_cell["outputs"].clear()
61-
self.log.info(f"Cleared outputs for ydoc: {room_id} {cell_index}")
70+
self.log.info(f"Cleared outputs for {self._file_id=}, {cell_index=}")
71+
72+
async def clear_cell_outputs(self, cell_id):
73+
"""Clears all outputs for a cell on disk and in ydoc."""
6274

63-
def clear(self, cell_id):
64-
"""Clear all outputs for a given cell Id."""
6575
if self._file_id is not None:
76+
await self._clear_ydoc_outputs(cell_id)
77+
self._pending_clear_output_cells.discard(cell_id)
78+
6679
if self.use_outputs_service:
67-
room_id = f"json:notebook:{self._file_id}"
68-
asyncio.create_task(self.clear_cell_outputs(cell_id, room_id))
6980
self.outputs_manager.clear(file_id=self._file_id, cell_id=cell_id)
81+
7082

71-
# Outgoing messages
7283
def process_output(self, msg_type: str, cell_id: str, content: dict):
7384
"""Process outgoing messages from the kernel.
7485
@@ -82,11 +93,29 @@ def process_output(self, msg_type: str, cell_id: str, content: dict):
8293
The content has not been deserialized yet as we need to verify we
8394
should process it.
8495
"""
85-
asyncio.create_task(self.output_task(msg_type, cell_id, content))
96+
if msg_type == "clear_output":
97+
asyncio.create_task(self.clear_output_task(cell_id, content))
98+
else:
99+
asyncio.create_task(self.output_task(msg_type, cell_id, content))
100+
86101
return None # Don't allow the original message to propagate to the frontend
87102

103+
async def clear_output_task(self, cell_id, content):
104+
"""A coroutine to handle clear_output messages"""
105+
106+
wait = content.get("wait", False)
107+
if wait:
108+
self._pending_clear_output_cells.add(cell_id)
109+
else:
110+
await self.clear_cell_outputs(cell_id)
111+
88112
async def output_task(self, msg_type, cell_id, content):
89113
"""A coroutine to handle output messages."""
114+
115+
# Check for pending clear_output before processing output
116+
if cell_id in self._pending_clear_output_cells:
117+
await self.clear_cell_outputs(cell_id)
118+
90119
try:
91120
# TODO: The session manager may have multiple notebooks connected to the kernel
92121
# but currently get_session only returns the first. We need to fix this and
@@ -117,15 +146,10 @@ async def output_task(self, msg_type, cell_id, content):
117146
else:
118147
output = self.transform_output(msg_type, content, ydoc=True)
119148

120-
# Get the notebook ydoc from the room
121-
room_id = f"json:notebook:{file_id}"
122-
room = self.yroom_manager.get_room(room_id)
123-
if room is None:
124-
self.log.error(f"YRoom not found: {room_id}")
149+
notebook = await self.get_jupyter_ydoc(file_id)
150+
if not notebook:
125151
return
126-
notebook = await room.get_jupyter_ydoc()
127-
self.log.info(f"Notebook: {notebook}")
128-
152+
129153
# Write the outputs to the ydoc cell.
130154
_, target_cell = notebook.find_cell(cell_id)
131155
if target_cell is not None and output is not None:

0 commit comments

Comments
 (0)