Skip to content

Commit 8188e5a

Browse files
authored
feat: Update BreakpointException to include the pipeline snapshot and where it is saved (#9888)
* Add the snapshot, and the full file path it was saved to, to the breakpoint exception * Add checks to test and simplify test * Make both tests unit tests * Remove unused import * Update reno * Add docstrings * Updated reno * Simplify test * Update test * Fix mypy
1 parent dc1d722 commit 8188e5a

File tree

6 files changed

+139
-258
lines changed

6 files changed

+139
-258
lines changed

haystack/core/errors.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ def __init__(
1818
component_type: Optional[type],
1919
message: str,
2020
pipeline_snapshot: Optional[PipelineSnapshot] = None,
21+
*,
22+
pipeline_snapshot_file_path: Optional[str] = None,
2123
) -> None:
2224
self.component_name = component_name
2325
self.component_type = component_type
2426
self.pipeline_snapshot = pipeline_snapshot
27+
self.pipeline_snapshot_file_path = pipeline_snapshot_file_path
2528
super().__init__(message)
2629

2730
@classmethod
@@ -109,13 +112,43 @@ def __init__(
109112
self,
110113
message: str,
111114
component: Optional[str] = None,
112-
inputs: Optional[dict[str, Any]] = None,
113-
results: Optional[dict[str, Any]] = None,
115+
pipeline_snapshot: Optional[PipelineSnapshot] = None,
116+
pipeline_snapshot_file_path: Optional[str] = None,
114117
):
115118
super().__init__(message)
116119
self.component = component
117-
self.inputs = inputs
118-
self.results = results
120+
self.pipeline_snapshot = pipeline_snapshot
121+
self.pipeline_snapshot_file_path = pipeline_snapshot_file_path
122+
123+
@property
124+
def inputs(self):
125+
"""
126+
Returns the inputs of the pipeline or agent at the breakpoint.
127+
128+
If an AgentBreakpoint caused this exception, returns the inputs of the agent's internal components.
129+
Otherwise, returns the current inputs of the pipeline.
130+
"""
131+
if not self.pipeline_snapshot:
132+
return None
133+
134+
if self.pipeline_snapshot.agent_snapshot:
135+
return self.pipeline_snapshot.agent_snapshot.component_inputs
136+
return self.pipeline_snapshot.pipeline_state.inputs
137+
138+
@property
139+
def results(self):
140+
"""
141+
Returns the results of the pipeline or agent at the breakpoint.
142+
143+
If an AgentBreakpoint caused this exception, returns the current results of the agent.
144+
Otherwise, returns the current outputs of the pipeline.
145+
"""
146+
if not self.pipeline_snapshot:
147+
return None
148+
149+
if self.pipeline_snapshot.agent_snapshot:
150+
return self.pipeline_snapshot.agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"]
151+
return self.pipeline_snapshot.pipeline_state.pipeline_outputs
119152

120153

121154
class PipelineInvalidPipelineSnapshotError(Exception):

haystack/core/pipeline/breakpoint.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def load_pipeline_snapshot(file_path: Union[str, Path]) -> PipelineSnapshot:
144144
return pipeline_snapshot
145145

146146

147-
def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failure: bool = True) -> None:
147+
def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failure: bool = True) -> Optional[str]:
148148
"""
149149
Save the pipeline snapshot dictionary to a JSON file.
150150
@@ -158,6 +158,8 @@ def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failur
158158
:param pipeline_snapshot: The pipeline snapshot to save.
159159
:param raise_on_failure: If True, raises an exception if saving fails. If False, logs the error and returns.
160160
161+
:returns:
162+
The full path to the saved JSON file, or None if `snapshot_file_path` is None.
161163
:raises:
162164
Exception: If saving the JSON snapshot fails.
163165
"""
@@ -169,7 +171,7 @@ def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failur
169171
)
170172

171173
if snapshot_file_path is None:
172-
return
174+
return None
173175

174176
dt = pipeline_snapshot.timestamp or datetime.now()
175177
snapshot_dir = Path(snapshot_file_path)
@@ -201,6 +203,8 @@ def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, raise_on_failur
201203
if raise_on_failure:
202204
raise
203205

206+
return str(full_path)
207+
204208

205209
def _create_pipeline_snapshot(
206210
*,
@@ -304,9 +308,9 @@ def _trigger_break_point(*, pipeline_snapshot: PipelineSnapshot) -> None:
304308
Trigger a breakpoint by saving a snapshot and raising exception.
305309
306310
:param pipeline_snapshot: The current pipeline snapshot containing the state and break point
307-
:raises PipelineBreakpointException: When breakpoint is triggered
311+
:raises BreakpointException: When breakpoint is triggered
308312
"""
309-
_save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
313+
full_file_path = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
310314

311315
if isinstance(pipeline_snapshot.break_point, Breakpoint):
312316
component_name = pipeline_snapshot.break_point.component_name
@@ -318,8 +322,8 @@ def _trigger_break_point(*, pipeline_snapshot: PipelineSnapshot) -> None:
318322
raise BreakpointException(
319323
message=msg,
320324
component=component_name,
321-
inputs=pipeline_snapshot.pipeline_state.inputs,
322-
results=pipeline_snapshot.pipeline_state.pipeline_outputs,
325+
pipeline_snapshot=pipeline_snapshot,
326+
pipeline_snapshot_file_path=full_file_path,
323327
)
324328

325329

@@ -502,17 +506,16 @@ def _trigger_chat_generator_breakpoint(*, pipeline_snapshot: PipelineSnapshot) -
502506
raise ValueError("PipelineSnapshot must contain an AgentSnapshot to trigger a chat generator breakpoint.")
503507

504508
break_point = pipeline_snapshot.break_point.break_point
505-
_save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
509+
full_file_path = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
506510
msg = (
507511
f"Breaking at {break_point.component_name} visit count "
508512
f"{pipeline_snapshot.agent_snapshot.component_visits[break_point.component_name]}"
509513
)
510-
logger.info(msg)
511514
raise BreakpointException(
512515
message=msg,
513516
component=break_point.component_name,
514-
inputs=pipeline_snapshot.agent_snapshot.component_inputs,
515-
results=pipeline_snapshot.agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
517+
pipeline_snapshot=pipeline_snapshot,
518+
pipeline_snapshot_file_path=full_file_path,
516519
)
517520

518521

@@ -546,7 +549,7 @@ def _trigger_tool_invoker_breakpoint(*, llm_messages: list[ChatMessage], pipelin
546549
if not should_break:
547550
return # No breakpoint triggered
548551

549-
_save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
552+
full_file_path = _save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot)
550553

551554
msg = (
552555
f"Breaking at {tool_breakpoint.component_name} visit count "
@@ -559,6 +562,6 @@ def _trigger_tool_invoker_breakpoint(*, llm_messages: list[ChatMessage], pipelin
559562
raise BreakpointException(
560563
message=msg,
561564
component=tool_breakpoint.component_name,
562-
inputs=pipeline_snapshot.agent_snapshot.component_inputs,
563-
results=pipeline_snapshot.agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
565+
pipeline_snapshot=pipeline_snapshot,
566+
pipeline_snapshot_file_path=full_file_path,
564567
)

haystack/core/pipeline/pipeline.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,8 +390,6 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
390390
parent_span=span,
391391
)
392392
except PipelineRuntimeError as error:
393-
# TODO Wrap creation of the pipeline snapshot with try-except in case it fails
394-
# (e.g. serialization issue)
395393
out_dir = _get_output_dir("pipeline_snapshot")
396394
break_point = Breakpoint(
397395
component_name=component_name,
@@ -420,7 +418,10 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
420418

421419
# Attach the pipeline snapshot to the error before re-raising
422420
error.pipeline_snapshot = pipeline_snapshot
423-
_save_pipeline_snapshot(pipeline_snapshot=pipeline_snapshot, raise_on_failure=False)
421+
full_file_path = _save_pipeline_snapshot(
422+
pipeline_snapshot=pipeline_snapshot, raise_on_failure=False
423+
)
424+
error.pipeline_snapshot_file_path = full_file_path
424425
raise error
425426

426427
# Updates global input state with component outputs and returns outputs that should go to
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
features:
3+
- |
4+
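- Added `pipeline_snapshot` and `pipeline_snapshot_file_path` parameters to `BreakpointException` to provide more context when a pipeline breakpoint is triggered. The `inputs` and `results` constructor parameters were removed; both are now read-only properties derived from `pipeline_snapshot`.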
5+
- Added `pipeline_snapshot_file_path` parameter to `PipelineRuntimeError` to include a reference to the stored pipeline snapshot so it can be easily found.

0 commit comments

Comments
 (0)