Skip to content

Commit 61bd518

Browse files
committed
parallelWorkflow() works
Signed-off-by: Dmitrii Tikhomirov <[email protected]>
1 parent 9d6980e commit 61bd518

File tree

5 files changed

+96
-73
lines changed

5 files changed

+96
-73
lines changed

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
6060

6161
@Override
6262
public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {
63-
// TODO: create a new agenticScope object in the AgenticScopeRegistryAssessor per branch
64-
// TODO: Since we share the same agenticScope object, both branches are updating the same
65-
// instance, so for now we return the first key.
66-
return workflowVariables.values().iterator().next();
63+
return newAgenticModel(workflowVariables);
6764
}
6865

6966
@Override

experimental/fluent/agentic-langchain4j/src/test/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowAgentsIT.java

Lines changed: 74 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static io.serverlessworkflow.fluent.agentic.langchain4j.Agents.CreativeWriter;
2323
import static io.serverlessworkflow.fluent.agentic.langchain4j.Agents.StyleEditor;
2424
import static io.serverlessworkflow.fluent.agentic.langchain4j.Models.BASE_MODEL;
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
2526
import static org.junit.jupiter.api.Assertions.assertNotNull;
2627
import static org.junit.jupiter.api.Assertions.assertTrue;
2728
import static org.mockito.ArgumentMatchers.any;
@@ -32,14 +33,16 @@
3233
import dev.langchain4j.agentic.UntypedAgent;
3334
import dev.langchain4j.agentic.scope.AgenticScope;
3435
import dev.langchain4j.agentic.workflow.WorkflowAgentsBuilder;
36+
import io.serverlessworkflow.fluent.agentic.AgenticServices;
37+
import io.serverlessworkflow.fluent.agentic.AgentsUtils;
3538

39+
import java.util.ArrayList;
3640
import java.util.List;
3741
import java.util.Map;
3842
import java.util.function.Function;
3943
import java.util.function.Predicate;
44+
import java.util.stream.IntStream;
4045

41-
import io.serverlessworkflow.fluent.agentic.AgenticServices;
42-
import io.serverlessworkflow.fluent.agentic.AgentsUtils;
4346
import org.junit.jupiter.api.Test;
4447

