Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Loop Iterative Accumulation
============================

Demonstrates that Loop iterations carry forward the output from the previous iteration.
Each iteration receives the previous iteration's output via `step_input.get_last_step_content()`,
enabling iterative processing patterns like accumulation, refinement, and convergence.

This example increments a numeric value by 10 each iteration, stopping when it reaches 50 or more.
Starting from 35, the loop should:
- Iteration 1: 35 -> 45
- Iteration 2: 45 -> 55 (>= 50, end condition met)
"""

from agno.workflow import Loop, Step, Workflow
from agno.workflow.types import StepInput, StepOutput


def increment_executor(step_input: StepInput) -> StepOutput:
"""Increment the previous step's numeric content by 10."""
last_content = step_input.get_last_step_content()
if last_content and last_content.isdigit():
new_value = int(last_content) + 10
return StepOutput(content=str(new_value))
return StepOutput(content="0")


workflow = Workflow(
name="Iterative Accumulation Workflow",
description="Demonstrates loop iterations carrying forward output from previous iterations.",
steps=[
Step(
name="Initial Value",
description="Pass through the initial input value.",
executor=lambda step_input: StepOutput(content=step_input.input),
),
Loop(
name="Increment Loop",
description="Increment value by 10 each iteration until it reaches 50.",
steps=[
Step(
name="Increment Step",
description="Add 10 to the current value.",
executor=increment_executor,
)
],
end_condition=lambda step_outputs: int(step_outputs[-1].content) >= 50,
max_iterations=10,
),
],
)

