Skip to content

Commit 0907d89

Browse files
authored
docs: new page that documents pipeline loops (#10134)
* loops docs * improvements * whitespaces * shorten sentence * update v2.20 * improvements from the review
1 parent 4d9c9bd commit 0907d89

File tree

9 files changed

+517
-7
lines changed

9 files changed

+517
-7
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ repos:
1414
- id: mixed-line-ending # normalizes line endings
1515
- id: no-commit-to-branch # prevents committing to main
1616
- id: trailing-whitespace # trims trailing whitespace
17-
args: [--markdown-linebreak-ext=md]
17+
args: [--markdown-linebreak-ext=md, --markdown-linebreak-ext=mdx]
1818

1919
- repo: https://github.com/astral-sh/ruff-pre-commit
2020
rev: v0.13.0

docs-website/docs/concepts/pipelines.mdx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ A pipeline can have multiple branches that process data concurrently. For exampl
3636
### Loops
3737

3838
Components in a pipeline can work in iterative loops, which you can cap at a desired number. This can be handy for scenarios like self-correcting loops, where you have a generator producing some output and then a validator component to check if the output is correct. If the generator's output has errors, the validator component can loop back to the generator for a corrected output. The loop goes on until the output passes the validation and can be sent further down the pipeline.
39+
40+
See [Pipeline Loops](pipelines/pipeline-loops.mdx) for a deeper explanation of how loops are executed, how they terminate, and how to use them safely.
41+
3942
<ClickableImage src="/img/2390eea-Pipeline_Illustrations_1_2.png" alt="Pipeline architecture diagram illustrating a feedback loop where output from later components loops back to earlier components" size="large" />
4043

4144
### Async Pipelines
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
---
2+
title: "Pipeline Loops"
3+
id: pipeline-loops
4+
slug: "/pipeline-loops"
5+
description: "Understand how loops work in Haystack pipelines, how they terminate, and how to use them safely for feedback and self-correction."
6+
---
7+
8+
# Pipeline Loops
9+
10+
Learn how loops work in Haystack pipelines, how they terminate, and how to use them for feedback and self-correction.
11+
12+
Haystack pipelines support **loops**: cycles in the component graph where the output of a later component is fed back into an earlier one.
13+
This enables feedback flows such as self-correction, validation, or iterative refinement, as well as more advanced [agentic behavior](../pipelines.mdx#agentic-pipelines).
14+
15+
At runtime, the pipeline re-runs a component whenever all of its required inputs are ready again.
16+
You control when loops stop either by designing your graph and routing logic carefully or by using built-in [safety limits](#loop-termination-and-safety-limits).
17+
18+
## Multiple Runs of the Same Component
19+
20+
If a component participates in a loop, it can be run multiple times within a single `Pipeline.run()` call.
21+
The pipeline keeps an internal visit counter for each component:
22+
23+
- Each time the component runs, its visit count increases by 1.
24+
- You can use this visit count in debugging tools like [breakpoints](./pipeline-breakpoints.mdx) to inspect specific iterations of a loop.
25+
26+
In the final pipeline result:
27+
28+
- For each component that ran, the pipeline returns **only the last-produced output**.
29+
- To capture outputs from intermediate components (for example, a validator or a router) in the final result dictionary, use the `include_outputs_from` argument of `Pipeline.run()`.
30+
31+
## Loop Termination and Safety Limits
32+
33+
Loops must eventually stop so that a pipeline run can complete.
34+
There are two main ways a loop ends:
35+
36+
1. **Natural completion**: No more components are runnable
37+
The pipeline finishes when the work queue is empty and no component can run again (for example, the router stops feeding inputs back into the loop).
38+
39+
2. **Reaching the maximum run count**
40+
Every pipeline has a per-component run limit, controlled by the `max_runs_per_component` parameter of the `Pipeline` (or `AsyncPipeline`) constructor, which is `100` by default. If any component exceeds this limit, Haystack raises a `PipelineMaxComponentRuns` error.
41+
42+
You can set this limit to a lower value:
43+
44+
```python
45+
from haystack import Pipeline
46+
47+
pipe = Pipeline(max_runs_per_component=5)
48+
```
49+
50+
The limit is checked before each execution, so a component with a limit of 3 will complete 3 runs successfully before the error is raised on the 4th attempt.
51+
52+
This safeguard is especially important when experimenting with new loops or complex routing logic.
53+
If your loop condition is wrong or never satisfied, the error prevents the pipeline from running indefinitely.
54+
55+
## Example: Feedback Loop for Self-Correction
56+
57+
The following example shows a simple feedback loop where:
58+
59+
- A `ChatPromptBuilder` creates a prompt that includes previous incorrect replies.
60+
- An `OpenAIChatGenerator` produces an answer.
61+
- A `ConditionalRouter` checks if the answer is correct:
62+
- If correct, it sends the answer to `final_answer` and the loop ends.
63+
- If incorrect, it sends the answer back to the `ChatPromptBuilder`, which triggers another iteration.
64+
65+
```python
66+
from haystack import Pipeline
67+
from haystack.components.builders import ChatPromptBuilder
68+
from haystack.components.generators.chat import OpenAIChatGenerator
69+
from haystack.components.routers import ConditionalRouter
70+
from haystack.dataclasses import ChatMessage
71+
72+
template = [
73+
ChatMessage.from_system("Answer the following question concisely with just the answer, no punctuation."),
74+
ChatMessage.from_user(
75+
"{% if previous_replies %}"
76+
"Previously you replied incorrectly: {{ previous_replies[0].text }}\n"
77+
"{% endif %}"
78+
"Question: {{ query }}"
79+
),
80+
]
81+
82+
prompt_builder = ChatPromptBuilder(template=template, required_variables=["query"])
83+
generator = OpenAIChatGenerator()
84+
85+
router = ConditionalRouter(
86+
routes=[
87+
{
88+
# End the loop when the answer is correct
89+
"condition": "{{ 'Rome' in replies[0].text }}",
90+
"output": "{{ replies }}",
91+
"output_name": "final_answer",
92+
"output_type": list[ChatMessage],
93+
},
94+
{
95+
# Loop back when the answer is incorrect
96+
"condition": "{{ 'Rome' not in replies[0].text }}",
97+
"output": "{{ replies }}",
98+
"output_name": "previous_replies",
99+
"output_type": list[ChatMessage],
100+
},
101+
],
102+
unsafe=True, # Required to handle ChatMessage objects
103+
)
104+
105+
pipe = Pipeline(max_runs_per_component=3)
106+
107+
pipe.add_component("prompt_builder", prompt_builder)
108+
pipe.add_component("generator", generator)
109+
pipe.add_component("router", router)
110+
111+
pipe.connect("prompt_builder.prompt", "generator.messages")
112+
pipe.connect("generator.replies", "router.replies")
113+
pipe.connect("router.previous_replies", "prompt_builder.previous_replies")
114+
115+
result = pipe.run(
116+
{
117+
"prompt_builder": {
118+
"query": "What is the capital of Italy? If the statement 'Previously you replied incorrectly:' is missing "
119+
"above then answer with Milan.",
120+
}
121+
},
122+
include_outputs_from={"router", "prompt_builder"},
123+
)
124+
125+
print(result["prompt_builder"]["prompt"][1].text) # Shows the last prompt used
126+
print(result["router"]["final_answer"][0].text) # Rome
127+
```
128+
129+
### What Happens During This Loop
130+
131+
1. **First iteration**
132+
- `prompt_builder` runs with `query="What is the capital of Italy?"` and no previous replies.
133+
- `generator` returns a `ChatMessage` with the LLM's answer.
134+
- The router evaluates its conditions and checks if `"Rome"` is in the reply.
135+
- If the answer is incorrect, `previous_replies` is fed back into `prompt_builder.previous_replies`.
136+
137+
2. **Subsequent iterations** (if needed)
138+
- `prompt_builder` runs again, now including the previous incorrect reply in the user message.
139+
- `generator` produces a new answer with the additional context.
140+
- The router checks again whether the answer contains `"Rome"`.
141+
142+
3. **Termination**
143+
- When the router routes to `final_answer`, no more inputs are fed back into the loop.
144+
- The queue empties and the pipeline run finishes successfully.
145+
146+
Because we used `max_runs_per_component=3`, any unexpected behavior that causes the loop to continue would raise a `PipelineMaxComponentRuns` error instead of looping forever.
147+
148+
## Components for Building Loops
149+
150+
Two components are particularly useful for building loops:
151+
152+
- **[`ConditionalRouter`](../../pipeline-components/routers/conditionalrouter.mdx)**: Routes data to different outputs based on conditions. Use it to decide whether to exit the loop or continue iterating. The example above uses this pattern.
153+
154+
- **[`BranchJoiner`](../../pipeline-components/joiners/branchjoiner.mdx)**: Merges inputs from multiple sources into a single output. Use it when a component inside the loop needs to receive both the initial input (on the first iteration) and looped-back values (on subsequent iterations). For example, you might use `BranchJoiner` to feed both user input and validation errors into the same Generator. See the [BranchJoiner documentation](../../pipeline-components/joiners/branchjoiner.mdx#enabling-loops) for a complete loop example.
155+
156+
## Greedy vs. Lazy Variadic Sockets in Loops
157+
158+
Some components support variadic inputs that can receive multiple values on a single socket.
159+
In loops, variadic behavior controls how inputs are consumed across iterations.
160+
161+
- **Greedy variadic sockets**
162+
Consume exactly one value at a time and remove it after the component runs.
163+
This includes user-provided inputs, which prevents them from retriggering the component indefinitely.
164+
Most variadic sockets are greedy by default.
165+
166+
- **Lazy variadic sockets**
167+
Accumulate all values received from predecessors across iterations.
168+
Useful when you need to collect multiple partial results over time (for example, gathering outputs from several loop iterations before proceeding).
169+
170+
For most loop scenarios it's sufficient to just connect components as usual and use `max_runs_per_component` to protect against mistakes.
171+
172+
## Troubleshooting Loops
173+
174+
If your pipeline seems stuck or runs longer than expected, here are common causes and how to debug them.
175+
176+
### Common Causes of Infinite Loops
177+
178+
1. **Condition never satisfied**: Your exit condition (for example, `"Rome" in reply`) might never be true due to LLM behavior or data issues. Always set a reasonable `max_runs_per_component` as a safety net.
179+
180+
2. **Relying on optional outputs**: When a component has multiple output sockets but only returns some of them, the unreturned outputs don't trigger their downstream connections. This can cause confusion in loops.
181+
182+
For example, this pattern can be problematic:
183+
184+
```python
185+
@component
186+
class Validator:
187+
@component.output_types(valid=str, invalid=Optional[str])
188+
def run(self, text: str):
189+
if is_valid(text):
190+
return {"valid": text} # "invalid" is never returned
191+
else:
192+
return {"invalid": text}
193+
```
194+
195+
If you connect `invalid` back to an upstream component for retry, but also have other connections that keep the loop alive, you might get unexpected behavior.
196+
197+
Instead, use a `ConditionalRouter` with explicit, mutually exclusive conditions:
198+
199+
```python
200+
router = ConditionalRouter(
201+
routes=[
202+
{"condition": "{{ is_valid }}", "output": "{{ text }}", "output_name": "valid", ...},
203+
{"condition": "{{ not is_valid }}", "output": "{{ text }}", "output_name": "invalid", ...},
204+
]
205+
)
206+
```
207+
208+
3. **User inputs retriggering the loop**: If a user-provided input is connected to a socket inside the loop, it might cause the loop to restart unexpectedly.
209+
210+
```python
211+
# Problematic: user input goes directly to a component inside the loop
212+
result = pipe.run({
213+
"generator": {"prompt": query}, # This input persists and may retrigger the loop
214+
})
215+
216+
# Better: use an entry-point component outside the loop
217+
result = pipe.run({
218+
"prompt_builder": {"query": query}, # Entry point feeds into the loop once
219+
})
220+
```
221+
222+
See [Greedy vs. Lazy Variadic Sockets](#greedy-vs-lazy-variadic-sockets-in-loops) for details on how inputs are consumed.
223+
224+
4. **Multiple paths feeding the same component**: If a component inside the loop receives inputs from multiple sources, it runs whenever *any* path provides input.
225+
226+
```python
227+
# Component receives from two sources – runs when either provides input
228+
pipe.connect("source_a.output", "processor.input")
229+
pipe.connect("source_b.output", "processor.input") # Variadic input
230+
```
231+
232+
Ensure you understand when each path produces output, or use `BranchJoiner` to explicitly control the merge point.
233+
234+
### Debugging Tips
235+
236+
1. **Start with a low limit**: When developing loops, set `max_runs_per_component=3` or similar. This helps you catch issues early with a clear error instead of waiting for a timeout.
237+
238+
2. **Use `include_outputs_from`**: Add intermediate components (like your router) to see what's happening at each step:
239+
```python
240+
result = pipe.run(data, include_outputs_from={"router", "validator"})
241+
```
242+
243+
3. **Enable tracing**: Use tracing to see every component execution, including inputs and outputs. This makes it easy to follow each iteration of the loop. For quick debugging, use `LoggingTracer` ([setup instructions](./debugging-pipelines.mdx#real-time-pipeline-logging)). For deeper analysis, integrate with tools like Langfuse or other [tracing backends](../../development/tracing.mdx).
244+
245+
4. **Visualize the pipeline**: Use `pipe.draw()` or `pipe.show()` to see the graph structure and verify your connections are correct. See the [Pipeline Visualization](./visualizing-pipelines.mdx) documentation for details.
246+
247+
5. **Use breakpoints**: Set a `Breakpoint` on a specific component and visit count to inspect the state at that iteration. See [Pipeline Breakpoints](./pipeline-breakpoints.mdx) for details.
248+
249+
6. **Check for blocked pipelines**: If you see a `PipelineComponentsBlockedError`, it means no components can run. This typically indicates a missing connection or a circular dependency. Check that all required inputs are provided.
250+
251+
By combining careful graph design, per-component run limits, and these debugging tools, you can build robust feedback loops in your Haystack pipelines.

docs-website/docs/concepts/pipelines/serialization.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ print(pipe.dumps())
3030
##
3131
## components: {}
3232
## connections: []
33-
## max_loops_allowed: 100
33+
## max_runs_per_component: 100
3434
## metadata: {}
3535
```
3636

@@ -72,7 +72,7 @@ components:
7272
connections:
7373
- receiver: cleaner.documents
7474
sender: converter.documents
75-
max_loops_allowed: 100
75+
max_runs_per_component: 100
7676
metadata: {}
7777
"""
7878

@@ -176,7 +176,7 @@ from my_custom_marshallers import TomlMarshaller
176176
pipe = Pipeline()
177177
pipe.dumps(TomlMarshaller())
178178
## prints:
179-
## 'max_loops_allowed = 100\nconnections = []\n\n[metadata]\n\n[components]\n'
179+
## 'max_runs_per_component = 100\nconnections = []\n\n[metadata]\n\n[components]\n'
180180
```
181181

182182
## Additional References

docs-website/sidebars.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export default {
6363
'concepts/pipelines/visualizing-pipelines',
6464
'concepts/pipelines/debugging-pipelines',
6565
'concepts/pipelines/pipeline-breakpoints',
66+
'concepts/pipelines/pipeline-loops',
6667
'concepts/pipelines/pipeline-templates',
6768
'concepts/pipelines/asyncpipeline',
6869
],

docs-website/versioned_docs/version-2.20/concepts/pipelines.mdx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ A pipeline can have multiple branches that process data concurrently. For exampl
3636
### Loops
3737

3838
Components in a pipeline can work in iterative loops, which you can cap at a desired number. This can be handy for scenarios like self-correcting loops, where you have a generator producing some output and then a validator component to check if the output is correct. If the generator's output has errors, the validator component can loop back to the generator for a corrected output. The loop goes on until the output passes the validation and can be sent further down the pipeline.
39+
40+
See [Pipeline Loops](pipelines/pipeline-loops.mdx) for a deeper explanation of how loops are executed, how they terminate, and how to use them safely.
41+
3942
<ClickableImage src="/img/2390eea-Pipeline_Illustrations_1_2.png" alt="Pipeline architecture diagram illustrating a feedback loop where output from later components loops back to earlier components" size="large" />
4043

4144
### Async Pipelines

0 commit comments

Comments
 (0)