Skip to content

New integration tests #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion experimental/agentic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>serverlessworkflow-experimental-agentic</artifactId>
<name>ServelessWorkflow:: Experimental:: Agentic</name>
<name>Serveless Workflow :: Experimental :: Agentic</name>
<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,33 @@

class AgenticModel extends JavaModel {

AgenticModel(AgenticScope agenticScope) {
super(agenticScope);
private final AgenticScope agenticScope;

AgenticModel(AgenticScope agenticScope, Object object) {
super(object);
this.agenticScope = agenticScope;
}

@Override
public void setObject(Object obj) {
super.setObject(obj);
public AgenticScope getAgenticScope() {
return agenticScope;
}

@Override
public Collection<WorkflowModel> asCollection() {
throw new UnsupportedOperationException("Not supported yet.");
throw new UnsupportedOperationException("asCollection() is not supported yet.");
}

@Override
public Optional<Map<String, Object>> asMap() {
return Optional.of(((AgenticScope) object).state());
return Optional.of(this.agenticScope.state());
}

@Override
public <T> Optional<T> as(Class<T> clazz) {
if (AgenticScope.class.isAssignableFrom(clazz)) {
return Optional.of(clazz.cast(object));
return Optional.of(clazz.cast(this.agenticScope));
} else if (Map.class.isAssignableFrom(clazz)) {
return Optional.of(clazz.cast(((AgenticScope) object).state()));
return asMap().map(clazz::cast);
} else {
return super.as(clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,54 @@
*/
package io.serverlessworkflow.impl.expressions.agentic;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.langchain4j.agentic.scope.AgenticScope;
import dev.langchain4j.agentic.scope.ResultWithAgenticScope;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.expressions.func.JavaModelCollection;
import java.util.Collection;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

class AgenticModelCollection extends JavaModelCollection {
public class AgenticModelCollection extends JavaModelCollection {

private final AgenticScope agenticScope;

AgenticModelCollection(Collection<?> object, AgenticScope agenticScope) {
super(object);
this.agenticScope = agenticScope;
}
private final ObjectMapper mapper = new ObjectMapper();

AgenticModelCollection(AgenticScope agenticScope) {
super(Collections.emptyList());
this.agenticScope = agenticScope;
}

@Override
protected WorkflowModel nextItem(Object obj) {
return new AgenticModel((AgenticScope) obj);
public boolean add(WorkflowModel e) {
Optional<Map<String, Object>> asMap = e.asMap();
if (asMap.isPresent()) {
this.agenticScope.writeStates(asMap.get());
return super.add(e);
}

// Update the agenticScope with the event body, so agents can use the event data as input
Object javaObj = e.asJavaObject();
try {
if (javaObj instanceof CloudEvent ce && ce.getData() != null) {
agenticScope.writeStates(
mapper.readValue(ce.getData().toBytes(), new TypeReference<>() {}));
} else if (javaObj instanceof CloudEventData ced) {
agenticScope.writeStates(mapper.readValue(ced.toBytes(), new TypeReference<>() {}));
} else {
agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj);
}
} catch (IOException ex) {
throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", ex);
}

// add to the collection
return super.add(e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,43 @@
import io.serverlessworkflow.impl.WorkflowModelCollection;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor;
import io.serverlessworkflow.impl.expressions.func.JavaModel;
import java.time.OffsetDateTime;
import java.util.Map;

class AgenticModelFactory implements WorkflowModelFactory {

/**
* Applies any change to the model after running as task. We will always set it to a @AgenticScope
* object since @AgentExecutor is always adding the output to the agenticScope. We just have to
* make sure that agenticScope is always passed to the next input task.
*
* @param prev the global AgenticScope object getting updated by the workflow context
* @param obj the same AgenticScope object updated by the AgentExecutor
* @return the workflow context model holding the agenticScope object.
*/
static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input";
private final AgenticScopeRegistryAssessor scopeRegistryAssessor =
new AgenticScopeRegistryAssessor();

private void updateAgenticScope(Object value) {
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
}

private void updateAgenticScope(Map<String, Object> state) {
this.scopeRegistryAssessor.getAgenticScope().writeStates(state);
}

private AgenticModel asAgenticModel(Object value) {
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
}

@Override
@SuppressWarnings("unchecked")
public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
// We ignore `obj` since it's already included in `prev` within the agenticScope instance
return prev;
// TODO: we shouldn't update the state if the previous task was an agent call since under the
// hood, the agent already updated it.
if (prev instanceof AgenticModel agenticModel) {
this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope());
}

if (obj instanceof Map) {
this.updateAgenticScope((Map<String, Object>) obj);
} else {
this.updateAgenticScope(obj);
}

return asAgenticModel(obj);
}

@Override
Expand All @@ -53,58 +71,60 @@ public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {

@Override
public WorkflowModelCollection createCollection() {
throw new UnsupportedOperationException();
return new AgenticModelCollection(this.scopeRegistryAssessor.getAgenticScope());
}

// TODO: all these methods can use agenticScope as long as we have access to the `outputName`

@Override
public WorkflowModel from(boolean value) {
return new JavaModel(value);
this.updateAgenticScope(value);
return asAgenticModel(value);
}

@Override
public WorkflowModel from(Number value) {
return new JavaModel(value);
this.updateAgenticScope(value);
return asAgenticModel(value);
}

@Override
public WorkflowModel from(String value) {
return new JavaModel(value);
this.updateAgenticScope(value);
return asAgenticModel(value);
}

@Override
public WorkflowModel from(CloudEvent ce) {
return new JavaModel(ce);
return asAgenticModel(ce);
}

@Override
public WorkflowModel from(CloudEventData ce) {
return new JavaModel(ce);
return asAgenticModel(ce);
}

@Override
public WorkflowModel from(OffsetDateTime value) {
return new JavaModel(value);
this.updateAgenticScope(value);
return asAgenticModel(value);
}

@Override
public WorkflowModel from(Map<String, Object> map) {
final AgenticScope agenticScope = new AgenticScopeRegistryAssessor().getAgenticScope();
agenticScope.writeStates(map);
return new AgenticModel(agenticScope);
this.updateAgenticScope(map);
return asAgenticModel(map);
}

@Override
public WorkflowModel fromNull() {
return new JavaModel(null);
return asAgenticModel(null);
}

@Override
public WorkflowModel fromOther(Object value) {
if (value instanceof AgenticScope) {
return new AgenticModel((AgenticScope) value);
if (value instanceof AgenticScope scope) {
return new AgenticModel(scope, scope.state());
}
return new JavaModel(value);
this.updateAgenticScope(value);
return asAgenticModel(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.serverlessworkflow.impl.expressions.agentic.langchain4j;

import dev.langchain4j.agentic.internal.AgenticScopeOwner;
import dev.langchain4j.agentic.scope.AgenticScope;
import dev.langchain4j.agentic.scope.AgenticScopeRegistry;
import dev.langchain4j.agentic.scope.DefaultAgenticScope;
import java.util.Objects;
Expand All @@ -27,7 +28,7 @@ public class AgenticScopeRegistryAssessor implements AgenticScopeOwner {
private final AtomicReference<AgenticScopeRegistry> agenticScopeRegistry =
new AtomicReference<>();
private final String agentId;
private DefaultAgenticScope agenticScope;
private AgenticScope agenticScope;
private Object memoryId;

public AgenticScopeRegistryAssessor(String agentId) {
Expand All @@ -44,7 +45,7 @@ public void setMemoryId(Object memoryId) {
this.memoryId = memoryId;
}

public DefaultAgenticScope getAgenticScope() {
public AgenticScope getAgenticScope() {
if (agenticScope != null) {
return agenticScope;
}
Expand All @@ -57,9 +58,13 @@ public DefaultAgenticScope getAgenticScope() {
return this.agenticScope;
}

public void setAgenticScope(AgenticScope agenticScope) {
this.agenticScope = Objects.requireNonNull(agenticScope, "AgenticScope cannot be null");
}

@Override
public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) {
this.agenticScope = agenticScope;
this.setAgenticScope(agenticScope);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public boolean isEmpty() {

@Override
public boolean contains(Object o) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("contains() is not supported yet");
}

private class ModelIterator implements Iterator<WorkflowModel> {
Expand Down Expand Up @@ -80,12 +80,12 @@ public Iterator<WorkflowModel> iterator() {

@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("toArray is not supported yet");
}

@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("toArray is not supported yet");
}

@Override
Expand All @@ -100,7 +100,7 @@ public boolean remove(Object o) {

@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("containsAll is not supported yet");
}

@Override
Expand All @@ -119,7 +119,7 @@ public boolean removeAll(Collection<?> c) {

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("retainAll() is not supported yet");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.time.OffsetDateTime;
import java.util.Map;

class JavaModelFactory implements WorkflowModelFactory {
public class JavaModelFactory implements WorkflowModelFactory {
private final JavaModel TrueModel = new JavaModel(Boolean.TRUE);
private final JavaModel FalseModel = new JavaModel(Boolean.FALSE);
private final JavaModel NullModel = new JavaModel(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,20 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}

// invoke
return executeWorkflow(currentCognisphere(method, args), method, args);
return executeWorkflow(currentAgenticScope(method, args), method, args);
}

private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method, Object[] args) {
private Object executeWorkflow(AgenticScope agenticScope, Method method, Object[] args) {
writeAgenticScopeState(agenticScope, method, args);

try (WorkflowApplication app = workflowApplicationBuilder.build()) {
// TODO improve result handling
DefaultAgenticScope output =
AgenticScope output =
app.workflowDefinition(workflow)
.instance(agenticScope)
.start()
.get()
.as(DefaultAgenticScope.class)
.as(AgenticScope.class)
.orElseThrow(
() ->
new IllegalArgumentException(
Expand All @@ -149,7 +149,7 @@ private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method,
}
}

private DefaultAgenticScope currentCognisphere(Method method, Object[] args) {
private AgenticScope currentAgenticScope(Method method, Object[] args) {
Object memoryId = memoryId(method, args);
this.agenticScopeRegistryAssessor.setMemoryId(memoryId);
return this.agenticScopeRegistryAssessor.getAgenticScope();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncForTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncForkTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder;
Expand Down Expand Up @@ -81,6 +82,12 @@ public AgentDoTaskBuilder emit(String name, Consumer<FuncEmitTaskBuilder> itemsC
return self();
}

@Override
public AgentDoTaskBuilder listen(String name, Consumer<FuncListenTaskBuilder> itemsConfigurer) {
this.listBuilder().listen(name, itemsConfigurer);
return self();
}

@Override
public AgentDoTaskBuilder forEach(String name, Consumer<FuncForTaskBuilder> itemsConfigurer) {
this.listBuilder().forEach(name, itemsConfigurer);
Expand Down
Loading