if __name__ == "__main__":
workflow.print_response("35")
12 changes: 12 additions & 0 deletions libs/agno/agno/workflow/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ def execute(
log_debug(f"Loop ending early due to step termination request at iteration {iteration}")
break

# Carry forward output to next iteration
step_input = current_step_input

log_debug(f"Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=")

# Return flattened results from all iterations
Expand Down Expand Up @@ -604,6 +607,9 @@ def execute_stream(
log_debug(f"Loop ending early at iteration {iteration}")
break

# Carry forward output to next iteration
step_input = current_step_input

log_debug(f"Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=")

if stream_events and workflow_run_response:
Expand Down Expand Up @@ -724,6 +730,9 @@ async def aexecute(
log_debug(f"Loop ending early due to step termination request at iteration {iteration}")
break

# Carry forward output to next iteration
step_input = current_step_input

log_debug(f"Async Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=")

# Return flattened results from all iterations
Expand Down Expand Up @@ -905,6 +914,9 @@ async def aexecute_stream(
log_debug(f"Loop ending early at iteration {iteration}")
break

# Carry forward output to next iteration
step_input = current_step_input

log_debug(f"Loop End: {self.name} ({iteration} iterations)", center=True, symbol="=")

if stream_events and workflow_run_response:
Expand Down
210 changes: 210 additions & 0 deletions libs/agno/tests/integration/workflows/test_loop_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,3 +781,213 @@ def test_cel_loop_compound_condition():
assert isinstance(result, StepOutput)
assert len(result.steps) == 2
assert all(s.success for s in result.steps)


# ============================================================================
# ITERATION CARRY-FORWARD TESTS
# ============================================================================


def _increment_step(step_input: StepInput) -> StepOutput:
"""Increment the previous step's numeric content by 10."""
last_content = step_input.get_last_step_content()
if last_content and last_content.isdigit():
new_value = int(last_content) + 10
return StepOutput(step_name="increment", content=str(new_value), success=True)
return StepOutput(step_name="increment", content="0", success=True)


def _make_loop_step_input(value: str) -> StepInput:
"""Create a StepInput with previous_step_outputs set (as the workflow does)."""
return StepInput(
input=value,
previous_step_content=value,
previous_step_outputs={"prev": StepOutput(content=value)},
)


def test_loop_carries_forward_output_between_iterations():
"""Test that each loop iteration receives the output from the previous iteration."""
loop = Loop(
name="Carry Forward Loop",
steps=[_increment_step],
end_condition=lambda outputs: int(outputs[-1].content) >= 50,
max_iterations=10,
)
step_input = _make_loop_step_input("35")

result = loop.execute(step_input)

assert isinstance(result, StepOutput)
# 35 -> 45 -> 55 (>= 50, stop). Should take exactly 2 iterations.
assert len(result.steps) == 2
assert result.steps[0].content == "45"
assert result.steps[1].content == "55"


@pytest.mark.asyncio
async def test_loop_carries_forward_output_between_iterations_async():
"""Test that async loop iterations carry forward output."""
loop = Loop(
name="Async Carry Forward Loop",
steps=[_increment_step],
end_condition=lambda outputs: int(outputs[-1].content) >= 50,
max_iterations=10,
)
step_input = _make_loop_step_input("35")

result = await loop.aexecute(step_input)

assert isinstance(result, StepOutput)
assert len(result.steps) == 2
assert result.steps[0].content == "45"
assert result.steps[1].content == "55"


def test_loop_carries_forward_output_stream():
"""Test that streaming loop iterations carry forward output."""
from agno.run.workflow import WorkflowRunOutput

loop = Loop(
name="Stream Carry Forward Loop",
steps=[_increment_step],
end_condition=lambda outputs: int(outputs[-1].content) >= 50,
max_iterations=10,
)
step_input = _make_loop_step_input("35")

mock_response = WorkflowRunOutput(
run_id="test-run",
workflow_name="test-workflow",
workflow_id="test-id",
session_id="test-session",
content="",
)

events = list(loop.execute_stream(step_input, workflow_run_response=mock_response, stream_events=True))

# The final event is the Loop StepOutput containing nested step results
loop_outputs = [e for e in events if isinstance(e, StepOutput)]
assert len(loop_outputs) == 1
loop_output = loop_outputs[0]
assert len(loop_output.steps) == 2
assert loop_output.steps[0].content == "45"
assert loop_output.steps[1].content == "55"


@pytest.mark.asyncio
async def test_loop_carries_forward_output_async_stream():
"""Test that async streaming loop iterations carry forward output."""
from agno.run.workflow import WorkflowRunOutput

loop = Loop(
name="Async Stream Carry Forward Loop",
steps=[_increment_step],
end_condition=lambda outputs: int(outputs[-1].content) >= 50,
max_iterations=10,
)
step_input = _make_loop_step_input("35")

mock_response = WorkflowRunOutput(
run_id="test-run",
workflow_name="test-workflow",
workflow_id="test-id",
session_id="test-session",
content="",
)

events = []
async for event in loop.aexecute_stream(step_input, workflow_run_response=mock_response, stream_events=True):
events.append(event)

# The final event is the Loop StepOutput containing nested step results
loop_outputs = [e for e in events if isinstance(e, StepOutput)]
assert len(loop_outputs) == 1
loop_output = loop_outputs[0]
assert len(loop_output.steps) == 2
assert loop_output.steps[0].content == "45"
assert loop_output.steps[1].content == "55"


def test_loop_carry_forward_in_workflow(shared_db):
"""Test loop carry-forward within a full workflow with an initial step feeding into a loop."""

def initial_step(step_input: StepInput) -> StepOutput:
"""Pass through the input value."""
return StepOutput(step_name="initial", content=step_input.input, success=True)

workflow = Workflow(
name="Carry Forward Workflow",
db=shared_db,
steps=[
initial_step,
Loop(
name="Increment Loop",
steps=[_increment_step],
end_condition=lambda outputs: int(outputs[-1].content) >= 50,
max_iterations=10,
),
],
)

response = workflow.run(input="20")

assert isinstance(response, WorkflowRunOutput)
assert len(response.step_results) == 2

# First step passes through: "20"
assert response.step_results[0].content == "20"

# Loop: 20 -> 30 -> 40 -> 50 (3 iterations)
loop_output = response.step_results[1]
assert len(loop_output.steps) == 3
assert loop_output.steps[0].content == "30"
assert loop_output.steps[1].content == "40"
assert loop_output.steps[2].content == "50"


def test_loop_carry_forward_multi_step_iteration():
"""Test carry-forward with multiple steps per iteration.

Verifies that multi-step loops carry forward output across iterations.
Each iteration's step_a receives the last output from the previous iteration's step_b.
"""
contents = []

def step_a(step_input: StepInput) -> StepOutput:
last = step_input.get_last_step_content() or "0"
value = int(last) + 1
contents.append(("a", value))
return StepOutput(step_name="step_a", content=str(value), success=True)

def step_b(step_input: StepInput) -> StepOutput:
# Within an iteration, step_b receives step_a's output via previous_step_content
last = step_input.previous_step_content or "0"
value = int(last) * 2
contents.append(("b", value))
return StepOutput(step_name="step_b", content=str(value), success=True)

loop = Loop(
name="Multi Step Carry Forward",
steps=[step_a, step_b],
max_iterations=3,
)
step_input = _make_loop_step_input("1")

result = loop.execute(step_input)

assert isinstance(result, StepOutput)
assert len(result.steps) == 6 # 2 steps x 3 iterations

# Verify iteration 1 step_a sees "1" from initial input
assert contents[0] == ("a", 2) # 1 + 1 = 2
assert contents[1] == ("b", 4) # 2 * 2 = 4

# Verify iteration 2 step_a sees carry-forward from iteration 1 (not the original input)
# step_a gets "4" (step_b's output from iteration 1), NOT "1" (original input)
assert contents[2][0] == "a"
assert contents[2][1] != 2, "step_a should not see the original input in iteration 2"

# Verify iteration 3 step_a also sees carry-forward
assert contents[4][0] == "a"
assert contents[4][1] != 2, "step_a should not see the original input in iteration 3"