1111logger = logging .getLogger (__name__ )
1212
1313
14- @component ()
14+ @component
1515class BranchJoiner :
1616 """
17- A component to join different branches of a pipeline into one single output.
17+ A component that merges multiple input branches of a pipeline into a single output stream.
1818
19- `BranchJoiner` receives multiple data connections of the same type from other components and passes the first
20- value coming to its single output, possibly distributing it to various other components .
19+ `BranchJoiner` receives multiple inputs of the same data type and forwards the first received value
20+ to its output. This is useful for scenarios where multiple branches need to converge before proceeding.
2121
22- `BranchJoiner` is fundamental to close loops in a pipeline, where the two branches it joins are the ones
23- coming from the previous component and one coming back from a loop. For example, `BranchJoiner` could be used
24- to send data to a component evaluating errors. `BranchJoiner` would receive two connections, one to get the
25- original data and another one to get modified data in case there was an error. In both cases, `BranchJoiner`
26- would send (or re-send in case of a loop) data to the component evaluating errors. See "Usage example" below.
22+ ### Common Use Cases:
23+ - **Loop Handling:** `BranchJoiner` helps close loops in pipelines. For example, if a pipeline component validates
24+ or modifies incoming data and produces an error-handling branch, `BranchJoiner` can merge both branches and send
25+ (or resend in the case of a loop) the data to the component that evaluates errors. See "Usage example" below.
2726
28- Another use case with a need for `BranchJoiner` is to reconcile multiple branches coming out of a decision
29- or Classifier component. For example, in a RAG pipeline, there might be a "query language classifier" component
30- sending the query to different retrievers, selecting one specifically according to the detected language. After the
31- retrieval step the pipeline would ideally continue with a `PromptBuilder`, and since we don't know in advance the
32- language of the query, all the retrievers should be ideally connected to the single `PromptBuilder`. Since the
33- `PromptBuilder` won't accept more than one connection in input, we would connect all the retrievers to a
34- `BranchJoiner` component and reconcile them in a single output that can be connected to the `PromptBuilder`
35- downstream.
36-
37- Usage example:
27+ - **Decision-Based Merging:** `BranchJoiner` reconciles branches coming from Router components (such as
28+ `ConditionalRouter`, `TextLanguageRouter`). Suppose a `TextLanguageRouter` directs user queries to different
29+ Retrievers based on the detected language. Only the Retriever on the selected branch runs and passes its results
30+ to `BranchJoiner`, which forwards them as a single output to the next component, such
31+ as a `PromptBuilder`.
3832
33+ ### Example Usage:
3934 ```python
4035 import json
4136 from typing import List
@@ -47,6 +42,7 @@ class BranchJoiner:
4742 from haystack.components.validators import JsonSchemaValidator
4843 from haystack.dataclasses import ChatMessage
4944
45+ # Define a schema for validation
5046 person_schema = {
5147 "type": "object",
5248 "properties": {
@@ -62,17 +58,21 @@ class BranchJoiner:
6258
6359 # Add components to the pipeline
6460 pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
65- pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-4o-mini"))
61+ pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
6662 pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
67- pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage])),
63+ pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))
64+
6865 # And connect them
6966 pipe.connect("adapter", "joiner")
70- pipe.connect("joiner", "fc_llm")
71- pipe.connect("fc_llm.replies", "validator.messages")
67+ pipe.connect("joiner", "generator")
68+ pipe.connect("generator.replies", "validator.messages")
7269 pipe.connect("validator.validation_error", "joiner")
7370
74- result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
75- "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}})
71+ result = pipe.run(
72+ data={
73+ "generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
74+ "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
75+ )
7676
7777 print(json.loads(result["validator"]["validated"][0].content))
7878
@@ -87,25 +87,23 @@ class BranchJoiner:
8787
8888 In the code example, `BranchJoiner` receives a looped back `List[ChatMessage]` from the `JsonSchemaValidator` and
8989 sends it down to the `OpenAIChatGenerator` for re-generation. We can have multiple loopback connections in the
90- pipeline. In this instance, the downstream component is only one (the `OpenAIChatGenerator`), but the pipeline might
90+ pipeline. In this instance, there is only one downstream component (the `OpenAIChatGenerator`), but the pipeline could
9191 have more than one downstream component.
9292 """
9393
9494 def __init__ (self , type_ : Type ):
9595 """
96- Create a `BranchJoiner` component.
96+ Creates a `BranchJoiner` component.
9797
98- :param type_: The type of data that the `BranchJoiner` will receive from the upstream connected components and
99- distribute to the downstream connected components.
98+ :param type_: The expected data type of inputs and outputs.
10099 """
101100 self .type_ = type_
102- # type_'s type can't be determined statically
103101 component .set_input_types (self , value = GreedyVariadic [type_ ]) # type: ignore
104102 component .set_output_types (self , value = type_ )
105103
106- def to_dict (self ):
104+ def to_dict (self ) -> Dict [ str , Any ] :
107105 """
108- Serializes the component to a dictionary.
106+ Serializes the component into a dictionary.
109107
110108 :returns:
111109 Dictionary with serialized data.
@@ -115,26 +113,22 @@ def to_dict(self):
115113 @classmethod
116114 def from_dict (cls , data : Dict [str , Any ]) -> "BranchJoiner" :
117115 """
118- Deserializes the component from a dictionary.
116+ Deserializes a `BranchJoiner` instance from a dictionary.
119117
120- :param data:
121- Dictionary to deserialize from.
118+ :param data: The dictionary containing serialized component data.
122119 :returns:
123- Deserialized component .
120+ A deserialized `BranchJoiner` instance.
124121 """
125122 data ["init_parameters" ]["type_" ] = deserialize_type (data ["init_parameters" ]["type_" ])
126123 return default_from_dict (cls , data )
127124
128- def run (self , ** kwargs ):
125+ def run (self , ** kwargs ) -> Dict [ str , Any ] :
129126 """
130- The run method of the `BranchJoiner` component .
127+ Executes the `BranchJoiner`, selecting the first available input value and passing it downstream.
131128
132- Multiplexes the input data from the upstream connected components and distributes it to the downstream connected
133- components.
134-
135- :param **kwargs: The input data. Must be of the type declared in `__init__`.
136- :return: A dictionary with the following keys:
137- - `value`: The input data.
129+ :param **kwargs: The input data. Must be of the type declared by `type_` during initialization.
130+ :returns:
131+ A dictionary with a single key `value`, containing the first input received.
138132 """
139133 if (inputs_count := len (kwargs ["value" ])) != 1 :
140134 raise ValueError (f"BranchJoiner expects only one input, but { inputs_count } were received." )
0 commit comments