diff --git a/experimental/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ConditionalAgentServiceImpl.java b/experimental/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ConditionalAgentServiceImpl.java index 0aff4aaf..a16501cb 100644 --- a/experimental/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ConditionalAgentServiceImpl.java +++ b/experimental/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ConditionalAgentServiceImpl.java @@ -20,8 +20,11 @@ import dev.langchain4j.agentic.internal.AgentExecutor; import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.workflow.ConditionalAgentService; +import io.serverlessworkflow.fluent.agentic.AgenticScopedRequest; +import io.serverlessworkflow.fluent.agentic.AgenticScopedResponse; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -29,6 +32,11 @@ public class ConditionalAgentServiceImpl extends AbstractAgentService> implements ConditionalAgentService { + private final AtomicReference> beforeAgentInvocation = + new AtomicReference<>(); + private final AtomicReference> afterAgentInvocation = + new AtomicReference<>(); + private ConditionalAgentServiceImpl(Class agentServiceClass) { super(agentServiceClass); } @@ -39,7 +47,21 @@ public static ConditionalAgentService builder(Class agentServiceClass) @Override public ConditionalAgentService subAgents(Object... agents) { - this.workflowBuilder.tasks(t -> t.sequence(agents)); + this.workflowBuilder.tasks( + t -> + t.sequence(agents) + .inputFrom( + scope -> { + if (beforeAgentInvocation.get() != null) { + beforeAgentInvocation.get().accept((AgenticScopedRequest) scope); + } + }) + .outputAs( + scope -> { + if (afterAgentInvocation.get() != null) { + afterAgentInvocation.get().accept((AgenticScopedResponse) scope); + } + })); return this; } @@ -50,20 +72,43 @@ public ConditionalAgentService subAgents(List agentExecutors) @Override public ConditionalAgentService beforeAgentInvocation(Consumer consumer) { - throw new UnsupportedOperationException( - "Feature not implemented yet. See: https://github.com/serverlessworkflow/sdk-java/issues/836"); + beforeAgentInvocation.set( + agenticScopedRequest -> consumer.accept(agenticScopedRequest.asAgentRequest())); + return this; } @Override public ConditionalAgentService afterAgentInvocation(Consumer consumer) { - throw new UnsupportedOperationException( - "Feature not implemented yet. See: https://github.com/serverlessworkflow/sdk-java/issues/836"); + afterAgentInvocation.set( + agenticScopedResponse -> consumer.accept(agenticScopedResponse.asAgentResponse())); + return this; } @Override public ConditionalAgentService subAgents(Predicate condition, Object... agents) { this.workflowBuilder.tasks( - t -> Arrays.stream(agents).forEach(agent -> t.when(condition).agent(agent))); + t -> + Arrays.stream(agents) + .forEach( + agent -> + t.when(condition) + .agent(agent) + .inputFrom( + scope -> { + if (beforeAgentInvocation.get() != null) { + beforeAgentInvocation + .get() + .accept((AgenticScopedRequest) scope); + } + }) + .outputAs( + scope -> { + if (afterAgentInvocation.get() != null) { + afterAgentInvocation + .get() + .accept((AgenticScopedResponse) scope); + } + }))); return this; } @@ -76,7 +121,22 @@ public ConditionalAgentService subAgents( @Override public ConditionalAgentService subAgent( Predicate condition, AgentExecutor agentExecutor) { - this.workflowBuilder.tasks(t -> t.when(condition).agent(agentExecutor)); + this.workflowBuilder.tasks( + t -> + t.when(condition) + .agent(agentExecutor) + .inputFrom( + scope -> { + if (beforeAgentInvocation.get() != null) { + beforeAgentInvocation.get().accept((AgenticScopedRequest) scope); + } + }) + .outputAs( + scope -> { + if (afterAgentInvocation.get() != null) { + afterAgentInvocation.get().accept((AgenticScopedResponse) scope); + } + })); return this; } } diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentAdapters.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentAdapters.java index d8a2d015..fc3fd336 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentAdapters.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentAdapters.java @@ -22,7 +22,9 @@ import dev.langchain4j.agentic.scope.DefaultAgenticScope; import io.serverlessworkflow.api.types.func.LoopPredicateIndex; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -34,8 +36,26 @@ public static List toExecutors(Object... agents) { return agentsToExecutors(agents); } - public static Function toFunction(AgentExecutor exec) { - return exec::execute; + public static Function toFunction( + AgentExecutor exec, + AtomicReference> beforeAgentInvocation, + AtomicReference> afterAgentInvocation) { + String agentName = exec.agentInvoker().name(); + return defaultAgenticScope -> { + if (beforeAgentInvocation.get() != null) { + beforeAgentInvocation + .get() + .accept(new AgenticScopedRequest(defaultAgenticScope, agentName)); + } + Object result = exec.execute(defaultAgenticScope); + if (afterAgentInvocation.get() != null) { + defaultAgenticScope.writeState("input", result); + afterAgentInvocation + .get() + .accept(new AgenticScopedResponse(defaultAgenticScope, agentName, result)); + } + return result; + }; } public static LoopPredicateIndex toWhile(Predicate exit) { diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java index 4d7c42a2..5bd70bdb 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.agentic; +import dev.langchain4j.agentic.scope.AgenticScope; import io.serverlessworkflow.fluent.agentic.spi.AgentDoFluent; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; @@ -117,4 +118,14 @@ public AgentDoTaskBuilder switchCase( this.listBuilder().switchCase(name, itemsConfigurer); return self(); } + + public AgentDoTaskBuilder inputFrom(Consumer inputFrom) { + this.listBuilder().inputFrom(inputFrom); + return self(); + } + + public AgentDoTaskBuilder outputAs(Consumer outputAs) { + this.listBuilder().outputAs(outputAs); + return self(); + } } diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java index d4820312..89143044 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.fluent.agentic; import dev.langchain4j.agentic.internal.AgentExecutor; +import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.scope.DefaultAgenticScope; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; @@ -29,12 +30,17 @@ import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import io.serverlessworkflow.fluent.spec.BaseTaskItemListBuilder; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class AgentTaskItemListBuilder extends BaseTaskItemListBuilder implements AgentDoFluent { private final FuncTaskItemListBuilder delegate; + private final AtomicReference> beforeAgentInvocation = + new AtomicReference<>(); + private final AtomicReference> afterAgentInvocation = + new AtomicReference<>(); public AgentTaskItemListBuilder() { super(); @@ -58,7 +64,11 @@ public AgentTaskItemListBuilder agent(String name, Object agent) { exec -> this.delegate.function( name, - fn -> fn.function(AgentAdapters.toFunction(exec), DefaultAgenticScope.class))); + fn -> + fn.function( + AgentAdapters.toFunction( + exec, beforeAgentInvocation, afterAgentInvocation), + DefaultAgenticScope.class))); return self(); } @@ -96,7 +106,10 @@ public AgentTaskItemListBuilder parallel(String name, Object... agents) { ex.agentInvoker().name() != null ? ex.agentInvoker().name() : "branch-" + i + "-" + name; - fork.branch(agentName, AgentAdapters.toFunction(ex), DefaultAgenticScope.class); + fork.branch( + agentName, + AgentAdapters.toFunction(ex, beforeAgentInvocation, afterAgentInvocation), + DefaultAgenticScope.class); } }); return self(); @@ -154,4 +167,14 @@ public AgentTaskItemListBuilder switchCase( this.delegate.switchCase(name, itemsConfigurer); return self(); } + + public AgentTaskItemListBuilder inputFrom(Consumer beforeAgentInvocation) { + this.beforeAgentInvocation.set(beforeAgentInvocation::accept); + return self(); + } + + public AgentTaskItemListBuilder outputAs(Consumer afterAgentInvocationConsumer) { + this.afterAgentInvocation.set(afterAgentInvocationConsumer::accept); + return self(); + } } diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticScopedRequest.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticScopedRequest.java new file mode 100644 index 00000000..a5e45f8c --- /dev/null +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticScopedRequest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import dev.langchain4j.agentic.agent.AgentRequest; +import dev.langchain4j.agentic.internal.AgentInvocation; +import dev.langchain4j.agentic.scope.AgenticScope; +import java.util.List; +import java.util.Map; + +public class AgenticScopedRequest implements AgenticScope { + + protected final String agentName; + private final AgenticScope wrapped; + + AgenticScopedRequest(AgenticScope wrapped, String agentName) { + this.wrapped = wrapped; + this.agentName = agentName; + } + + @Override + public Object memoryId() { + return wrapped.memoryId(); + } + + @Override + public void writeState(String s, Object o) { + wrapped.writeState(s, o); + } + + @Override + public void writeStates(Map map) { + wrapped.writeStates(map); + } + + @Override + public boolean hasState(String s) { + return wrapped.hasState(s); + } + + @Override + public Object readState(String s) { + return wrapped.readState(s); + } + + @Override + public T readState(String s, T t) { + return wrapped.readState(s, t); + } + + @Override + public Map state() { + return wrapped.state(); + } + + @Override + public String contextAsConversation(String... strings) { + return wrapped.contextAsConversation(strings); + } + + @Override + public String contextAsConversation(Object... objects) { + return wrapped.contextAsConversation(objects); + } + + @Override + public List agentInvocations(String s) { + return wrapped.agentInvocations(s); + } + + public AgentRequest asAgentRequest() { + return new AgentRequest(this, agentName, state()); + } +} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticScopedResponse.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticScopedResponse.java new file mode 100644 index 00000000..a9b78d5c --- /dev/null +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgenticScopedResponse.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import dev.langchain4j.agentic.agent.AgentResponse; +import dev.langchain4j.agentic.scope.AgenticScope; + +public class AgenticScopedResponse extends AgenticScopedRequest { + + private final Object response; + + AgenticScopedResponse(AgenticScope wrapped, String agentName, Object response) { + super(wrapped, agentName); + this.response = response; + } + + public AgentResponse asAgentResponse() { + return new AgentResponse(this, agentName, state(), response); + } +} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/LoopAgentsBuilder.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/LoopAgentsBuilder.java index dd1d6c00..177c995e 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/LoopAgentsBuilder.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/LoopAgentsBuilder.java @@ -22,7 +22,9 @@ import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; +import java.util.function.Consumer; import java.util.function.ObjIntConsumer; import java.util.function.Predicate; import java.util.stream.IntStream; @@ -31,6 +33,10 @@ public class LoopAgentsBuilder { private final FuncTaskItemListBuilder funcDelegate; private final ForTaskFunction forTask; + private final AtomicReference> beforeAgentInvocation = + new AtomicReference<>(); + private final AtomicReference> afterAgentInvocation = + new AtomicReference<>(); private int maxIterations = 1024; @@ -50,7 +56,11 @@ public LoopAgentsBuilder subAgents(String baseName, Object... agents) { execs, (exec, idx) -> funcDelegate.function( - baseName + "-" + idx, fn -> fn.function(AgentAdapters.toFunction(exec)))); + baseName + "-" + idx, + fn -> + fn.function( + AgentAdapters.toFunction( + exec, beforeAgentInvocation, afterAgentInvocation)))); return this; } @@ -73,6 +83,16 @@ public LoopAgentsBuilder exitCondition(BiPredicate exitCo return this; } + public LoopAgentsBuilder inputFrom(Consumer beforeAgentInvocationConsumer) { + this.beforeAgentInvocation.set(beforeAgentInvocationConsumer::accept); + return this; + } + + public LoopAgentsBuilder outputAs(Consumer afterAgentInvocationConsumer) { + this.afterAgentInvocation.set(afterAgentInvocationConsumer::accept); + return this; + } + public ForTaskFunction build() { this.forTask.setDo(this.funcDelegate.build()); this.forTask.withCollection(ignored -> IntStream.range(0, maxIterations).boxed().toList()); diff --git a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgenticBeforeAfterInvokeTest.java b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgenticBeforeAfterInvokeTest.java new file mode 100644 index 00000000..f8ca857a --- /dev/null +++ b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgenticBeforeAfterInvokeTest.java @@ -0,0 +1,202 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +public class AgenticBeforeAfterInvokeTest { + + @Test + void testAgentBeforeAfter() throws ExecutionException, InterruptedException { + Agents.ChatBot chatBot = mock(Agents.ChatBot.class); + when(chatBot.chat(eq("A Great Story"))).thenReturn("Once upon a time..."); + + Workflow wf = + AgentWorkflowBuilder.workflow() + .tasks( + t -> + t.agent(chatBot) + .inputFrom( + agenticScope -> { + assertTrue(agenticScope.state().containsKey("userInput")); + assertEquals("A Great Story", agenticScope.readState("userInput")); + agenticScope.writeState("testData", "someValue"); + }) + .outputAs( + agenticScope -> { + assertTrue(agenticScope.state().containsKey("userInput")); + assertEquals("A Great Story", agenticScope.readState("userInput")); + assertTrue(agenticScope.state().containsKey("testData")); + assertEquals("someValue", agenticScope.readState("testData")); + assertEquals("Once upon a time...", agenticScope.readState("input")); + agenticScope.writeState("outputData", "outputValue"); + })) + .build(); + + Map in = new HashMap<>(); + in.put("userInput", "A Great Story"); + + Map out; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + out = app.workflowDefinition(wf).instance(in).start().get().asMap().orElseThrow(); + } + + assertTrue(out.containsKey("outputData")); + assertEquals("outputValue", out.get("outputData")); + assertTrue(out.containsKey("userInput")); + assertEquals("A Great Story", out.get("userInput")); + assertTrue(out.containsKey("testData")); + assertEquals("someValue", out.get("testData")); + assertTrue(out.containsKey("input")); + assertEquals("Once upon a time...", out.get("input")); + } + + @Test + void testSeqBeforeAfter() throws ExecutionException, InterruptedException { + Agents.CreativeWriter creativeWriter = mock(Agents.CreativeWriter.class); + when(creativeWriter.generateStory((eq("dragons and wizards")))) + .thenReturn("A fantasy story..."); + + Agents.StyleEditor styleEditor = mock(Agents.StyleEditor.class); + when(styleEditor.editStory(eq("A fantasy story..."), eq("young adults"))) + .thenReturn("An engaging fantasy story for young adults..."); + + Agents.StyleScorer styleScorer = mock(Agents.StyleScorer.class); + when(styleScorer.scoreStyle(eq("An engaging fantasy story for young adults..."), eq("funny"))) + .thenReturn(8.0); + + Workflow wf = + AgentWorkflowBuilder.workflow() + .tasks( + t -> + t.sequence(creativeWriter, styleEditor, styleScorer) + .inputFrom( + agenticScope -> { + if (agenticScope.readState("step") == null) { + agenticScope.writeState("topic", "dragons and wizards"); + agenticScope.writeState("step", "out_1"); + } else if (agenticScope.readState("step").equals("in_2")) { + agenticScope.writeState("story", agenticScope.readState("input")); + agenticScope.writeState("style", "young adults"); + agenticScope.writeState("step", "out_2"); + } else if (agenticScope.readState("step").equals("in_3")) { + agenticScope.writeState("story", agenticScope.readState("input")); + agenticScope.writeState("style", "funny"); + agenticScope.writeState("step", "out_3"); + } else { + throw new RuntimeException("We should not reach here"); + } + }) + .outputAs( + agenticScope -> { + if (agenticScope.readState("step").equals("out_1")) { + assertEquals("A fantasy story...", agenticScope.readState("input")); + agenticScope.writeState("step", "in_2"); + } else if (agenticScope.readState("step").equals("out_2")) { + assertEquals( + "An engaging fantasy story for young adults...", + agenticScope.readState("input")); + agenticScope.writeState("step", "in_3"); + } else if (agenticScope.readState("step").equals("out_3")) { + assertEquals(8.0, agenticScope.readState("input")); + } else { + throw new RuntimeException("We should not reach here"); + } + })) + .build(); + + Map out; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + out = app.workflowDefinition(wf).instance(Map.of()).start().get().asMap().orElseThrow(); + } + + assertEquals(8.0, out.get("input")); + assertEquals("An engaging fantasy story for young adults...", out.get("story")); + assertEquals("funny", out.get("style")); + } + + @Test + void testParallelBeforeAfter() throws ExecutionException, InterruptedException { + Agents.ChatBot chatBot = mock(Agents.ChatBot.class); + when(chatBot.chat(eq("A Great Story"))).thenReturn("Once upon a time..."); + + Agents.MovieExpert movieExpert = mock(Agents.MovieExpert.class); + when(movieExpert.findMovie(eq("si-fi"))).thenReturn(List.of("Movie A", "Movie B")); + + Workflow wf = + AgentWorkflowBuilder.workflow() + .tasks( + t -> + t.parallel(chatBot, movieExpert) + .inputFrom( + agenticScope -> { + agenticScope.writeState("userInput", "A Great Story"); + agenticScope.writeState("mood", "si-fi"); + })) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + app.workflowDefinition(wf).instance(Map.of()).start().get().asJavaObject(); + } + } + + @Test + void testLoopBeforeAfter() throws ExecutionException, InterruptedException { + Agents.MovieExpert expert = mock(Agents.MovieExpert.class); + when(expert.findMovie(eq("si-fi"))).thenReturn(List.of("Movie A", "Movie B")); + AtomicInteger atomicInteger = new AtomicInteger(0); + + Workflow wf = + AgentWorkflowBuilder.workflow() + .tasks( + d -> + d.loop( + l -> + l.subAgents(expert) + .inputFrom(a -> atomicInteger.incrementAndGet()) + .outputAs( + agenticScope -> + agenticScope.writeState("count", atomicInteger.get())) + .exitCondition(exit -> ((Integer) exit.readState("count")) == 10))) + .build(); + + Map result; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + result = + app.workflowDefinition(wf) + .instance(Map.of("mood", "si-fi", "count", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + } + + assertEquals(10, result.get("count")); + } +}