diff --git a/cookbook/04_workflows/03_loop_execution/loop_iterative_accumulation.py b/cookbook/04_workflows/03_loop_execution/loop_iterative_accumulation.py new file mode 100644 index 0000000000..423ba886eb --- /dev/null +++ b/cookbook/04_workflows/03_loop_execution/loop_iterative_accumulation.py @@ -0,0 +1,55 @@ +""" +Loop Iterative Accumulation +============================ + +Demonstrates that Loop iterations carry forward the output from the previous iteration. +Each iteration receives the previous iteration's output via `step_input.get_last_step_content()`, +enabling iterative processing patterns like accumulation, refinement, and convergence. + +This example increments a numeric value by 10 each iteration, stopping when it reaches 50 or more. +Starting from 35, the loop should: + - Iteration 1: 35 -> 45 + - Iteration 2: 45 -> 55 (>= 50, end condition met) +""" + +from agno.workflow import Loop, Step, Workflow +from agno.workflow.types import StepInput, StepOutput + + +def increment_executor(step_input: StepInput) -> StepOutput: + """Increment the previous step's numeric content by 10.""" + last_content = step_input.get_last_step_content() + if last_content and last_content.isdigit(): + new_value = int(last_content) + 10 + return StepOutput(content=str(new_value)) + return StepOutput(content="0") + + +workflow = Workflow( + name="Iterative Accumulation Workflow", + description="Demonstrates loop iterations carrying forward output from previous iterations.", + steps=[ + Step( + name="Initial Value", + description="Pass through the initial input value.", + executor=lambda step_input: StepOutput(content=step_input.input), + ), + Loop( + name="Increment Loop", + description="Increment value by 10 each iteration until it reaches 50.", + steps=[ + Step( + name="Increment Step", + description="Add 10 to the current value.", + executor=increment_executor, + ) + ], + end_condition=lambda step_outputs: int(step_outputs[-1].content) >= 50, + max_iterations=10, + forward_iteration_output=True, + ), + ], +) + +if __name__ == "__main__": + workflow.print_response("35") diff --git a/libs/agno/agno/workflow/loop.py b/libs/agno/agno/workflow/loop.py index 2caddad75d..4f29dee2a2 100644 --- a/libs/agno/agno/workflow/loop.py +++ b/libs/agno/agno/workflow/loop.py @@ -73,6 +73,10 @@ class Loop: max_iterations: int = 3 # Default to 3 end_condition: Optional[Union[Callable[[List[StepOutput]], bool], str]] = None + # If True, the output of each iteration is forwarded as input to the next iteration. + # When False (default), each iteration receives the original step input. + forward_iteration_output: bool = False + # HITL configuration - start confirmation # If True, the loop will pause before the first iteration and require user confirmation requires_confirmation: bool = False @@ -86,6 +90,7 @@ def __init__( description: Optional[str] = None, max_iterations: int = 3, end_condition: Optional[Union[Callable[[List[StepOutput]], bool], str]] = None, + forward_iteration_output: bool = False, requires_confirmation: bool = False, confirmation_message: Optional[str] = None, on_reject: Union[OnReject, str] = OnReject.skip, @@ -95,6 +100,7 @@ def __init__( self.description = description self.max_iterations = max_iterations self.end_condition = end_condition + self.forward_iteration_output = forward_iteration_output self.requires_confirmation = requires_confirmation self.confirmation_message = confirmation_message self.on_reject = on_reject @@ -120,6 +126,8 @@ def to_dict(self) -> Dict[str, Any]: else: raise ValueError(f"Invalid end_condition type: {type(self.end_condition).__name__}") + result["forward_iteration_output"] = self.forward_iteration_output + # Add HITL fields result["requires_confirmation"] = self.requires_confirmation result["confirmation_message"] = self.confirmation_message @@ -207,6 +215,7 @@ def deserialize_step(step_data: Dict[str, Any]) -> Any: steps=[deserialize_step(step) for step in data.get("steps", [])], max_iterations=data.get("max_iterations", 3), end_condition=end_condition, + forward_iteration_output=data.get("forward_iteration_output", False), requires_confirmation=data.get("requires_confirmation", False), confirmation_message=data.get("confirmation_message"), on_reject=data.get("on_reject", OnReject.skip), @@ -423,6 +432,10 @@ def execute( log_debug(f"Loop ending early due to step termination request at iteration {iteration}") break + # Carry forward output to next iteration + if self.forward_iteration_output: + step_input = current_step_input + log_debug(f"Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=") # Return flattened results from all iterations @@ -604,6 +617,10 @@ def execute_stream( log_debug(f"Loop ending early at iteration {iteration}") break + # Carry forward output to next iteration + if self.forward_iteration_output: + step_input = current_step_input + log_debug(f"Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=") if stream_events and workflow_run_response: @@ -724,6 +741,10 @@ async def aexecute( log_debug(f"Loop ending early due to step termination request at iteration {iteration}") break + # Carry forward output to next iteration + if self.forward_iteration_output: + step_input = current_step_input + log_debug(f"Async Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=") # Return flattened results from all iterations @@ -905,6 +926,10 @@ async def aexecute_stream( log_debug(f"Loop ending early at iteration {iteration}") break + # Carry forward output to next iteration + if self.forward_iteration_output: + step_input = current_step_input + log_debug(f"Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=") if stream_events and workflow_run_response: diff --git a/libs/agno/tests/integration/workflows/test_loop_steps.py b/libs/agno/tests/integration/workflows/test_loop_steps.py index 02a01e970f..505f095419 100644 --- a/libs/agno/tests/integration/workflows/test_loop_steps.py +++ b/libs/agno/tests/integration/workflows/test_loop_steps.py @@ -781,3 +781,219 @@ def test_cel_loop_compound_condition(): assert isinstance(result, StepOutput) assert len(result.steps) == 2 assert all(s.success for s in result.steps) + + +# ============================================================================ +# ITERATION CARRY-FORWARD TESTS +# ============================================================================ + + +def _increment_step(step_input: StepInput) -> StepOutput: + """Increment the previous step's numeric content by 10.""" + last_content = step_input.get_last_step_content() + if last_content and last_content.isdigit(): + new_value = int(last_content) + 10 + return StepOutput(step_name="increment", content=str(new_value), success=True) + return StepOutput(step_name="increment", content="0", success=True) + + +def _make_loop_step_input(value: str) -> StepInput: + """Create a StepInput with previous_step_outputs set (as the workflow does).""" + return StepInput( + input=value, + previous_step_content=value, + previous_step_outputs={"prev": StepOutput(content=value)}, + ) + + +def test_loop_carries_forward_output_between_iterations(): + """Test that each loop iteration receives the output from the previous iteration.""" + loop = Loop( + name="Carry Forward Loop", + steps=[_increment_step], + end_condition=lambda outputs: int(outputs[-1].content) >= 50, + max_iterations=10, + forward_iteration_output=True, + ) + step_input = _make_loop_step_input("35") + + result = loop.execute(step_input) + + assert isinstance(result, StepOutput) + # 35 -> 45 -> 55 (>= 50, stop). Should take exactly 2 iterations. + assert len(result.steps) == 2 + assert result.steps[0].content == "45" + assert result.steps[1].content == "55" + + +@pytest.mark.asyncio +async def test_loop_carries_forward_output_between_iterations_async(): + """Test that async loop iterations carry forward output.""" + loop = Loop( + name="Async Carry Forward Loop", + steps=[_increment_step], + end_condition=lambda outputs: int(outputs[-1].content) >= 50, + max_iterations=10, + forward_iteration_output=True, + ) + step_input = _make_loop_step_input("35") + + result = await loop.aexecute(step_input) + + assert isinstance(result, StepOutput) + assert len(result.steps) == 2 + assert result.steps[0].content == "45" + assert result.steps[1].content == "55" + + +def test_loop_carries_forward_output_stream(): + """Test that streaming loop iterations carry forward output.""" + from agno.run.workflow import WorkflowRunOutput + + loop = Loop( + name="Stream Carry Forward Loop", + steps=[_increment_step], + end_condition=lambda outputs: int(outputs[-1].content) >= 50, + max_iterations=10, + forward_iteration_output=True, + ) + step_input = _make_loop_step_input("35") + + mock_response = WorkflowRunOutput( + run_id="test-run", + workflow_name="test-workflow", + workflow_id="test-id", + session_id="test-session", + content="", + ) + + events = list(loop.execute_stream(step_input, workflow_run_response=mock_response, stream_events=True)) + + # The final event is the Loop StepOutput containing nested step results + loop_outputs = [e for e in events if isinstance(e, StepOutput)] + assert len(loop_outputs) == 1 + loop_output = loop_outputs[0] + assert len(loop_output.steps) == 2 + assert loop_output.steps[0].content == "45" + assert loop_output.steps[1].content == "55" + + +@pytest.mark.asyncio +async def test_loop_carries_forward_output_async_stream(): + """Test that async streaming loop iterations carry forward output.""" + from agno.run.workflow import WorkflowRunOutput + + loop = Loop( + name="Async Stream Carry Forward Loop", + steps=[_increment_step], + end_condition=lambda outputs: int(outputs[-1].content) >= 50, + max_iterations=10, + forward_iteration_output=True, + ) + step_input = _make_loop_step_input("35") + + mock_response = WorkflowRunOutput( + run_id="test-run", + workflow_name="test-workflow", + workflow_id="test-id", + session_id="test-session", + content="", + ) + + events = [] + async for event in loop.aexecute_stream(step_input, workflow_run_response=mock_response, stream_events=True): + events.append(event) + + # The final event is the Loop StepOutput containing nested step results + loop_outputs = [e for e in events if isinstance(e, StepOutput)] + assert len(loop_outputs) == 1 + loop_output = loop_outputs[0] + assert len(loop_output.steps) == 2 + assert loop_output.steps[0].content == "45" + assert loop_output.steps[1].content == "55" + + +def test_loop_carry_forward_in_workflow(shared_db): + """Test loop carry-forward within a full workflow with an initial step feeding into a loop.""" + + def initial_step(step_input: StepInput) -> StepOutput: + """Pass through the input value.""" + return StepOutput(step_name="initial", content=step_input.input, success=True) + + workflow = Workflow( + name="Carry Forward Workflow", + db=shared_db, + steps=[ + initial_step, + Loop( + name="Increment Loop", + steps=[_increment_step], + end_condition=lambda outputs: int(outputs[-1].content) >= 50, + max_iterations=10, + forward_iteration_output=True, + ), + ], + ) + + response = workflow.run(input="20") + + assert isinstance(response, WorkflowRunOutput) + assert len(response.step_results) == 2 + + # First step passes through: "20" + assert response.step_results[0].content == "20" + + # Loop: 20 -> 30 -> 40 -> 50 (3 iterations) + loop_output = response.step_results[1] + assert len(loop_output.steps) == 3 + assert loop_output.steps[0].content == "30" + assert loop_output.steps[1].content == "40" + assert loop_output.steps[2].content == "50" + + +def test_loop_carry_forward_multi_step_iteration(): + """Test carry-forward with multiple steps per iteration. + + Verifies that multi-step loops carry forward output across iterations. + Each iteration's step_a receives the last output from the previous iteration's step_b. + """ + contents = [] + + def step_a(step_input: StepInput) -> StepOutput: + last = step_input.get_last_step_content() or "0" + value = int(last) + 1 + contents.append(("a", value)) + return StepOutput(step_name="step_a", content=str(value), success=True) + + def step_b(step_input: StepInput) -> StepOutput: + # Within an iteration, step_b receives step_a's output via previous_step_content + last = step_input.previous_step_content or "0" + value = int(last) * 2 + contents.append(("b", value)) + return StepOutput(step_name="step_b", content=str(value), success=True) + + loop = Loop( + name="Multi Step Carry Forward", + steps=[step_a, step_b], + max_iterations=3, + forward_iteration_output=True, + ) + step_input = _make_loop_step_input("1") + + result = loop.execute(step_input) + + assert isinstance(result, StepOutput) + assert len(result.steps) == 6 # 2 steps x 3 iterations + + # Verify iteration 1 step_a sees "1" from initial input + assert contents[0] == ("a", 2) # 1 + 1 = 2 + assert contents[1] == ("b", 4) # 2 * 2 = 4 + + # Verify iteration 2 step_a sees carry-forward from iteration 1 (not the original input) + # step_a gets "4" (step_b's output from iteration 1), NOT "1" (original input) + assert contents[2][0] == "a" + assert contents[2][1] != 2, "step_a should not see the original input in iteration 2" + + # Verify iteration 3 step_a also sees carry-forward + assert contents[4][0] == "a" + assert contents[4][1] != 2, "step_a should not see the original input in iteration 3"