4548
public class WorkflowAgentsIT {
@@ -49,38 +52,38 @@ void sequential_agents_tests() {
4952
WorkflowAgentsBuilder builder = new LC4JWorkflowBuilder();
5053

5154
CreativeWriter creativeWriter =
52-
spy(
53-
dev.langchain4j.agentic.AgenticServices.agentBuilder(CreativeWriter.class)
54-
.chatModel(BASE_MODEL)
55-
.outputName("story")
56-
.build());
55+
spy(
56+
dev.langchain4j.agentic.AgenticServices.agentBuilder(CreativeWriter.class)
57+
.chatModel(BASE_MODEL)
58+
.outputName("story")
59+
.build());
5760

5861
AudienceEditor audienceEditor =
59-
spy(
60-
dev.langchain4j.agentic.AgenticServices.agentBuilder(AudienceEditor.class)
61-
.chatModel(BASE_MODEL)
62-
.outputName("story")
63-
.build());
62+
spy(
63+
dev.langchain4j.agentic.AgenticServices.agentBuilder(AudienceEditor.class)
64+
.chatModel(BASE_MODEL)
65+
.outputName("story")
66+
.build());
6467

6568
StyleEditor styleEditor =
66-
spy(
67-
dev.langchain4j.agentic.AgenticServices.agentBuilder(StyleEditor.class)
68-
.chatModel(BASE_MODEL)
69-
.outputName("story")
70-
.build());
69+
spy(
70+
dev.langchain4j.agentic.AgenticServices.agentBuilder(StyleEditor.class)
71+
.chatModel(BASE_MODEL)
72+
.outputName("story")
73+
.build());
7174

7275
UntypedAgent novelCreator =
73-
builder
74-
.sequenceBuilder()
75-
.subAgents(creativeWriter, audienceEditor, styleEditor)
76-
.outputName("story")
77-
.build();
76+
builder
77+
.sequenceBuilder()
78+
.subAgents(creativeWriter, audienceEditor, styleEditor)
79+
.outputName("story")
80+
.build();
7881

7982
Map<String, Object> input =
80-
Map.of(
81-
"topic", "dragons and wizards",
82-
"style", "fantasy",
83-
"audience", "young adults");
83+
Map.of(
84+
"topic", "dragons and wizards",
85+
"style", "fantasy",
86+
"audience", "young adults");
8487

8588
String story = (String) novelCreator.invoke(input);
8689
System.out.println(story);
@@ -96,10 +99,10 @@ public void sequenceHelperTest() {
9699
var audienceEditor = AgentsUtils.newAudienceEditor();
97100
var styleEditor = AgentsUtils.newStyleEditor();
98101

99-
NovelCreator novelCreator = AgenticServices.of(NovelCreator.class)
100-
.flow(workflow("seqFlow")
101-
.sequence(creativeWriter, audienceEditor, styleEditor)
102-
).build();
102+
NovelCreator novelCreator =
103+
AgenticServices.of(NovelCreator.class)
104+
.flow(workflow("seqFlow").sequence(creativeWriter, audienceEditor, styleEditor))
105+
.build();
103106

104107
String story = novelCreator.createNovel("dragons and wizards", "young adults", "fantasy");
105108
assertNotNull(story);
@@ -110,12 +113,25 @@ public void parallelWorkflow() {
110113
var foodExpert = AgentsUtils.newFoodExpert();
111114
var movieExpert = AgentsUtils.newMovieExpert();
112115

113-
EveningPlannerAgent eveningPlannerAgent = AgenticServices.of(EveningPlannerAgent.class)
114-
.flow(workflow("parallelFlow")
115-
.parallel(foodExpert, movieExpert)
116-
).build();
116+
Function<Map<String, List<String>>, List<EveningPlan>> planEvening =
117+
input -> {
118+
List<String> movies = input.getOrDefault("findMovie", List.of());
119+
List<String> meals = input.getOrDefault("findMeal", List.of());
120+
int max = Math.min(movies.size(), meals.size());
121+
122+
return IntStream.range(0, max)
123+
.mapToObj(i -> new EveningPlan(movies.get(i), meals.get(i)))
124+
.toList();
125+
};
126+
127+
EveningPlannerAgent eveningPlannerAgent =
128+
AgenticServices.of(EveningPlannerAgent.class)
129+
.flow(workflow("parallelFlow").parallel(foodExpert, movieExpert)
130+
.outputAs(planEvening))
131+
.build();
117132
List<EveningPlan> result = eveningPlannerAgent.plan("romantic");
118-
assertTrue(result.size() > 0);
133+
System.out.println(result);
134+
assertEquals(3, result.size());
119135
}
120136

121137
@Test
@@ -126,11 +142,10 @@ public void loopTest() {
126142

127143
Predicate<AgenticScope> until = s -> s.readState("score", 0).doubleValue() >= 0.8;
128144

129-
130-
StyledWriter styledWriter = AgenticServices.of(StyledWriter.class)
131-
.flow(workflow("loopFlow").agent(creativeWriter)
132-
.loop(until, scorer, editor)
133-
).build();
145+
StyledWriter styledWriter =
146+
AgenticServices.of(StyledWriter.class)
147+
.flow(workflow("loopFlow").agent(creativeWriter).loop(until, scorer, editor))
148+
.build();
134149

135150
String story = styledWriter.writeStoryWithStyle("dragons and wizards", "fantasy");
136151
assertNotNull(story);
@@ -140,25 +155,26 @@ public void loopTest() {
140155
public void humanInTheLoop() {
141156
var astrologyAgent = AgentsUtils.newAstrologyAgent();
142157

143-
var askSign = new Function<Map<String, Object>, Map<String, Object>>() {
144-
@Override
145-
public Map<String, Object> apply(Map<String, Object> holder) {
146-
System.out.println("What's your star sign?");
147-
//var sign = System.console().readLine();
148-
holder.put("sign", "piscis");
149-
return holder;
150-
}
151-
};
152-
153-
String result = AgenticServices.of(HoroscopeAgent.class)
154-
.flow(workflow("humanInTheLoop")
155-
.tasks(tasks -> tasks.callFn(fn(askSign)))
156-
.agent(astrologyAgent))
157-
.build()
158-
.invoke("My name is Mario. What is my horoscope?");
158+
var askSign =
159+
new Function<Map<String, Object>, Map<String, Object>>() {
160+
@Override
161+
public Map<String, Object> apply(Map<String, Object> holder) {
162+
System.out.println("What's your star sign?");
163+
// var sign = System.console().readLine();
164+
holder.put("sign", "piscis");
165+
return holder;
166+
}
167+
};
168+
169+
String result =
170+
AgenticServices.of(HoroscopeAgent.class)
171+
.flow(
172+
workflow("humanInTheLoop")
173+
.tasks(tasks -> tasks.callFn(fn(askSign)))
174+
.agent(astrologyAgent))
175+
.build()
176+
.invoke("My name is Mario. What is my horoscope?");
159177

160178
assertNotNull(result);
161-
162179
}
163-
164180
}

experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ public AgentTaskItemListBuilder parallel(String name, Object... agents) {
9292
List<AgentExecutor> execs = AgentAdapters.toExecutors(agents);
9393
for (int i = 0; i < execs.size(); i++) {
9494
AgentExecutor ex = execs.get(i);
95-
fork.branch(
96-
"branch-" + i + "-" + name,
95+
String agentName = ex.agentName() != null ? ex.agentName() : "branch-" + i + "-" + name;
96+
fork.branch(agentName,
9797
AgentAdapters.toFunction(ex),
9898
DefaultAgenticScope.class);
9999
}

experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticServices.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import dev.langchain4j.service.V;
2020
import io.serverlessworkflow.api.types.Workflow;
2121
import io.serverlessworkflow.impl.WorkflowApplication;
22-
2322
import java.lang.annotation.Annotation;
2423
import java.lang.reflect.InvocationHandler;
2524
import java.lang.reflect.Method;
@@ -48,12 +47,12 @@ public AgenticServices<T> flow(AgentWorkflowBuilder builder) {
4847
}
4948

5049
public T build() {
51-
Objects.requireNonNull(builder, "AgenticServices.flow(AgentWorkflowBuilder) must be called before build()");
50+
Objects.requireNonNull(
51+
builder, "AgenticServices.flow(AgentWorkflowBuilder) must be called before build()");
5252
Workflow workflow = builder.build();
5353
return AgenticServiceBuilder.create(agent, new AgentInvocationHandler(workflow));
5454
}
5555

56-
5756
private static class AgenticServiceBuilder {
5857

5958
@SuppressWarnings("unchecked")
@@ -63,7 +62,7 @@ public static <T> T create(Class<T> runner, InvocationHandler h) {
6362
}
6463

6564
ClassLoader cl = runner.getClassLoader();
66-
Class<?>[] ifaces = new Class<?>[]{runner};
65+
Class<?>[] ifaces = new Class<?>[] {runner};
6766
return (T) Proxy.newProxyInstance(cl, ifaces, h);
6867
}
6968
}
@@ -82,14 +81,15 @@ public Object invoke(Object proxy, Method method, Object[] args) {
8281
return switch (method.getName()) {
8382
case "toString" -> "AgentProxy(" + workflow.getDocument().getName() + ")";
8483
case "hashCode" -> System.identityHashCode(proxy);
85-
case "equals" -> proxy == args[0];
84+
case "equals" -> proxy == args[0];
8685
default -> throw new IllegalStateException("Unexpected Object method: " + method);
8786
};
8887
}
8988

9089
Agent agent = method.getAnnotation(Agent.class);
9190
if (agent == null) {
92-
throw new IllegalStateException("Method " + method.getName() + " is not annotated with @Agent");
91+
throw new IllegalStateException(
92+
"Method " + method.getName() + " is not annotated with @Agent");
9393
}
9494

9595
Annotation[][] annotations = method.getParameterAnnotations();
@@ -106,7 +106,12 @@ public Object invoke(Object proxy, Method method, Object[] args) {
106106
}
107107
}
108108
if (!found) {
109-
throw new IllegalStateException("Parameter " + (i + 1) + " of method " + method.getName() + " is not annotated with @V");
109+
throw new IllegalStateException(
110+
"Parameter "
111+
+ (i + 1)
112+
+ " of method "
113+
+ method.getName()
114+
+ " is not annotated with @V");
110115
}
111116
}
112117

@@ -117,5 +122,4 @@ public Object invoke(Object proxy, Method method, Object[] args) {
117122
}
118123
}
119124
}
120-
121125
}

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import io.serverlessworkflow.api.types.Export;
2020
import io.serverlessworkflow.api.types.Input;
2121
import io.serverlessworkflow.api.types.Output;
22+
import io.serverlessworkflow.api.types.TaskItem;
2223
import io.serverlessworkflow.api.types.Workflow;
2324
import io.serverlessworkflow.fluent.spec.spi.TransformationHandlers;
25+
import java.util.ArrayList;
26+
import java.util.Collections;
27+
import java.util.List;
2428
import java.util.UUID;
2529
import java.util.function.Consumer;
2630

@@ -90,7 +94,9 @@ public SELF tasks(Consumer<DBuilder> doTaskConsumer) {
9094
if (this.workflow.getDo() == null) {
9195
this.workflow.setDo(doTaskBuilder.build().getDo());
9296
} else {
93-
this.workflow.getDo().addAll(doTaskBuilder.build().getDo());
97+
List<TaskItem> existingTasks = new ArrayList<>(this.workflow.getDo());
98+
existingTasks.addAll(doTaskBuilder.build().getDo());
99+
this.workflow.setDo(Collections.unmodifiableList(existingTasks));
94100
}
95101
return self();
96102
}

0 commit comments

Comments
 (0)