From b62d8bedd3bd969843227c83ecc4215e66e654e4 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Fri, 8 Aug 2025 17:24:09 -0400 Subject: [PATCH 1/4] Introduce ChatBot example Signed-off-by: Ricardo Zanini --- .../agentic/AgenticModelFactory.java | 29 ++++++++--- .../fluent/agentic/AgentDoTaskBuilder.java | 7 +++ .../agentic/AgentTaskItemListBuilder.java | 8 +++ .../fluent/agentic/Agents.java | 10 ++++ .../fluent/agentic/ChatBotIT.java | 49 +++++++++++++++++++ .../fluent/func/FuncDoTaskBuilder.java | 6 +++ .../fluent/func/FuncEmitTaskBuilder.java | 4 +- .../fluent/func/FuncListenTaskBuilder.java | 29 +++++++++++ .../fluent/func/FuncTaskItemListBuilder.java | 10 ++++ .../fluent/func/spi/FuncDoFluent.java | 5 +- .../fluent/spec/TaskBaseBuilder.java | 5 ++ .../fluent/spec/spi/ListenFluent.java | 2 +- 12 files changed, 154 insertions(+), 10 deletions(-) create mode 100644 fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java create mode 100644 fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index 9f95c0d5..1d7089fe 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -28,6 +28,19 @@ class AgenticModelFactory implements WorkflowModelFactory { + private static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input"; + private final AgenticScopeRegistryAssessor scopeRegistryAssessor = + new AgenticScopeRegistryAssessor(); + + private AgenticModel asAgenticModel(Object value) { + // TODO: fetch memoryId from the object based on known premises + final AgenticScope agenticScope = this.scopeRegistryAssessor.getAgenticScope(); + if (value != null) { + agenticScope.writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + } + return new AgenticModel(agenticScope); + } + /** * Applies any change to the model after running as task. We will always set it to a @AgenticScope * object since @AgentExecutor is always adding the output to the agenticScope. We just have to @@ -60,44 +73,46 @@ public WorkflowModelCollection createCollection() { @Override public WorkflowModel from(boolean value) { - return new JavaModel(value); + return asAgenticModel(value); } @Override public WorkflowModel from(Number value) { - return new JavaModel(value); + return asAgenticModel(value); } @Override public WorkflowModel from(String value) { - return new JavaModel(value); + return asAgenticModel(value); } @Override public WorkflowModel from(CloudEvent ce) { + // TODO: serialize the CE into the AgenticScope return new JavaModel(ce); } @Override public WorkflowModel from(CloudEventData ce) { + // TODO: serialize the CE data into the AgenticScope return new JavaModel(ce); } @Override public WorkflowModel from(OffsetDateTime value) { - return new JavaModel(value); + return asAgenticModel(value); } @Override public WorkflowModel from(Map map) { - final AgenticScope agenticScope = new AgenticScopeRegistryAssessor().getAgenticScope(); + final AgenticScope agenticScope = this.scopeRegistryAssessor.getAgenticScope(); agenticScope.writeStates(map); return new AgenticModel(agenticScope); } @Override public WorkflowModel fromNull() { - return new JavaModel(null); + return asAgenticModel(null); } @Override @@ -105,6 +120,6 @@ public WorkflowModel fromOther(Object value) { if (value instanceof AgenticScope) { return new AgenticModel((AgenticScope) value); } - return new JavaModel(value); + return asAgenticModel(value); } } diff --git a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java index 526deac0..5dddd851 100644 --- a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java +++ b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; @@ -81,6 +82,12 @@ public AgentDoTaskBuilder emit(String name, Consumer itemsC return self(); } + @Override + public AgentDoTaskBuilder listen(String name, Consumer itemsConfigurer) { + this.listBuilder().listen(name, itemsConfigurer); + return self(); + } + @Override public AgentDoTaskBuilder forEach(String name, Consumer itemsConfigurer) { this.listBuilder().forEach(name, itemsConfigurer); diff --git a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java index a26f0b45..0b164a0d 100644 --- a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java +++ b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; @@ -113,6 +114,13 @@ public AgentTaskItemListBuilder emit(String name, Consumer return self(); } + @Override + public AgentTaskItemListBuilder listen( + String name, Consumer itemsConfigurer) { + this.delegate.listen(name, itemsConfigurer); + return self(); + } + @Override public AgentTaskItemListBuilder forEach( String name, Consumer itemsConfigurer) { diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java index a0c970ab..dde27919 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java @@ -17,12 +17,22 @@ import dev.langchain4j.agentic.Agent; import dev.langchain4j.agentic.internal.AgentSpecification; +import dev.langchain4j.service.MemoryId; +import dev.langchain4j.service.SystemMessage; import dev.langchain4j.service.UserMessage; import dev.langchain4j.service.V; import java.util.List; public interface Agents { + @SystemMessage( + """ + You are a happy chat bot. + """) + interface ChatBot { + String chat(@MemoryId String memoryId, @V("message") String message); + } + interface MovieExpert { @UserMessage( diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java new file mode 100644 index 00000000..0f653b4a --- /dev/null +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import static org.mockito.Mockito.spy; + +import dev.langchain4j.agentic.AgenticServices; +import dev.langchain4j.agentic.scope.AgenticScope; +import org.junit.jupiter.api.Test; + +public class ChatBotIT { + + @Test + void chat_bot() { + Agents.ChatBot chatBot = + spy( + AgenticServices.agentBuilder(Agents.ChatBot.class) + .chatModel(Models.BASE_MODEL) + .outputName("message") + .build()); + // 1. listen to an event containing `message` key in the body + // 2. if contains, call the agent, if not end the workflow + // 3. After replying to the chat, return + AgentWorkflowBuilder.workflow("chat-bot") + .tasks( + t -> + t.listen( + "listenToMessages", + l -> + l.outputAs(null) + .one(c -> c.with(event -> event.type("org.acme.chatbot")))) + .when(scope -> !"".equals(scope.readState("message")), AgenticScope.class) + .agent(chatBot) + .then("listenToMessages")); + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java index 723e8d23..613f76a2 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java @@ -41,6 +41,12 @@ public FuncDoTaskBuilder emit(String name, Consumer itemsCo return this; } + @Override + public FuncDoTaskBuilder listen(String name, Consumer itemsConfigurer) { + this.listBuilder().listen(name, itemsConfigurer); + return this; + } + @Override public FuncDoTaskBuilder forEach(String name, Consumer itemsConfigurer) { this.listBuilder().forEach(name, itemsConfigurer); diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java index 89325ee8..28c8b8a4 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java @@ -16,10 +16,12 @@ package io.serverlessworkflow.fluent.func; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTransformations; import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; public class FuncEmitTaskBuilder extends EmitTaskBuilder - implements ConditionalTaskBuilder { + implements ConditionalTaskBuilder, + FuncTransformations { FuncEmitTaskBuilder() { super(); } diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java new file mode 100644 index 00000000..54a82ed0 --- /dev/null +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; + +public class FuncListenTaskBuilder extends ListenTaskBuilder + implements ConditionalTaskBuilder, + FuncTransformations { + + FuncListenTaskBuilder() { + super(); + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java index 2c8b5524..6ef8d7b0 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java @@ -79,6 +79,16 @@ public FuncTaskItemListBuilder emit(String name, Consumer i new TaskItem(name, new Task().withEmitTask(emitTaskJavaBuilder.build()))); } + @Override + public FuncTaskItemListBuilder listen( + String name, Consumer itemsConfigurer) { + this.requireNameAndConfig(name, itemsConfigurer); + final FuncListenTaskBuilder listenTaskJavaBuilder = new FuncListenTaskBuilder(); + itemsConfigurer.accept(listenTaskJavaBuilder); + return this.addTaskItem( + new TaskItem(name, new Task().withListenTask(listenTaskJavaBuilder.build()))); + } + @Override public FuncTaskItemListBuilder forEach( String name, Consumer itemsConfigurer) { diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java index 434304d5..b452f5df 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java @@ -19,11 +19,13 @@ import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.spec.spi.EmitFluent; import io.serverlessworkflow.fluent.spec.spi.ForEachFluent; import io.serverlessworkflow.fluent.spec.spi.ForkFluent; +import io.serverlessworkflow.fluent.spec.spi.ListenFluent; import io.serverlessworkflow.fluent.spec.spi.SetFluent; import io.serverlessworkflow.fluent.spec.spi.SwitchFluent; import java.util.UUID; @@ -36,7 +38,8 @@ public interface FuncDoFluent> EmitFluent, ForEachFluent, SwitchFluent, - ForkFluent { + ForkFluent, + ListenFluent { SELF callFn(String name, Consumer cfg); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java index 3ce5c203..817f7828 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java @@ -80,6 +80,11 @@ public T then(FlowDirectiveEnum then) { return self(); } + public T then(String taskName) { + this.task.setThen(new FlowDirective().withString(taskName)); + return self(); + } + public T exportAs(Object exportAs) { this.task.setExport(new ExportBuilder().as(exportAs).build()); return self(); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java index c3d32e14..ec950456 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java @@ -19,7 +19,7 @@ import java.util.UUID; import java.util.function.Consumer; -public interface ListenFluent, LIST> { +public interface ListenFluent, LIST> { LIST listen(String name, Consumer itemsConfigurer); From 12433581cc1a8ea70ec3f8755f4befb63ddc6081 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Mon, 11 Aug 2025 19:43:19 -0400 Subject: [PATCH 2/4] Refactor agentic to hold scope and current workflow context Signed-off-by: Ricardo Zanini --- experimental/agentic/pom.xml | 2 +- .../expressions/agentic/AgenticModel.java | 20 +- .../agentic/AgenticModelCollection.java | 55 +++++- .../agentic/AgenticModelFactory.java | 73 ++++--- .../AgenticScopeRegistryAssessor.java | 9 +- .../expressions/func/JavaModelCollection.java | 10 +- .../expressions/func/JavaModelFactory.java | 2 +- .../WorkflowInvocationHandler.java | 10 +- .../fluent/agentic/Agents.java | 10 +- .../fluent/agentic/ChatBotIT.java | 179 ++++++++++++++++-- .../fluent/agentic/Models.java | 2 +- .../fluent/agentic/WorkflowTests.java | 26 +-- .../jq/JacksonModelCollection.java | 10 +- 13 files changed, 301 insertions(+), 107 deletions(-) diff --git a/experimental/agentic/pom.xml b/experimental/agentic/pom.xml index dec8a5e5..2d648084 100644 --- a/experimental/agentic/pom.xml +++ b/experimental/agentic/pom.xml @@ -7,7 +7,7 @@ 8.0.0-SNAPSHOT serverlessworkflow-experimental-agentic - ServelessWorkflow:: Experimental:: Agentic + Serveless Workflow :: Experimental :: Agentic io.serverlessworkflow diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java index 3d352812..ef1f0a55 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java @@ -24,31 +24,33 @@ class AgenticModel extends JavaModel { - AgenticModel(AgenticScope agenticScope) { - super(agenticScope); + private final AgenticScope agenticScope; + + AgenticModel(AgenticScope agenticScope, Object object) { + super(object); + this.agenticScope = agenticScope; } - @Override - public void setObject(Object obj) { - super.setObject(obj); + public AgenticScope getAgenticScope() { + return agenticScope; } @Override public Collection asCollection() { - throw new UnsupportedOperationException("Not supported yet."); + throw new UnsupportedOperationException("asCollection() is not supported yet."); } @Override public Optional> asMap() { - return Optional.of(((AgenticScope) object).state()); + return Optional.of(this.agenticScope.state()); } @Override public Optional as(Class clazz) { if (AgenticScope.class.isAssignableFrom(clazz)) { - return Optional.of(clazz.cast(object)); + return Optional.of(clazz.cast(this.agenticScope)); } else if (Map.class.isAssignableFrom(clazz)) { - return Optional.of(clazz.cast(((AgenticScope) object).state())); + return asMap().map(clazz::cast); } else { return super.as(clazz); } diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java index 2ea0e382..68a17800 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java @@ -15,29 +15,66 @@ */ package io.serverlessworkflow.impl.expressions.agentic; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.scope.ResultWithAgenticScope; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.expressions.func.JavaModelCollection; -import java.util.Collection; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; import java.util.Optional; -class AgenticModelCollection extends JavaModelCollection { +public class AgenticModelCollection extends JavaModelCollection { private final AgenticScope agenticScope; - - AgenticModelCollection(Collection object, AgenticScope agenticScope) { - super(object); - this.agenticScope = agenticScope; - } + private final ObjectMapper mapper = new ObjectMapper(); AgenticModelCollection(AgenticScope agenticScope) { + super(Collections.emptyList()); this.agenticScope = agenticScope; } @Override - protected WorkflowModel nextItem(Object obj) { - return new AgenticModel((AgenticScope) obj); + public boolean add(WorkflowModel e) { + Optional> asMap = e.asMap(); + if (asMap.isPresent()) { + this.agenticScope.writeStates(asMap.get()); + } else { + // Update the agenticScope with the event body, so agents can use the event data as input + Object javaObj = e.asJavaObject(); + if (javaObj instanceof CloudEvent) { + try { + this.agenticScope.writeStates( + mapper.readValue( + Objects.requireNonNull(((CloudEvent) javaObj).getData()).toString(), + new TypeReference<>() {})); + } catch (JsonProcessingException ex) { + throw new IllegalArgumentException( + "Unable to parse CloudEvent, data must be a valid JSON", ex); + } + } else if (javaObj instanceof CloudEventData) { + try { + this.agenticScope.writeStates( + mapper.readValue( + Objects.requireNonNull(((CloudEventData) javaObj)).toBytes(), + new TypeReference<>() {})); + } catch (IOException ex) { + throw new IllegalArgumentException( + "Unable to parse CloudEventData, data must be a valid JSON", ex); + } + } else { + this.agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj); + } + } + + // add to the collection + return super.add(e); } @Override diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index 1d7089fe..00bc4e0a 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -22,38 +22,31 @@ import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.WorkflowModelFactory; import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor; -import io.serverlessworkflow.impl.expressions.func.JavaModel; import java.time.OffsetDateTime; import java.util.Map; class AgenticModelFactory implements WorkflowModelFactory { - private static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input"; + static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input"; private final AgenticScopeRegistryAssessor scopeRegistryAssessor = new AgenticScopeRegistryAssessor(); - private AgenticModel asAgenticModel(Object value) { - // TODO: fetch memoryId from the object based on known premises - final AgenticScope agenticScope = this.scopeRegistryAssessor.getAgenticScope(); - if (value != null) { - agenticScope.writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); - } - return new AgenticModel(agenticScope); - } - - /** - * Applies any change to the model after running as task. We will always set it to a @AgenticScope - * object since @AgentExecutor is always adding the output to the agenticScope. We just have to - * make sure that agenticScope is always passed to the next input task. - * - * @param prev the global AgenticScope object getting updated by the workflow context - * @param obj the same AgenticScope object updated by the AgentExecutor - * @return the workflow context model holding the agenticScope object. - */ @Override + @SuppressWarnings("unchecked") public WorkflowModel fromAny(WorkflowModel prev, Object obj) { - // We ignore `obj` since it's already included in `prev` within the agenticScope instance - return prev; + // TODO: we shouldn't update the state if the previous task was an agent call since under the + // hood, the agent already updated it. + if (prev instanceof AgenticModel agenticModel) { + this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope()); + } + + if (obj instanceof Map) { + this.scopeRegistryAssessor.getAgenticScope().writeStates((Map) obj); + } else { + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, obj); + } + + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), obj); } @Override @@ -66,60 +59,60 @@ public WorkflowModel combine(Map workflowVariables) { @Override public WorkflowModelCollection createCollection() { - throw new UnsupportedOperationException(); + return new AgenticModelCollection(this.scopeRegistryAssessor.getAgenticScope()); } - // TODO: all these methods can use agenticScope as long as we have access to the `outputName` - @Override public WorkflowModel from(boolean value) { - return asAgenticModel(value); + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); } @Override public WorkflowModel from(Number value) { - return asAgenticModel(value); + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); } @Override public WorkflowModel from(String value) { - return asAgenticModel(value); + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); } @Override public WorkflowModel from(CloudEvent ce) { - // TODO: serialize the CE into the AgenticScope - return new JavaModel(ce); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), ce); } @Override public WorkflowModel from(CloudEventData ce) { - // TODO: serialize the CE data into the AgenticScope - return new JavaModel(ce); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), ce); } @Override public WorkflowModel from(OffsetDateTime value) { - return asAgenticModel(value); + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); } @Override public WorkflowModel from(Map map) { - final AgenticScope agenticScope = this.scopeRegistryAssessor.getAgenticScope(); - agenticScope.writeStates(map); - return new AgenticModel(agenticScope); + this.scopeRegistryAssessor.getAgenticScope().writeStates(map); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), map); } @Override public WorkflowModel fromNull() { - return asAgenticModel(null); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), null); } @Override public WorkflowModel fromOther(Object value) { - if (value instanceof AgenticScope) { - return new AgenticModel((AgenticScope) value); + if (value instanceof AgenticScope scope) { + return new AgenticModel(scope, scope.state()); } - return asAgenticModel(value); + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); } } diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java index 01ccd1cd..34a959c3 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.expressions.agentic.langchain4j; import dev.langchain4j.agentic.internal.AgenticScopeOwner; +import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.scope.AgenticScopeRegistry; import dev.langchain4j.agentic.scope.DefaultAgenticScope; import java.util.Objects; @@ -27,7 +28,7 @@ public class AgenticScopeRegistryAssessor implements AgenticScopeOwner { private final AtomicReference agenticScopeRegistry = new AtomicReference<>(); private final String agentId; - private DefaultAgenticScope agenticScope; + private AgenticScope agenticScope; private Object memoryId; public AgenticScopeRegistryAssessor(String agentId) { @@ -44,7 +45,7 @@ public void setMemoryId(Object memoryId) { this.memoryId = memoryId; } - public DefaultAgenticScope getAgenticScope() { + public AgenticScope getAgenticScope() { if (agenticScope != null) { return agenticScope; } @@ -57,6 +58,10 @@ public DefaultAgenticScope getAgenticScope() { return this.agenticScope; } + public void setAgenticScope(AgenticScope agenticScope) { + this.agenticScope = agenticScope; + } + @Override public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) { this.agenticScope = agenticScope; diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java index f4ac21a7..2f84411a 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java @@ -46,7 +46,7 @@ public boolean isEmpty() { @Override public boolean contains(Object o) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("contains() is not supported yet"); } private class ModelIterator implements Iterator { @@ -80,12 +80,12 @@ public Iterator iterator() { @Override public Object[] toArray() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray is not supported yet"); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray is not supported yet"); } @Override @@ -100,7 +100,7 @@ public boolean remove(Object o) { @Override public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("containsAll is not supported yet"); } @Override @@ -119,7 +119,7 @@ public boolean removeAll(Collection c) { @Override public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("retainAll() is not supported yet"); } @Override diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java index c314bea7..4502abf1 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java @@ -23,7 +23,7 @@ import java.time.OffsetDateTime; import java.util.Map; -class JavaModelFactory implements WorkflowModelFactory { +public class JavaModelFactory implements WorkflowModelFactory { private final JavaModel TrueModel = new JavaModel(Boolean.TRUE); private final JavaModel FalseModel = new JavaModel(Boolean.FALSE); private final JavaModel NullModel = new JavaModel(null); diff --git a/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java b/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java index 0f7ce656..5ffbfb3a 100644 --- a/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java +++ b/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java @@ -115,20 +115,20 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } // invoke - return executeWorkflow(currentCognisphere(method, args), method, args); + return executeWorkflow(currentAgenticScope(method, args), method, args); } - private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method, Object[] args) { + private Object executeWorkflow(AgenticScope agenticScope, Method method, Object[] args) { writeAgenticScopeState(agenticScope, method, args); try (WorkflowApplication app = workflowApplicationBuilder.build()) { // TODO improve result handling - DefaultAgenticScope output = + AgenticScope output = app.workflowDefinition(workflow) .instance(agenticScope) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow( () -> new IllegalArgumentException( @@ -149,7 +149,7 @@ private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method, } } - private DefaultAgenticScope currentCognisphere(Method method, Object[] args) { + private AgenticScope currentAgenticScope(Method method, Object[] args) { Object memoryId = memoryId(method, args); this.agenticScopeRegistryAssessor.setMemoryId(memoryId); return this.agenticScopeRegistryAssessor.getAgenticScope(); diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java index dde27919..c1c17020 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java @@ -18,18 +18,18 @@ import dev.langchain4j.agentic.Agent; import dev.langchain4j.agentic.internal.AgentSpecification; import dev.langchain4j.service.MemoryId; -import dev.langchain4j.service.SystemMessage; import dev.langchain4j.service.UserMessage; import dev.langchain4j.service.V; import java.util.List; public interface Agents { - @SystemMessage( - """ - You are a happy chat bot. - """) interface ChatBot { + @UserMessage( + """ + You are a happy chat bot. + """) + @Agent String chat(@MemoryId String memoryId, @V("message") String message); } diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java index 0f653b4a..38e92487 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -15,35 +15,192 @@ */ package io.serverlessworkflow.fluent.agentic; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; import dev.langchain4j.agentic.AgenticServices; import dev.langchain4j.agentic.scope.AgenticScope; +import dev.langchain4j.memory.chat.MessageWindowChatMemory; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.v1.CloudEventBuilder; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class ChatBotIT { @Test + @SuppressWarnings("unchecked") + @Disabled("Figuring out event processing") void chat_bot() { Agents.ChatBot chatBot = spy( AgenticServices.agentBuilder(Agents.ChatBot.class) .chatModel(Models.BASE_MODEL) + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) .outputName("message") .build()); + Collection publishedEvents = new ArrayList<>(); + // 1. listen to an event containing `message` key in the body // 2. if contains, call the agent, if not end the workflow // 3. After replying to the chat, return - AgentWorkflowBuilder.workflow("chat-bot") - .tasks( - t -> - t.listen( - "listenToMessages", - l -> - l.outputAs(null) - .one(c -> c.with(event -> event.type("org.acme.chatbot")))) - .when(scope -> !"".equals(scope.readState("message")), AgenticScope.class) - .agent(chatBot) - .then("listenToMessages")); + final Workflow listenWorkflow = + AgentWorkflowBuilder.workflow("chat-bot") + .tasks( + t -> + t.listen( + "listenToMessages", + l -> + l.one(c -> c.with(event -> event.type("org.acme.chatbot.request")))) + .when(message -> !"".equals(message.get("message")), Map.class) + .agent(chatBot) + .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply"))) + .then("listenToMessages")) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + app.eventConsumer() + .register( + app.eventConsumer() + .listen( + new EventFilter() + .withWith(new EventProperties().withType("org.acme.chatbot.reply")), + app), + ce -> publishedEvents.add((CloudEvent) ce)); + + final WorkflowInstance waitingInstance = + app.workflowDefinition(listenWorkflow).instance(null); + final CompletableFuture runningModel = waitingInstance.start(); + + // The workflow is just waiting for the event + assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); + + // Publish the event + app.eventPublisher().publish(newMessageEvent("Hello World!")); + + AgenticScope scope = runningModel.get().as(AgenticScope.class).orElseThrow(); + assertNotNull(scope.readState("message")); + assertFalse(scope.readState("message").toString().isEmpty()); + assertEquals(1, publishedEvents.size()); + + // We ingested the event, and we keep waiting for the next + // assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); + + // Publish the event with an empty message to wrap up + app.eventPublisher().publish(newMessageEvent("")); + + scope = runningModel.join().as(AgenticScope.class).orElseThrow(); + assertNotNull(scope.readState("message")); + assertTrue(scope.readState("message").toString().isEmpty()); + assertEquals(2, publishedEvents.size()); + + // Workflow should be done + assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status()); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * In this test we validate a workflow mixed with agents and regular Java calls + * + *

+ * + *

    + *
  1. The first function prints the message input and converts the data into a Map for the + * agent ingestion + *
  2. Internally, our factories will add the output to a new AgenticScope since under the hood, + * we are call `as(AgenticScope)` + *
  3. The agent is then called with a scope with a state as `message="input"` + *
  4. The agent updates the state automatically in the AgenticScope and returns the message as + * a string, this string is then served to the next task + *
  5. The next task process the agent response and returns it ending the workflow. Meanwhile, + * the AgenticScope is always updated with the latest result from the given task. + *
+ */ + @Test + void mixed_workflow() { + Agents.ChatBot chatBot = + spy( + AgenticServices.agentBuilder(Agents.ChatBot.class) + .chatModel(Models.BASE_MODEL) + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) + .outputName("message") + .build()); + + final Workflow mixedWorkflow = + AgentWorkflowBuilder.workflow("chat-bot") + .tasks( + t -> + t.callFn( + callJ -> + callJ.function( + input -> { + System.out.println(input); + return Map.of("message", input); + }, + String.class)) + .agent(chatBot) + .callFn( + callJ -> + callJ.function( + input -> { + System.out.println(input); + // Here, we are return a simple string so the internal + // AgenticScope will add it to the default `input` key + // If we want to really manipulate it, we could return a + // Map<>(message, input) + return "I've changed the input [" + input + "]"; + }, + String.class))) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowModel model = + app.workflowDefinition(mixedWorkflow).instance("Hello World!").start().join(); + + Optional resultAsString = model.as(String.class); + + assertTrue(resultAsString.isPresent()); + assertFalse(resultAsString.get().isEmpty()); + assertTrue(resultAsString.get().contains("changed the input")); + + Optional resultAsScope = model.as(AgenticScope.class); + + assertTrue(resultAsScope.isPresent()); + assertFalse(resultAsScope.get().readState("input").toString().isEmpty()); + assertTrue(resultAsScope.get().readState("input").toString().contains("changed the input")); + } + } + + private CloudEvent newMessageEvent(String message) { + return new CloudEventBuilder() + .withData(String.format("{\"message\": \"%s\"}", message).getBytes()) + .withType("org.acme.chatbot.request") + .withId(UUID.randomUUID().toString()) + .withDataContentType("application/json") + .withSource(URI.create("test://localhost")) + .withSubject("A chatbot message") + .withTime(OffsetDateTime.now()) + .build(); } } diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java index e06aafda..170281c1 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java @@ -22,7 +22,7 @@ public class Models { static final ChatModel BASE_MODEL = OllamaChatModel.builder() - .baseUrl("http://127.0.0.1:1143") + .baseUrl("http://127.0.0.1:11434") .modelName("qwen2.5:7b") .timeout(Duration.ofMinutes(10)) .temperature(0.0) diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java index acf5c411..d38e863c 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java @@ -23,7 +23,7 @@ import static org.mockito.Mockito.when; import dev.langchain4j.agentic.AgenticServices; -import dev.langchain4j.agentic.scope.DefaultAgenticScope; +import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.workflow.HumanInTheLoop; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; @@ -53,12 +53,12 @@ public void testAgent() throws ExecutionException, InterruptedException { topic.put("title", "A Great Story"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("storySeedAgent", result.readState("premise")); @@ -93,12 +93,12 @@ public void testAgents() throws ExecutionException, InterruptedException { topic.put("title", "A Great Story"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("sceneAgent", result.readState("story")); @@ -129,12 +129,12 @@ public void testSequence() throws ExecutionException, InterruptedException { topic.put("title", "A Great Story"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("sceneAgent", result.readState("story")); @@ -166,12 +166,12 @@ public void testParallel() throws ExecutionException, InterruptedException { topic.put("style", "sci-fi"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("Fake conflict response", result.readState("setting")); @@ -212,12 +212,12 @@ public void testSeqAndThenParallel() throws ExecutionException, InterruptedExcep topic.put("fact", "alien"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals(cultureTraits, result.readState("culture")); @@ -274,12 +274,12 @@ public void humanInTheLoop() throws ExecutionException, InterruptedException { initialValues.put("agenda", "Discuss project updates"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(initialValues) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("Styled meeting invitation for John Doe", result.readState("styled")); diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java index b5420419..456db165 100644 --- a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java @@ -55,7 +55,7 @@ public boolean isEmpty() { @Override public boolean contains(Object o) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("contains() is not supported yet"); } @Override @@ -85,12 +85,12 @@ public WorkflowModel next() { @Override public Object[] toArray() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray() is not supported yet"); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray() is not supported yet"); } @Override @@ -109,7 +109,7 @@ public boolean remove(Object o) { @Override public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("containsAll() is not supported yet"); } @Override @@ -127,7 +127,7 @@ public boolean removeAll(Collection c) { @Override public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("retainAll() is not supported yet"); } @Override From 5c91148cb8876bc72d12d9dc3086833378c3394a Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 12 Aug 2025 15:06:37 -0400 Subject: [PATCH 3/4] Small refactor Signed-off-by: Ricardo Zanini --- .../agentic/AgenticModelCollection.java | 40 ++++++---------- .../agentic/AgenticModelFactory.java | 48 ++++++++++++------- .../AgenticScopeRegistryAssessor.java | 4 +- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java index 68a17800..4b23e690 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java @@ -15,7 +15,6 @@ */ package io.serverlessworkflow.impl.expressions.agentic; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import dev.langchain4j.agentic.scope.AgenticScope; @@ -27,7 +26,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; -import java.util.Objects; import java.util.Optional; public class AgenticModelCollection extends JavaModelCollection { @@ -45,32 +43,22 @@ public boolean add(WorkflowModel e) { Optional> asMap = e.asMap(); if (asMap.isPresent()) { this.agenticScope.writeStates(asMap.get()); - } else { - // Update the agenticScope with the event body, so agents can use the event data as input - Object javaObj = e.asJavaObject(); - if (javaObj instanceof CloudEvent) { - try { - this.agenticScope.writeStates( - mapper.readValue( - Objects.requireNonNull(((CloudEvent) javaObj).getData()).toString(), - new TypeReference<>() {})); - } catch (JsonProcessingException ex) { - throw new IllegalArgumentException( - "Unable to parse CloudEvent, data must be a valid JSON", ex); - } - } else if (javaObj instanceof CloudEventData) { - try { - this.agenticScope.writeStates( - mapper.readValue( - Objects.requireNonNull(((CloudEventData) javaObj)).toBytes(), - new TypeReference<>() {})); - } catch (IOException ex) { - throw new IllegalArgumentException( - "Unable to parse CloudEventData, data must be a valid JSON", ex); - } + return super.add(e); + } + + // Update the agenticScope with the event body, so agents can use the event data as input + Object javaObj = e.asJavaObject(); + try { + if (javaObj instanceof CloudEvent ce && ce.getData() != null) { + agenticScope.writeStates( + mapper.readValue(ce.getData().toBytes(), new TypeReference<>() {})); + } else if (javaObj instanceof CloudEventData ced) { + agenticScope.writeStates(mapper.readValue(ced.toBytes(), new TypeReference<>() {})); } else { - this.agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj); + agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj); } + } catch (IOException ex) { + throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", ex); } // add to the collection diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index 00bc4e0a..67391ae8 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -31,6 +31,18 @@ class AgenticModelFactory implements WorkflowModelFactory { private final AgenticScopeRegistryAssessor scopeRegistryAssessor = new AgenticScopeRegistryAssessor(); + private void updateAgenticScope(Object value) { + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + } + + private void updateAgenticScope(Map state) { + this.scopeRegistryAssessor.getAgenticScope().writeStates(state); + } + + private AgenticModel asAgenticModel(Object value) { + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + } + @Override @SuppressWarnings("unchecked") public WorkflowModel fromAny(WorkflowModel prev, Object obj) { @@ -41,12 +53,12 @@ public WorkflowModel fromAny(WorkflowModel prev, Object obj) { } if (obj instanceof Map) { - this.scopeRegistryAssessor.getAgenticScope().writeStates((Map) obj); + this.updateAgenticScope((Map) obj); } else { - this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, obj); + this.updateAgenticScope(obj); } - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), obj); + return asAgenticModel(obj); } @Override @@ -64,47 +76,47 @@ public WorkflowModelCollection createCollection() { @Override public WorkflowModel from(boolean value) { - this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(Number value) { - this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(String value) { - this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(CloudEvent ce) { - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), ce); + return asAgenticModel(ce); } @Override public WorkflowModel from(CloudEventData ce) { - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), ce); + return asAgenticModel(ce); } @Override public WorkflowModel from(OffsetDateTime value) { - this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(Map map) { - this.scopeRegistryAssessor.getAgenticScope().writeStates(map); - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), map); + this.updateAgenticScope(map); + return asAgenticModel(map); } @Override public WorkflowModel fromNull() { - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), null); + return asAgenticModel(null); } @Override @@ -112,7 +124,7 @@ public WorkflowModel fromOther(Object value) { if (value instanceof AgenticScope scope) { return new AgenticModel(scope, scope.state()); } - this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); - return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + this.updateAgenticScope(value); + return asAgenticModel(value); } } diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java index 34a959c3..390a3a42 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java @@ -59,12 +59,12 @@ public AgenticScope getAgenticScope() { } public void setAgenticScope(AgenticScope agenticScope) { - this.agenticScope = agenticScope; + this.agenticScope = Objects.requireNonNull(agenticScope, "AgenticScope cannot be null"); } @Override public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) { - this.agenticScope = agenticScope; + this.setAgenticScope(agenticScope); return this; } From 3533cd6d1c916c426beb7b4e448ef3b44d7f4ff1 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 13 Aug 2025 12:55:34 +0200 Subject: [PATCH 4/4] Adding until to listen Signed-off-by: fjtirado --- .../fluent/agentic/ChatBotIT.java | 10 +++++---- .../fluent/func/FuncListenTaskBuilder.java | 22 +++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java index 38e92487..44884266 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -70,11 +70,13 @@ void chat_bot() { t.listen( "listenToMessages", l -> - l.one(c -> c.with(event -> event.type("org.acme.chatbot.request")))) - .when(message -> !"".equals(message.get("message")), Map.class) + l.until(message -> "".equals(message.get("message")), Map.class) + .one( + c -> + c.with( + event -> event.type("org.acme.chatbot.request")))) .agent(chatBot) - .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply"))) - .then("listenToMessages")) + .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply")))) .build(); try (WorkflowApplication app = WorkflowApplication.builder().build()) { diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java index 54a82ed0..bcdd82dd 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java @@ -15,15 +15,37 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.ListenTask; +import io.serverlessworkflow.api.types.func.UntilPredicate; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.func.spi.FuncTransformations; import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; +import java.util.function.Predicate; public class FuncListenTaskBuilder extends ListenTaskBuilder implements ConditionalTaskBuilder, FuncTransformations { + private UntilPredicate untilPredicate; + FuncListenTaskBuilder() { super(); } + + public FuncListenTaskBuilder until(Predicate predicate, Class predClass) { + untilPredicate = new UntilPredicate().withPredicate(predicate, predClass); + return this; + } + + @Override + public ListenTask build() { + ListenTask task = super.build(); + AnyEventConsumptionStrategy anyEvent = + task.getListen().getTo().getAnyEventConsumptionStrategy(); + if (untilPredicate != null && anyEvent != null) { + anyEvent.withUntil(untilPredicate); + } + return task; + } }