diff --git a/experimental/agentic/pom.xml b/experimental/agentic/pom.xml index dec8a5e5..2d648084 100644 --- a/experimental/agentic/pom.xml +++ b/experimental/agentic/pom.xml @@ -7,7 +7,7 @@ 8.0.0-SNAPSHOT serverlessworkflow-experimental-agentic - ServelessWorkflow:: Experimental:: Agentic + Serveless Workflow :: Experimental :: Agentic io.serverlessworkflow diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java index 3d352812..ef1f0a55 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java @@ -24,31 +24,33 @@ class AgenticModel extends JavaModel { - AgenticModel(AgenticScope agenticScope) { - super(agenticScope); + private final AgenticScope agenticScope; + + AgenticModel(AgenticScope agenticScope, Object object) { + super(object); + this.agenticScope = agenticScope; } - @Override - public void setObject(Object obj) { - super.setObject(obj); + public AgenticScope getAgenticScope() { + return agenticScope; } @Override public Collection asCollection() { - throw new UnsupportedOperationException("Not supported yet."); + throw new UnsupportedOperationException("asCollection() is not supported yet."); } @Override public Optional> asMap() { - return Optional.of(((AgenticScope) object).state()); + return Optional.of(this.agenticScope.state()); } @Override public Optional as(Class clazz) { if (AgenticScope.class.isAssignableFrom(clazz)) { - return Optional.of(clazz.cast(object)); + return Optional.of(clazz.cast(this.agenticScope)); } else if (Map.class.isAssignableFrom(clazz)) { - return Optional.of(clazz.cast(((AgenticScope) object).state())); + return asMap().map(clazz::cast); } else { return super.as(clazz); } diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java index 2ea0e382..4b23e690 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java @@ -15,29 +15,54 @@ */ package io.serverlessworkflow.impl.expressions.agentic; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.scope.ResultWithAgenticScope; +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.expressions.func.JavaModelCollection; -import java.util.Collection; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Optional; -class AgenticModelCollection extends JavaModelCollection { +public class AgenticModelCollection extends JavaModelCollection { private final AgenticScope agenticScope; - - AgenticModelCollection(Collection object, AgenticScope agenticScope) { - super(object); - this.agenticScope = agenticScope; - } + private final ObjectMapper mapper = new ObjectMapper(); AgenticModelCollection(AgenticScope agenticScope) { + super(Collections.emptyList()); this.agenticScope = agenticScope; } @Override - protected WorkflowModel nextItem(Object obj) { - return new AgenticModel((AgenticScope) obj); + public boolean add(WorkflowModel e) { + Optional> asMap = e.asMap(); + if (asMap.isPresent()) { + this.agenticScope.writeStates(asMap.get()); + return super.add(e); + } + + // Update the agenticScope with the event body, so agents can use the event data as input + Object javaObj = e.asJavaObject(); + try { + if (javaObj instanceof CloudEvent ce && ce.getData() != null) { + agenticScope.writeStates( + mapper.readValue(ce.getData().toBytes(), new TypeReference<>() {})); + } else if (javaObj instanceof CloudEventData ced) { + agenticScope.writeStates(mapper.readValue(ced.toBytes(), new TypeReference<>() {})); + } else { + agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj); + } + } catch (IOException ex) { + throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", ex); + } + + // add to the collection + return super.add(e); } @Override diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index 9f95c0d5..67391ae8 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -22,25 +22,43 @@ import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.WorkflowModelFactory; import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor; -import io.serverlessworkflow.impl.expressions.func.JavaModel; import java.time.OffsetDateTime; import java.util.Map; class AgenticModelFactory implements WorkflowModelFactory { - /** - * Applies any change to the model after running as task. We will always set it to a @AgenticScope - * object since @AgentExecutor is always adding the output to the agenticScope. We just have to - * make sure that agenticScope is always passed to the next input task. - * - * @param prev the global AgenticScope object getting updated by the workflow context - * @param obj the same AgenticScope object updated by the AgentExecutor - * @return the workflow context model holding the agenticScope object. - */ + static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input"; + private final AgenticScopeRegistryAssessor scopeRegistryAssessor = + new AgenticScopeRegistryAssessor(); + + private void updateAgenticScope(Object value) { + this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value); + } + + private void updateAgenticScope(Map state) { + this.scopeRegistryAssessor.getAgenticScope().writeStates(state); + } + + private AgenticModel asAgenticModel(Object value) { + return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value); + } + @Override + @SuppressWarnings("unchecked") public WorkflowModel fromAny(WorkflowModel prev, Object obj) { - // We ignore `obj` since it's already included in `prev` within the agenticScope instance - return prev; + // TODO: we shouldn't update the state if the previous task was an agent call since under the + // hood, the agent already updated it. + if (prev instanceof AgenticModel agenticModel) { + this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope()); + } + + if (obj instanceof Map) { + this.updateAgenticScope((Map) obj); + } else { + this.updateAgenticScope(obj); + } + + return asAgenticModel(obj); } @Override @@ -53,58 +71,60 @@ public WorkflowModel combine(Map workflowVariables) { @Override public WorkflowModelCollection createCollection() { - throw new UnsupportedOperationException(); + return new AgenticModelCollection(this.scopeRegistryAssessor.getAgenticScope()); } - // TODO: all these methods can use agenticScope as long as we have access to the `outputName` - @Override public WorkflowModel from(boolean value) { - return new JavaModel(value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(Number value) { - return new JavaModel(value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(String value) { - return new JavaModel(value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(CloudEvent ce) { - return new JavaModel(ce); + return asAgenticModel(ce); } @Override public WorkflowModel from(CloudEventData ce) { - return new JavaModel(ce); + return asAgenticModel(ce); } @Override public WorkflowModel from(OffsetDateTime value) { - return new JavaModel(value); + this.updateAgenticScope(value); + return asAgenticModel(value); } @Override public WorkflowModel from(Map map) { - final AgenticScope agenticScope = new AgenticScopeRegistryAssessor().getAgenticScope(); - agenticScope.writeStates(map); - return new AgenticModel(agenticScope); + this.updateAgenticScope(map); + return asAgenticModel(map); } @Override public WorkflowModel fromNull() { - return new JavaModel(null); + return asAgenticModel(null); } @Override public WorkflowModel fromOther(Object value) { - if (value instanceof AgenticScope) { - return new AgenticModel((AgenticScope) value); + if (value instanceof AgenticScope scope) { + return new AgenticModel(scope, scope.state()); } - return new JavaModel(value); + this.updateAgenticScope(value); + return asAgenticModel(value); } } diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java index 01ccd1cd..390a3a42 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.expressions.agentic.langchain4j; import dev.langchain4j.agentic.internal.AgenticScopeOwner; +import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.scope.AgenticScopeRegistry; import dev.langchain4j.agentic.scope.DefaultAgenticScope; import java.util.Objects; @@ -27,7 +28,7 @@ public class AgenticScopeRegistryAssessor implements AgenticScopeOwner { private final AtomicReference agenticScopeRegistry = new AtomicReference<>(); private final String agentId; - private DefaultAgenticScope agenticScope; + private AgenticScope agenticScope; private Object memoryId; public AgenticScopeRegistryAssessor(String agentId) { @@ -44,7 +45,7 @@ public void setMemoryId(Object memoryId) { this.memoryId = memoryId; } - public DefaultAgenticScope getAgenticScope() { + public AgenticScope getAgenticScope() { if (agenticScope != null) { return agenticScope; } @@ -57,9 +58,13 @@ public DefaultAgenticScope getAgenticScope() { return this.agenticScope; } + public void setAgenticScope(AgenticScope agenticScope) { + this.agenticScope = Objects.requireNonNull(agenticScope, "AgenticScope cannot be null"); + } + @Override public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) { - this.agenticScope = agenticScope; + this.setAgenticScope(agenticScope); return this; } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java index f4ac21a7..2f84411a 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java @@ -46,7 +46,7 @@ public boolean isEmpty() { @Override public boolean contains(Object o) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("contains() is not supported yet"); } private class ModelIterator implements Iterator { @@ -80,12 +80,12 @@ public Iterator iterator() { @Override public Object[] toArray() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray is not supported yet"); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray is not supported yet"); } @Override @@ -100,7 +100,7 @@ public boolean remove(Object o) { @Override public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("containsAll is not supported yet"); } @Override @@ -119,7 +119,7 @@ public boolean removeAll(Collection c) { @Override public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("retainAll() is not supported yet"); } @Override diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java index c314bea7..4502abf1 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java @@ -23,7 +23,7 @@ import java.time.OffsetDateTime; import java.util.Map; -class JavaModelFactory implements WorkflowModelFactory { +public class JavaModelFactory implements WorkflowModelFactory { private final JavaModel TrueModel = new JavaModel(Boolean.TRUE); private final JavaModel FalseModel = new JavaModel(Boolean.FALSE); private final JavaModel NullModel = new JavaModel(null); diff --git a/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java b/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java index 0f7ce656..5ffbfb3a 100644 --- a/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java +++ b/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java @@ -115,20 +115,20 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } // invoke - return executeWorkflow(currentCognisphere(method, args), method, args); + return executeWorkflow(currentAgenticScope(method, args), method, args); } - private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method, Object[] args) { + private Object executeWorkflow(AgenticScope agenticScope, Method method, Object[] args) { writeAgenticScopeState(agenticScope, method, args); try (WorkflowApplication app = workflowApplicationBuilder.build()) { // TODO improve result handling - DefaultAgenticScope output = + AgenticScope output = app.workflowDefinition(workflow) .instance(agenticScope) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow( () -> new IllegalArgumentException( @@ -149,7 +149,7 @@ private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method, } } - private DefaultAgenticScope currentCognisphere(Method method, Object[] args) { + private AgenticScope currentAgenticScope(Method method, Object[] args) { Object memoryId = memoryId(method, args); this.agenticScopeRegistryAssessor.setMemoryId(memoryId); return this.agenticScopeRegistryAssessor.getAgenticScope(); diff --git a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java index 526deac0..5dddd851 100644 --- a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java +++ b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentDoTaskBuilder.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; @@ -81,6 +82,12 @@ public AgentDoTaskBuilder emit(String name, Consumer itemsC return self(); } + @Override + public AgentDoTaskBuilder listen(String name, Consumer itemsConfigurer) { + this.listBuilder().listen(name, itemsConfigurer); + return self(); + } + @Override public AgentDoTaskBuilder forEach(String name, Consumer itemsConfigurer) { this.listBuilder().forEach(name, itemsConfigurer); diff --git a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java index a26f0b45..0b164a0d 100644 --- a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java +++ b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentTaskItemListBuilder.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; @@ -113,6 +114,13 @@ public AgentTaskItemListBuilder emit(String name, Consumer return self(); } + @Override + public AgentTaskItemListBuilder listen( + String name, Consumer itemsConfigurer) { + this.delegate.listen(name, itemsConfigurer); + return self(); + } + @Override public AgentTaskItemListBuilder forEach( String name, Consumer itemsConfigurer) { diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java index a0c970ab..c1c17020 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java @@ -17,12 +17,22 @@ import dev.langchain4j.agentic.Agent; import dev.langchain4j.agentic.internal.AgentSpecification; +import dev.langchain4j.service.MemoryId; import dev.langchain4j.service.UserMessage; import dev.langchain4j.service.V; import java.util.List; public interface Agents { + interface ChatBot { + @UserMessage( + """ + You are a happy chat bot. + """) + @Agent + String chat(@MemoryId String memoryId, @V("message") String message); + } + interface MovieExpert { @UserMessage( diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java new file mode 100644 index 00000000..44884266 --- /dev/null +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -0,0 +1,208 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; + +import dev.langchain4j.agentic.AgenticServices; +import dev.langchain4j.agentic.scope.AgenticScope; +import dev.langchain4j.memory.chat.MessageWindowChatMemory; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.v1.CloudEventBuilder; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +public class ChatBotIT { + + @Test + @SuppressWarnings("unchecked") + @Disabled("Figuring out event processing") + void chat_bot() { + Agents.ChatBot chatBot = + spy( + AgenticServices.agentBuilder(Agents.ChatBot.class) + .chatModel(Models.BASE_MODEL) + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) + .outputName("message") + .build()); + Collection publishedEvents = new ArrayList<>(); + + // 1. listen to an event containing `message` key in the body + // 2. if contains, call the agent, if not end the workflow + // 3. After replying to the chat, return + final Workflow listenWorkflow = + AgentWorkflowBuilder.workflow("chat-bot") + .tasks( + t -> + t.listen( + "listenToMessages", + l -> + l.until(message -> "".equals(message.get("message")), Map.class) + .one( + c -> + c.with( + event -> event.type("org.acme.chatbot.request")))) + .agent(chatBot) + .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply")))) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + app.eventConsumer() + .register( + app.eventConsumer() + .listen( + new EventFilter() + .withWith(new EventProperties().withType("org.acme.chatbot.reply")), + app), + ce -> publishedEvents.add((CloudEvent) ce)); + + final WorkflowInstance waitingInstance = + app.workflowDefinition(listenWorkflow).instance(null); + final CompletableFuture runningModel = waitingInstance.start(); + + // The workflow is just waiting for the event + assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); + + // Publish the event + app.eventPublisher().publish(newMessageEvent("Hello World!")); + + AgenticScope scope = runningModel.get().as(AgenticScope.class).orElseThrow(); + assertNotNull(scope.readState("message")); + assertFalse(scope.readState("message").toString().isEmpty()); + assertEquals(1, publishedEvents.size()); + + // We ingested the event, and we keep waiting for the next + // assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); + + // Publish the event with an empty message to wrap up + app.eventPublisher().publish(newMessageEvent("")); + + scope = runningModel.join().as(AgenticScope.class).orElseThrow(); + assertNotNull(scope.readState("message")); + assertTrue(scope.readState("message").toString().isEmpty()); + assertEquals(2, publishedEvents.size()); + + // Workflow should be done + assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status()); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * In this test we validate a workflow mixed with agents and regular Java calls + * + *

+ * + *

    + *
  1. The first function prints the message input and converts the data into a Map for the + * agent ingestion + *
  2. Internally, our factories will add the output to a new AgenticScope since under the hood, + * we are call `as(AgenticScope)` + *
  3. The agent is then called with a scope with a state as `message="input"` + *
  4. The agent updates the state automatically in the AgenticScope and returns the message as + * a string, this string is then served to the next task + *
  5. The next task process the agent response and returns it ending the workflow. Meanwhile, + * the AgenticScope is always updated with the latest result from the given task. + *
+ */ + @Test + void mixed_workflow() { + Agents.ChatBot chatBot = + spy( + AgenticServices.agentBuilder(Agents.ChatBot.class) + .chatModel(Models.BASE_MODEL) + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) + .outputName("message") + .build()); + + final Workflow mixedWorkflow = + AgentWorkflowBuilder.workflow("chat-bot") + .tasks( + t -> + t.callFn( + callJ -> + callJ.function( + input -> { + System.out.println(input); + return Map.of("message", input); + }, + String.class)) + .agent(chatBot) + .callFn( + callJ -> + callJ.function( + input -> { + System.out.println(input); + // Here, we are return a simple string so the internal + // AgenticScope will add it to the default `input` key + // If we want to really manipulate it, we could return a + // Map<>(message, input) + return "I've changed the input [" + input + "]"; + }, + String.class))) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowModel model = + app.workflowDefinition(mixedWorkflow).instance("Hello World!").start().join(); + + Optional resultAsString = model.as(String.class); + + assertTrue(resultAsString.isPresent()); + assertFalse(resultAsString.get().isEmpty()); + assertTrue(resultAsString.get().contains("changed the input")); + + Optional resultAsScope = model.as(AgenticScope.class); + + assertTrue(resultAsScope.isPresent()); + assertFalse(resultAsScope.get().readState("input").toString().isEmpty()); + assertTrue(resultAsScope.get().readState("input").toString().contains("changed the input")); + } + } + + private CloudEvent newMessageEvent(String message) { + return new CloudEventBuilder() + .withData(String.format("{\"message\": \"%s\"}", message).getBytes()) + .withType("org.acme.chatbot.request") + .withId(UUID.randomUUID().toString()) + .withDataContentType("application/json") + .withSource(URI.create("test://localhost")) + .withSubject("A chatbot message") + .withTime(OffsetDateTime.now()) + .build(); + } +} diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java index e06aafda..170281c1 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Models.java @@ -22,7 +22,7 @@ public class Models { static final ChatModel BASE_MODEL = OllamaChatModel.builder() - .baseUrl("http://127.0.0.1:1143") + .baseUrl("http://127.0.0.1:11434") .modelName("qwen2.5:7b") .timeout(Duration.ofMinutes(10)) .temperature(0.0) diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java index acf5c411..d38e863c 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java @@ -23,7 +23,7 @@ import static org.mockito.Mockito.when; import dev.langchain4j.agentic.AgenticServices; -import dev.langchain4j.agentic.scope.DefaultAgenticScope; +import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.agentic.workflow.HumanInTheLoop; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; @@ -53,12 +53,12 @@ public void testAgent() throws ExecutionException, InterruptedException { topic.put("title", "A Great Story"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("storySeedAgent", result.readState("premise")); @@ -93,12 +93,12 @@ public void testAgents() throws ExecutionException, InterruptedException { topic.put("title", "A Great Story"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("sceneAgent", result.readState("story")); @@ -129,12 +129,12 @@ public void testSequence() throws ExecutionException, InterruptedException { topic.put("title", "A Great Story"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("sceneAgent", result.readState("story")); @@ -166,12 +166,12 @@ public void testParallel() throws ExecutionException, InterruptedException { topic.put("style", "sci-fi"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("Fake conflict response", result.readState("setting")); @@ -212,12 +212,12 @@ public void testSeqAndThenParallel() throws ExecutionException, InterruptedExcep topic.put("fact", "alien"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(topic) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals(cultureTraits, result.readState("culture")); @@ -274,12 +274,12 @@ public void humanInTheLoop() throws ExecutionException, InterruptedException { initialValues.put("agenda", "Discuss project updates"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - DefaultAgenticScope result = + AgenticScope result = app.workflowDefinition(workflow) .instance(initialValues) .start() .get() - .as(DefaultAgenticScope.class) + .as(AgenticScope.class) .orElseThrow(); assertEquals("Styled meeting invitation for John Doe", result.readState("styled")); diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java index 723e8d23..613f76a2 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java @@ -41,6 +41,12 @@ public FuncDoTaskBuilder emit(String name, Consumer itemsCo return this; } + @Override + public FuncDoTaskBuilder listen(String name, Consumer itemsConfigurer) { + this.listBuilder().listen(name, itemsConfigurer); + return this; + } + @Override public FuncDoTaskBuilder forEach(String name, Consumer itemsConfigurer) { this.listBuilder().forEach(name, itemsConfigurer); diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java index 89325ee8..28c8b8a4 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java @@ -16,10 +16,12 @@ package io.serverlessworkflow.fluent.func; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTransformations; import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; public class FuncEmitTaskBuilder extends EmitTaskBuilder - implements ConditionalTaskBuilder { + implements ConditionalTaskBuilder, + FuncTransformations { FuncEmitTaskBuilder() { super(); } diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java new file mode 100644 index 00000000..bcdd82dd --- /dev/null +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.ListenTask; +import io.serverlessworkflow.api.types.func.UntilPredicate; +import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; +import java.util.function.Predicate; + +public class FuncListenTaskBuilder extends ListenTaskBuilder + implements ConditionalTaskBuilder, + FuncTransformations { + + private UntilPredicate untilPredicate; + + FuncListenTaskBuilder() { + super(); + } + + public FuncListenTaskBuilder until(Predicate predicate, Class predClass) { + untilPredicate = new UntilPredicate().withPredicate(predicate, predClass); + return this; + } + + @Override + public ListenTask build() { + ListenTask task = super.build(); + AnyEventConsumptionStrategy anyEvent = + task.getListen().getTo().getAnyEventConsumptionStrategy(); + if (untilPredicate != null && anyEvent != null) { + anyEvent.withUntil(untilPredicate); + } + return task; + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java index 2c8b5524..6ef8d7b0 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java @@ -79,6 +79,16 @@ public FuncTaskItemListBuilder emit(String name, Consumer i new TaskItem(name, new Task().withEmitTask(emitTaskJavaBuilder.build()))); } + @Override + public FuncTaskItemListBuilder listen( + String name, Consumer itemsConfigurer) { + this.requireNameAndConfig(name, itemsConfigurer); + final FuncListenTaskBuilder listenTaskJavaBuilder = new FuncListenTaskBuilder(); + itemsConfigurer.accept(listenTaskJavaBuilder); + return this.addTaskItem( + new TaskItem(name, new Task().withListenTask(listenTaskJavaBuilder.build()))); + } + @Override public FuncTaskItemListBuilder forEach( String name, Consumer itemsConfigurer) { diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java index 434304d5..b452f5df 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java @@ -19,11 +19,13 @@ import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForTaskBuilder; import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.spec.spi.EmitFluent; import io.serverlessworkflow.fluent.spec.spi.ForEachFluent; import io.serverlessworkflow.fluent.spec.spi.ForkFluent; +import io.serverlessworkflow.fluent.spec.spi.ListenFluent; import io.serverlessworkflow.fluent.spec.spi.SetFluent; import io.serverlessworkflow.fluent.spec.spi.SwitchFluent; import java.util.UUID; @@ -36,7 +38,8 @@ public interface FuncDoFluent> EmitFluent, ForEachFluent, SwitchFluent, - ForkFluent { + ForkFluent, + ListenFluent { SELF callFn(String name, Consumer cfg); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java index 3ce5c203..817f7828 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java @@ -80,6 +80,11 @@ public T then(FlowDirectiveEnum then) { return self(); } + public T then(String taskName) { + this.task.setThen(new FlowDirective().withString(taskName)); + return self(); + } + public T exportAs(Object exportAs) { this.task.setExport(new ExportBuilder().as(exportAs).build()); return self(); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java index c3d32e14..ec950456 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/ListenFluent.java @@ -19,7 +19,7 @@ import java.util.UUID; import java.util.function.Consumer; -public interface ListenFluent, LIST> { +public interface ListenFluent, LIST> { LIST listen(String name, Consumer itemsConfigurer); diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java index b5420419..456db165 100644 --- a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelCollection.java @@ -55,7 +55,7 @@ public boolean isEmpty() { @Override public boolean contains(Object o) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("contains() is not supported yet"); } @Override @@ -85,12 +85,12 @@ public WorkflowModel next() { @Override public Object[] toArray() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray() is not supported yet"); } @Override public T[] toArray(T[] a) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("toArray() is not supported yet"); } @Override @@ -109,7 +109,7 @@ public boolean remove(Object o) { @Override public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("containsAll() is not supported yet"); } @Override @@ -127,7 +127,7 @@ public boolean removeAll(Collection c) { @Override public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("retainAll() is not supported yet"); } @Override