Skip to content

Commit d596acb

Browse files
committed
Added forkFn support and parallel
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 83deef1 commit d596acb

File tree

10 files changed

+169
-5
lines changed

10 files changed

+169
-5
lines changed

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskFluent.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,10 @@ default SELF sequence(Object... agents) {
3737
default SELF loop(Consumer<LoopAgentsBuilder> builder) {
3838
return loop("loop-" + UUID.randomUUID(), builder);
3939
}
40+
41+
SELF parallel(String name, Object... agents);
42+
43+
default SELF parallel(Object... agents) {
44+
return parallel("par-" + UUID.randomUUID(), agents);
45+
}
4046
}

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
*/
1616
package io.serverlessworkflow.fluent.agentic;
1717

18+
import dev.langchain4j.agentic.internal.AgentExecutor;
1819
import io.serverlessworkflow.api.types.Task;
1920
import io.serverlessworkflow.api.types.TaskItem;
2021
import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder;
2122
import io.serverlessworkflow.fluent.spec.BaseTaskItemListBuilder;
23+
import java.util.List;
2224
import java.util.function.Consumer;
2325

2426
public class AgentTaskItemListBuilder extends BaseTaskItemListBuilder<AgentTaskItemListBuilder>
@@ -42,7 +44,7 @@ public AgentTaskItemListBuilder agent(String name, Object agent) {
4244
.forEach(
4345
exec ->
4446
this.funcDelegate.callFn(name, fn -> fn.function(AgentAdapters.toFunction(exec))));
45-
return this;
47+
return self();
4648
}
4749

4850
@Override
@@ -61,6 +63,20 @@ public AgentTaskItemListBuilder loop(String name, Consumer<LoopAgentsBuilder> co
6163
return self();
6264
}
6365

66+
@Override
67+
public AgentTaskItemListBuilder parallel(String name, Object... agents) {
68+
this.funcDelegate.forkFn(
69+
name,
70+
fork -> {
71+
List<AgentExecutor> execs = AgentAdapters.toExecutors(agents);
72+
for (int i = 0; i < execs.size(); i++) {
73+
AgentExecutor ex = execs.get(i);
74+
fork.branch("branch-" + i + "-" + name, AgentAdapters.toFunction(ex));
75+
}
76+
});
77+
return self();
78+
}
79+
6480
@Override
6581
public AgentTaskItemListBuilder self() {
6682
return this;

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/DelegatingAgentDoTaskFluent.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,16 @@ default SELF sequence(String name, Object... agents) {
4242
}
4343

4444
@SuppressWarnings("unchecked")
45+
@Override
4546
default SELF loop(String name, Consumer<LoopAgentsBuilder> consumer) {
4647
d().loop(name, consumer);
4748
return (SELF) this;
4849
}
50+
51+
@SuppressWarnings("unchecked")
52+
@Override
53+
default SELF parallel(String name, Object... agents) {
54+
d().parallel(name, agents);
55+
return (SELF) this;
56+
}
4957
}

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ void testSequence() {
6161
assertThat(items.get(1).getName()).isEqualTo("seq-1");
6262
assertThat(items.get(2).getName()).isEqualTo("seq-2");
6363

64-
// All must be call tasks
64+
// All must be call branche
6565
items.forEach(it -> assertThat(it.getTask().getCallTask().get()).isNotNull());
6666
}
6767

