|
18 | 18 | import static io.serverlessworkflow.fluent.agentic.AgentWorkflowBuilder.workflow; |
19 | 19 | import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.conditional; |
20 | 20 | import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.doTasks; |
| 21 | +import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.fn; |
21 | 22 | import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.loop; |
22 | 23 | import static org.assertj.core.api.Assertions.assertThat; |
23 | 24 | import static org.junit.jupiter.api.Assertions.assertEquals; |
|
30 | 31 | import io.serverlessworkflow.api.types.func.CallTaskJava; |
31 | 32 | import io.serverlessworkflow.api.types.func.ForTaskFunction; |
32 | 33 | import io.serverlessworkflow.impl.WorkflowApplication; |
| 34 | +import java.util.ArrayList; |
33 | 35 | import java.util.List; |
34 | 36 | import java.util.Map; |
35 | 37 | import java.util.concurrent.atomic.AtomicReference; |
@@ -147,13 +149,49 @@ public void loopWorkflowWithMaxIterations() { |
147 | 149 | assertThat(result).containsKey("story"); |
148 | 150 | } |
149 | 151 |
|
| 152 | + public record EveningPlan(String movie, String meal) {} |
| 153 | + |
150 | 154 | @Test |
151 | 155 | @DisplayName("Parallel agents via DSL.parallel(...)") |
152 | 156 | public void parallelWorkflow() { |
153 | 157 | var foodExpert = AgentsUtils.newFoodExpert(); |
154 | 158 | var movieExpert = AgentsUtils.newMovieExpert(); |
155 | 159 |
|
156 | | - Workflow wf = workflow("forkFlow").parallel("fanout", foodExpert, movieExpert).build(); |
| 160 | + workflow("forkFlow") |
| 161 | + .tasks( |
| 162 | + d -> |
| 163 | + d.parallel(foodExpert, movieExpert) |
| 164 | + .callFn( |
| 165 | + fn( |
| 166 | + f -> { |
| 167 | + Map<String, List<String>> asMap = (Map<String, List<String>>) f; |
| 168 | + List<EveningPlan> result = new ArrayList<>(); |
| 169 | + int max = |
| 170 | + asMap.values().stream() |
| 171 | + .map(List::size) |
| 172 | + .min(Integer::compareTo) |
| 173 | + .orElse(0); |
| 174 | + for (int i = 0; i < max; i++) { |
| 175 | + result.add( |
| 176 | + new EveningPlan( |
| 177 | + asMap.get("movies").get(i), asMap.get("meals").get(i))); |
| 178 | + } |
| 179 | + return result; |
| 180 | + }))) |
| 181 | + .build(); |
| 182 | + |
| 183 | + Workflow wf = workflow("forkFlow") |
| 184 | + .tasks(d -> d |
| 185 | + .parallel("fanout", foodExpert, movieExpert) |
| 186 | + .callFn(fn((Map<String, List<String>> m) -> { |
| 187 | + var movies = m.getOrDefault("movies", List.of()); |
| 188 | + var meals = m.getOrDefault("meals", List.of()); |
| 189 | + return java.util.stream.IntStream |
| 190 | + .range(0, Math.min(movies.size(), meals.size())) |
| 191 | + .mapToObj(i -> new EveningPlan(movies.get(i), meals.get(i))) |
| 192 | + .toList(); |
| 193 | + })) |
| 194 | + ).build(); |
157 | 195 |
|
158 | 196 | List<TaskItem> items = wf.getDo(); |
159 | 197 | assertThat(items).hasSize(1); |
|
0 commit comments