Skip to content

Commit 7926933

Browse files
mrchtrsjrl
authored and committed
fix: include components with consumed outputs in pipeline results (#10065)
* include components with consumed outputs in pipeline results
* update test
* update test
* release notes
* Add async pipeline test and update more checks in async pipeline

---------

Co-authored-by: Sebastian Husch Lee <[email protected]>
1 parent 1e4763b commit 7926933

File tree

5 files changed

+144
-5
lines changed

5 files changed

+144
-5
lines changed

haystack/core/pipeline/async_pipeline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,11 +281,11 @@ async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[dict[s
281281
receivers=cached_receivers[component_name],
282282
include_outputs_from=include_outputs_from,
283283
)
284-
if pruned:
284+
if pruned or component_name in include_outputs_from:
285285
pipeline_outputs[component_name] = pruned
286286

287287
scheduled_components.remove(component_name)
288-
if pruned:
288+
if pruned or component_name in include_outputs_from:
289289
yield {component_name: _deepcopy_with_exceptions(pruned)}
290290

291291
async def _schedule_task(component_name: str) -> None:
@@ -329,7 +329,7 @@ async def _runner():
329329
receivers=cached_receivers[component_name],
330330
include_outputs_from=include_outputs_from,
331331
)
332-
if pruned:
332+
if pruned or component_name in include_outputs_from:
333333
pipeline_outputs[component_name] = pruned
334334

335335
scheduled_components.remove(component_name)

haystack/core/pipeline/pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
434434
include_outputs_from=include_outputs_from,
435435
)
436436

437-
if component_pipeline_outputs:
437+
if component_pipeline_outputs or component_name in include_outputs_from:
438438
pipeline_outputs[component_name] = deepcopy(component_pipeline_outputs)
439439
if self._is_queue_stale(priority_queue):
440440
priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
fixes:
3+
- |
4+
Fixed a bug where components explicitly listed in `include_outputs_from` would not appear
5+
in the pipeline results if they returned an empty dictionary.
6+
Now, any component specified in `include_outputs_from` will be included in the results
7+
regardless of whether its output is empty.

test/core/pipeline/test_async_pipeline.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
import asyncio
6+
from typing import Optional
67

78
import pytest
89

9-
from haystack import AsyncPipeline
10+
from haystack import AsyncPipeline, component
1011

1112

1213
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
@@ -43,3 +44,77 @@ async def call_run():
4344

4445
with pytest.raises(RuntimeError, match="Cannot call run\\(\\) from within an async context"):
4546
asyncio.run(call_run())
47+
48+
49+
def test_component_with_empty_dict_as_output_appears_in_results():
    """
    Check that a component listed in `include_outputs_from` is reported even if its output is an empty dict.

    The pipeline wires two producers into a combiner and adds an unconnected component that
    always returns `{}`. Only the components named in `include_outputs_from` (plus nothing else)
    are expected in the result.
    """

    @component
    class Producer:
        """Emits the received text tagged with a fixed prefix."""

        def __init__(self, prefix: str):
            self.prefix = prefix

        @component.output_types(value=Optional[str])
        def run(self, text: Optional[str]):
            tagged = f"{self.prefix}: {text}"
            return {"value": tagged}

        @component.output_types(value=Optional[str])
        async def run_async(self, text: Optional[str]):
            tagged = f"{self.prefix}: {text}"
            return {"value": tagged}

    @component
    class EmptyProcessor:
        """Declares no outputs and always returns an empty dict, whatever `sources` contains."""

        @component.output_types()
        def run(self, sources: list[str]):
            return {}

        @component.output_types()
        async def run_async(self, sources: list[str]):
            return {}

    @component
    class Combiner:
        """Joins its two inputs with ' | ', treating a missing input as an empty string."""

        @component.output_types(combined=str)
        def run(self, input_a: Optional[str], input_b: Optional[str]):
            left = "" if input_a is None else input_a
            right = "" if input_b is None else input_b
            return {"combined": f"{left} | {right}"}

        @component.output_types(combined=str)
        async def run_async(self, input_a: Optional[str], input_b: Optional[str]):
            left = "" if input_a is None else input_a
            right = "" if input_b is None else input_b
            return {"combined": f"{left} | {right}"}

    pipeline = AsyncPipeline()
    registrations = (
        ("producer_a", Producer("A")),
        ("producer_b", Producer("B")),
        ("empty_processor", EmptyProcessor()),
        ("combiner", Combiner()),
    )
    for name, instance in registrations:
        pipeline.add_component(name, instance)

    # Both producers feed the combiner, so their outputs are consumed inside the pipeline.
    pipeline.connect("producer_a.value", "combiner.input_a")
    pipeline.connect("producer_b.value", "combiner.input_b")

    run_inputs = {
        "producer_a": {"text": "hello"},
        "producer_b": {"text": "world"},
        "empty_processor": {"sources": []},
    }
    requested = {"producer_a", "empty_processor", "combiner"}
    result = pipeline.run(run_inputs, include_outputs_from=requested)

    # producer_a was requested explicitly, so its (consumed) output is reported.
    assert "producer_a" in result
    assert result["producer_a"] == {"value": "A: hello"}
    # producer_b was not requested and its output went to the combiner, so it is absent.
    assert "producer_b" not in result
    # The combiner's output is reported.
    assert "combiner" in result
    assert result["combiner"] == {"combined": "A: hello | B: world"}
    # empty_processor was requested, so it is reported even though it returned an empty dict.
    assert "empty_processor" in result
    assert result["empty_processor"] == {}