6868
@Test
69-
@DisplayName("loop(name, builder) produces a ForTaskFunction with inner call tasks")
69+
@DisplayName("loop(name, builder) produces a ForTaskFunction with inner call branche")
7070
void testLoop() {
7171
AgentTaskItemListBuilder b = new AgentTaskItemListBuilder();
7272
Agents.MovieExpert scorer = AgentsUtils.newMovieExpert();

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgenticWorkflowBuilderTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525

2626
import dev.langchain4j.agentic.AgentServices;
2727
import dev.langchain4j.agentic.Cognisphere;
28+
import io.serverlessworkflow.api.types.ForkTask;
2829
import io.serverlessworkflow.api.types.Task;
2930
import io.serverlessworkflow.api.types.TaskItem;
3031
import io.serverlessworkflow.api.types.Workflow;
3132
import io.serverlessworkflow.api.types.func.CallJava;
3233
import io.serverlessworkflow.api.types.func.ForTaskFunction;
3334
import java.util.concurrent.atomic.AtomicInteger;
3435
import java.util.function.Predicate;
36+
import org.junit.jupiter.api.DisplayName;
3537
import org.junit.jupiter.api.Test;
3638

3739
public class AgenticWorkflowBuilderTest {
@@ -142,4 +144,32 @@ void loopWithMaxIterationsAndExitCondition() {
142144
assertNotNull(fn.getWhilePredicate(), "While predicate set from exitCondition");
143145
assertThat(fn.getDo()).hasSize(1);
144146
}
147+
148+
@Test
149+
@DisplayName("parallel() creates one ForkTask with N callFn branches")
150+
void parallelAgents() {
151+
Agents.MovieExpert a1 = AgentsUtils.newMovieExpert();
152+
Agents.MovieExpert a2 = AgentsUtils.newMovieExpert();
153+
Agents.MovieExpert a3 = AgentsUtils.newMovieExpert();
154+
155+
Workflow wf =
156+
AgenticWorkflowBuilder.workflow("parallelFlow")
157+
.tasks(d -> d.parallel("p", a1, a2, a3))
158+
.build();
159+
160+
assertThat(wf.getDo()).hasSize(1);
161+
TaskItem top = wf.getDo().get(0);
162+
Task task = top.getTask();
163+
assertThat(task.getForkTask()).isInstanceOf(ForkTask.class);
164+
165+
ForkTask fork = task.getForkTask();
166+
assertThat(fork.getFork().getBranches()).hasSize(3);
167+
168+
fork.getFork()
169+
.getBranches()
170+
.forEach(
171+
branch -> {
172+
assertThat(branch.getTask().getCallTask().get()).isInstanceOf(CallJava.class);
173+
});
174+
}
145175
}

fluent/func/src/main/java/io/serverlessworkflow/fluent/func/DelegatingFuncDoTaskFluent.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,16 @@ default SELF switchFn(Consumer<FuncSwitchTaskBuilder> cfg) {
7171
d().switchFn(cfg);
7272
return self();
7373
}
74+
75+
@Override
76+
default SELF forkFn(String name, Consumer<FuncForkTaskBuilder> cfg) {
77+
d().forkFn(name, cfg);
78+
return self();
79+
}
80+
81+
@Override
82+
default SELF forkFn(Consumer<FuncForkTaskBuilder> cfg) {
83+
d().forkFn(cfg);
84+
return self();
85+
}
7486
}

fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskFluent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ public interface FuncDoTaskFluent<SELF extends FuncDoTaskFluent<SELF>> {
3030
SELF switchFn(String name, Consumer<FuncSwitchTaskBuilder> cfg);
3131

3232
SELF switchFn(Consumer<FuncSwitchTaskBuilder> cfg);
33+
34+
SELF forkFn(String name, Consumer<FuncForkTaskBuilder> cfg);
35+
36+
SELF forkFn(Consumer<FuncForkTaskBuilder> cfg);
3337
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.func;
17+
18+
import io.serverlessworkflow.api.types.ForkTask;
19+
import io.serverlessworkflow.api.types.ForkTaskConfiguration;
20+
import io.serverlessworkflow.api.types.Task;
21+
import io.serverlessworkflow.api.types.TaskItem;
22+
import io.serverlessworkflow.api.types.func.CallJava;
23+
import io.serverlessworkflow.api.types.func.CallTaskJava;
24+
import io.serverlessworkflow.fluent.spec.TaskBaseBuilder;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.UUID;
28+
import java.util.function.Consumer;
29+
import java.util.function.Function;
30+
31+
public class FuncForkTaskBuilder extends TaskBaseBuilder<FuncForkTaskBuilder>
32+
implements FuncTransformations<FuncForkTaskBuilder> {
33+
34+
private final ForkTask forkTask;
35+
private final List<TaskItem> items;
36+
37+
FuncForkTaskBuilder() {
38+
this.forkTask = new ForkTask();
39+
this.forkTask.setFork(new ForkTaskConfiguration());
40+
this.items = new ArrayList<>();
41+
}
42+
43+
@Override
44+
protected FuncForkTaskBuilder self() {
45+
return this;
46+
}
47+
48+
public <T, V> FuncForkTaskBuilder branch(String name, Function<T, V> function) {
49+
this.items.add(
50+
new TaskItem(name, new Task().withCallTask(new CallTaskJava(CallJava.function(function)))));
51+
return this;
52+
}
53+
54+
public <T, V> FuncForkTaskBuilder branch(Function<T, V> function) {
55+
return this.branch(UUID.randomUUID().toString(), function);
56+
}
57+
58+
public FuncForkTaskBuilder branches(Consumer<FuncTaskItemListBuilder> consumer) {
59+
final FuncTaskItemListBuilder builder = new FuncTaskItemListBuilder();
60+
consumer.accept(builder);
61+
this.items.addAll(builder.build());
62+
return this;
63+
}
64+
65+
public FuncForkTaskBuilder compete(boolean compete) {
66+
this.forkTask.getFork().setCompete(compete);
67+
return this;
68+
}
69+
70+
public ForkTask build() {
71+
this.forkTask.getFork().setBranches(this.items);
72+
return forkTask;
73+
}
74+
}

fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,18 @@ public FuncTaskItemListBuilder switchFn(String name, Consumer<FuncSwitchTaskBuil
8282
public FuncTaskItemListBuilder switchFn(Consumer<FuncSwitchTaskBuilder> consumer) {
8383
return this.switchFn(UUID.randomUUID().toString(), consumer);
8484
}
85+
86+
@Override
87+
public FuncTaskItemListBuilder forkFn(Consumer<FuncForkTaskBuilder> cfg) {
88+
return this.forkFn(UUID.randomUUID().toString(), cfg);
89+
}
90+
91+
@Override
92+
public FuncTaskItemListBuilder forkFn(String name, Consumer<FuncForkTaskBuilder> cfg) {
93+
this.requireNameAndConfig(name, cfg);
94+
final FuncForkTaskBuilder forkTaskJavaBuilder = new FuncForkTaskBuilder();
95+
cfg.accept(forkTaskJavaBuilder);
96+
return this.addTaskItem(
97+
new TaskItem(name, new Task().withForkTask(forkTaskJavaBuilder.build())));
98+
}
8599
}

fluent/func/src/test/java/io/serverlessworkflow/fluent/func/JavaWorkflowBuilderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ void testJavaFunctionalIO() {
165165
ForTaskFunction fn = (ForTaskFunction) forTaskFnHolder.getForTask();
166166
assertNotNull(fn);
167167

168-
// Inspect nested tasks inside the function loop
168+
// Inspect nested branche inside the function loop
169169
List<TaskItem> nested = fn.getDo();
170170
assertEquals(1, nested.size());
171171
TaskBase nestedTask = nested.get(0).getTask().getSetTask();
@@ -219,7 +219,7 @@ void testCallJavaTask() {
219219
}
220220

221221
@Test
222-
@DisplayName("switchCaseFn (Java variant) coexists with spec tasks")
222+
@DisplayName("switchCaseFn (Java variant) coexists with spec branche")
223223
void testSwitchCaseJava() {
224224
Workflow wf =
225225
FuncWorkflowBuilder.workflow("switchJava")

0 commit comments

Comments
 (0)