Skip to content

Commit 4b61a7c

Browse files
authored
Update aggregator (#20)
* update aggregator, gate logic and sequential usage * updated the diagram
1 parent 06a200c commit 4b61a7c

File tree

12 files changed

+57
-96
lines changed

12 files changed

+57
-96
lines changed

examples/complex_llm_workflow/diagram.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ agent3[Agent3]
1010
agent4[Agent4]
1111
in --> agent1
1212
agent1 --> gate
13-
gate -->|success| agent2
14-
gate -->|success| agent3
15-
gate -->|success| agent4
16-
gate -->|default| out
13+
gate -->|failure| out
14+
gate --> agent2
15+
gate --> agent3
16+
gate --> agent4
1717
agent2 --> parallel_workflow_aggregator
1818
parallel_workflow_aggregator --> out
1919
agent3 --> parallel_workflow_aggregator

examples/complex_llm_workflow/generator.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,21 @@ def tools
8585
steps: [llm2, llm3, llm4]
8686
)
8787

88+
error_workflow = Mars::Workflows::Sequential.new(
89+
"Error workflow",
90+
steps: []
91+
)
92+
8893
gate = Mars::Gate.new(
89-
condition: ->(input) { input.split.length < 10 ? :success : :error },
94+
condition: ->(input) { input.split.length < 10 ? :success : :failure },
9095
branches: {
91-
success: parallel_workflow
96+
failure: error_workflow
9297
}
9398
)
9499

95100
sequential_workflow = Mars::Workflows::Sequential.new(
96101
"Sequential workflow",
97-
steps: [llm1, gate]
102+
steps: [llm1, gate, parallel_workflow]
98103
)
99104

100105
# Generate and save the diagram

examples/complex_workflow/diagram.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,23 @@ in((In))
44
out((Out))
55
agent1[Agent1]
66
gate{Gate}
7-
parallel_workflow_2_aggregator[Parallel workflow 2 Aggregator]
87
agent4[Agent4]
98
parallel_workflow_aggregator[Parallel workflow Aggregator]
109
agent2[Agent2]
1110
agent3[Agent3]
11+
parallel_workflow_2_aggregator[Parallel workflow 2 Aggregator]
1212
agent5[Agent5]
1313
in --> agent1
1414
agent1 --> gate
15-
gate -->|success| agent4
16-
gate -->|success| agent5
1715
gate -->|warning| agent4
1816
gate -->|error| agent2
1917
gate -->|error| agent3
20-
gate -->|default| out
18+
gate --> agent4
19+
gate --> agent5
2120
agent4 --> agent2
2221
agent4 --> agent3
2322
agent2 --> parallel_workflow_aggregator
2423
parallel_workflow_aggregator --> parallel_workflow_2_aggregator
25-
parallel_workflow_aggregator --> out
2624
agent3 --> parallel_workflow_aggregator
2725
parallel_workflow_2_aggregator --> out
2826
agent5 --> parallel_workflow_2_aggregator

examples/complex_workflow/generator.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ class Agent5 < Mars::Agent
4848
gate = Mars::Gate.new(
4949
condition: ->(input) { input[:result] },
5050
branches: {
51-
success: parallel_workflow2,
5251
warning: sequential_workflow,
5352
error: parallel_workflow
5453
}
@@ -57,7 +56,7 @@ class Agent5 < Mars::Agent
5756
# Create the main workflow: LLM 1 -> Gate
5857
main_workflow = Mars::Workflows::Sequential.new(
5958
"Main Pipeline",
60-
steps: [llm1, gate]
59+
steps: [llm1, gate, parallel_workflow2]
6160
)
6261

6362
# Generate and save the diagram

lib/mars/aggregator.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def initialize(name = "Aggregator", operation: nil, **kwargs)
88
super(**kwargs)
99

1010
@name = name
11-
@operation = operation || ->(inputs) { inputs.join("\n") }
11+
@operation = operation || ->(inputs) { inputs }
1212
end
1313

1414
def run(inputs)

lib/mars/gate.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def initialize(name = "Gate", condition:, branches:, **kwargs)
1515
def run(input)
1616
result = condition.call(input)
1717

18-
branches[result]&.run(input) || input
18+
branches[result] || input
1919
end
2020

2121
private

lib/mars/rendering/graph/gate.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ def to_graph(builder, parent_id: nil, value: nil)
1414
branch.to_graph(builder, parent_id: node_id, value: condition_result)
1515
end
1616

17-
builder.add_edge(node_id, "out", "default")
18-
1917
sink_nodes.flatten
2018
end
2119
end

lib/mars/rendering/graph/sequential_workflow.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ def to_graph(builder, parent_id: nil, value: nil)
1414
parent_id = step.node_id
1515
end
1616

17+
builder.add_edge(parent_id, "out", value) if sink_nodes.empty?
18+
1719
sink_nodes.flatten
1820
end
1921
end

lib/mars/workflows/sequential.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,14 @@ def initialize(name, steps:, **kwargs)
1414

1515
def run(input)
1616
@steps.each do |step|
17-
input = step.run(input)
17+
result = step.run(input)
18+
19+
if result.is_a?(Runnable)
20+
input = result.run(input)
21+
break
22+
else
23+
input = result
24+
end
1825
end
1926

2027
input

spec/mars/aggregator_spec.rb

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,9 @@
55
context "when called without a block" do
66
let(:aggregator) { described_class.new }
77

8-
it "joins inputs with newlines" do
9-
inputs = %w[first second third]
10-
result = aggregator.run(inputs)
11-
expect(result).to eq("first\nsecond\nthird")
12-
end
13-
14-
it "handles empty array" do
15-
result = aggregator.run([])
16-
expect(result).to eq("")
8+
it "returns the input as is" do
9+
result = aggregator.run([1, 2, 3])
10+
expect(result).to eq([1, 2, 3])
1711
end
1812
end
1913

0 commit comments

Comments
 (0)