test/core/pipeline/test_pipeline.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
from concurrent.futures import ThreadPoolExecutor
6+
from typing import Optional
67

78
import pytest
89

@@ -123,3 +124,59 @@ def run(self):
123124
component_visits={"erroring_component": 0},
124125
)
125126
assert "Component name: 'erroring_component'" in str(exc_info.value)
127+
128+
def test_component_with_empty_dict_as_output_appears_in_results(self):
    """
    Check that a component listed in `include_outputs_from` is reported even if its output is an empty dict.

    The pipeline wires two producers into a combiner and adds an unconnected component that
    always returns `{}`. Only the components named in `include_outputs_from` are expected
    in the result.
    """

    @component
    class Producer:
        """Emits the received text tagged with a fixed prefix."""

        def __init__(self, prefix: str):
            self.prefix = prefix

        @component.output_types(value=Optional[str])
        def run(self, text: Optional[str]):
            tagged = f"{self.prefix}: {text}"
            return {"value": tagged}

    @component
    class EmptyProcessor:
        """Declares no outputs and always returns an empty dict, whatever `sources` contains."""

        @component.output_types()
        def run(self, sources: list[str]):
            return {}

    @component
    class Combiner:
        """Joins its two inputs with ' | ', treating a missing input as an empty string."""

        @component.output_types(combined=str)
        def run(self, input_a: Optional[str], input_b: Optional[str]):
            left = "" if input_a is None else input_a
            right = "" if input_b is None else input_b
            return {"combined": f"{left} | {right}"}

    pipeline = Pipeline()
    registrations = (
        ("producer_a", Producer("A")),
        ("producer_b", Producer("B")),
        ("empty_processor", EmptyProcessor()),
        ("combiner", Combiner()),
    )
    for name, instance in registrations:
        pipeline.add_component(name, instance)

    # Both producers feed the combiner, so their outputs are consumed inside the pipeline.
    pipeline.connect("producer_a.value", "combiner.input_a")
    pipeline.connect("producer_b.value", "combiner.input_b")

    run_inputs = {
        "producer_a": {"text": "hello"},
        "producer_b": {"text": "world"},
        "empty_processor": {"sources": []},
    }
    requested = {"producer_a", "empty_processor", "combiner"}
    result = pipeline.run(run_inputs, include_outputs_from=requested)

    # producer_a was requested explicitly, so its (consumed) output is reported.
    assert "producer_a" in result
    assert result["producer_a"] == {"value": "A: hello"}
    # producer_b was not requested and its output went to the combiner, so it is absent.
    assert "producer_b" not in result
    # The combiner's output is reported.
    assert "combiner" in result
    assert result["combiner"] == {"combined": "A: hello | B: world"}
    # empty_processor was requested, so it is reported even though it returned an empty dict.
    assert "empty_processor" in result
    assert result["empty_processor"] == {}

0 commit comments

Comments
 (0)