Closed
Question
Description
When chaining multiple joins that use reduce_list_extend, the final collect join loses synchronization: it returns only the last processed element instead of aggregating all values.
import asyncio
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_extend


@dataclass
class SimpleState:
    pass


async def main():
    g = GraphBuilder(state_type=SimpleState, output_type=list[int])

    @g.step
    async def source(ctx: StepContext[SimpleState, None, None]) -> int:
        return 10

    @g.step
    async def add_one(ctx: StepContext[SimpleState, None, int]) -> list[int]:
        return [ctx.inputs + 1]

    @g.step
    async def add_two(ctx: StepContext[SimpleState, None, int]) -> list[int]:
        return [ctx.inputs + 2]

    @g.step
    async def add_three(ctx: StepContext[SimpleState, None, int]) -> list[int]:
        return [ctx.inputs + 3]

    collect = g.join(reduce_list_extend, initial_factory=list[int])
    mediator = g.join(reduce_list_extend, initial_factory=list[int])

    # Broadcasting: send the value from source to all three steps
    g.add(
        g.edge_from(g.start_node).to(source),
        g.edge_from(source).to(add_one, add_two, add_three),
        g.edge_from(add_one, add_two).to(mediator),
        g.edge_from(mediator).to(collect),
        g.edge_from(add_three).to(collect),
        g.edge_from(collect).to(g.end_node),
    )

    graph = g.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))  # Expected: [11, 12, 13]


if __name__ == "__main__":
    asyncio.run(main())

Expected Output
[11, 12, 13]
Actual Output
[13]
Additional Context
- The issue occurs when connecting a downstream join (collect) to an intermediate join (mediator).
- It seems that only the last-emitted branch result reaches collect, while earlier parallel outputs are lost.
- Removing the mediator join restores the expected behavior.
Possible cause:
Synchronization state between multiple join nodes may not propagate correctly when one join’s output feeds another join that shares sibling paths.
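
To make the expectation concrete, here is a small library-free model of the synchronization I would expect from the two joins. It is only a sketch and does not reflect how pydantic_graph implements joins internally: `ModelJoin`, `list_extend`, and `run_model` are hypothetical names made up for this illustration, and the assumption is that each join emits once, after every upstream branch has delivered its value.

```python
from dataclasses import dataclass, field
from typing import List, Optional


def list_extend(acc: List[int], item: List[int]) -> List[int]:
    """Hypothetical stand-in for a list-extending reducer."""
    acc.extend(item)
    return acc


@dataclass
class ModelJoin:
    """Toy join: emits only after every expected upstream input has arrived."""

    expected_inputs: int
    acc: List[int] = field(default_factory=list)
    received: int = 0

    def push(self, item: List[int]) -> Optional[List[int]]:
        self.acc = list_extend(self.acc, item)
        self.received += 1
        # Stay silent until all upstream branches have reported in.
        if self.received == self.expected_inputs:
            return self.acc
        return None


def run_model(source_value: int = 10) -> List[int]:
    mediator = ModelJoin(expected_inputs=2)  # fed by add_one and add_two
    collect = ModelJoin(expected_inputs=2)   # fed by mediator and add_three

    # add_three arrives first here; arrival order should not change the result.
    collect.push([source_value + 3])
    mediator.push([source_value + 1])
    merged = mediator.push([source_value + 2])  # mediator is now complete

    # The mediator's aggregated list counts as ONE input to collect.
    return collect.push(merged)


print(sorted(run_model()))
```

In this model collect keeps the value from add_three while it waits for mediator, so nothing is dropped. The actual output above looks as if that earlier state was discarded or the join completed too early.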