Skip to content

Commit 116af86

Browse files
committed
Refactor agentic to hold scope and current workflow context
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 564db09 commit 116af86

File tree

13 files changed

+301
-107
lines changed

13 files changed

+301
-107
lines changed

experimental/agentic/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<version>8.0.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>serverlessworkflow-experimental-agentic</artifactId>
10-
<name>ServelessWorkflow:: Experimental:: Agentic</name>
10+
<name>Serveless Workflow :: Experimental :: Agentic</name>
1111
<dependencies>
1212
<dependency>
1313
<groupId>io.serverlessworkflow</groupId>

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,33 @@
2424

2525
class AgenticModel extends JavaModel {
2626

27-
AgenticModel(AgenticScope agenticScope) {
28-
super(agenticScope);
27+
private final AgenticScope agenticScope;
28+
29+
AgenticModel(AgenticScope agenticScope, Object object) {
30+
super(object);
31+
this.agenticScope = agenticScope;
2932
}
3033

31-
@Override
32-
public void setObject(Object obj) {
33-
super.setObject(obj);
34+
public AgenticScope getAgenticScope() {
35+
return agenticScope;
3436
}
3537

3638
@Override
3739
public Collection<WorkflowModel> asCollection() {
38-
throw new UnsupportedOperationException("Not supported yet.");
40+
throw new UnsupportedOperationException("asCollection() is not supported yet.");
3941
}
4042

4143
@Override
4244
public Optional<Map<String, Object>> asMap() {
43-
return Optional.of(((AgenticScope) object).state());
45+
return Optional.of(this.agenticScope.state());
4446
}
4547

4648
@Override
4749
public <T> Optional<T> as(Class<T> clazz) {
4850
if (AgenticScope.class.isAssignableFrom(clazz)) {
49-
return Optional.of(clazz.cast(object));
51+
return Optional.of(clazz.cast(this.agenticScope));
5052
} else if (Map.class.isAssignableFrom(clazz)) {
51-
return Optional.of(clazz.cast(((AgenticScope) object).state()));
53+
return asMap().map(clazz::cast);
5254
} else {
5355
return super.as(clazz);
5456
}

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,66 @@
1515
*/
1616
package io.serverlessworkflow.impl.expressions.agentic;
1717

18+
import com.fasterxml.jackson.core.JsonProcessingException;
19+
import com.fasterxml.jackson.core.type.TypeReference;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
1821
import dev.langchain4j.agentic.scope.AgenticScope;
1922
import dev.langchain4j.agentic.scope.ResultWithAgenticScope;
23+
import io.cloudevents.CloudEvent;
24+
import io.cloudevents.CloudEventData;
2025
import io.serverlessworkflow.impl.WorkflowModel;
2126
import io.serverlessworkflow.impl.expressions.func.JavaModelCollection;
22-
import java.util.Collection;
27+
import java.io.IOException;
28+
import java.util.Collections;
29+
import java.util.Map;
30+
import java.util.Objects;
2331
import java.util.Optional;
2432

25-
class AgenticModelCollection extends JavaModelCollection {
33+
public class AgenticModelCollection extends JavaModelCollection {
2634

2735
private final AgenticScope agenticScope;
28-
29-
AgenticModelCollection(Collection<?> object, AgenticScope agenticScope) {
30-
super(object);
31-
this.agenticScope = agenticScope;
32-
}
36+
private final ObjectMapper mapper = new ObjectMapper();
3337

3438
AgenticModelCollection(AgenticScope agenticScope) {
39+
super(Collections.emptyList());
3540
this.agenticScope = agenticScope;
3641
}
3742

3843
@Override
39-
protected WorkflowModel nextItem(Object obj) {
40-
return new AgenticModel((AgenticScope) obj);
44+
public boolean add(WorkflowModel e) {
45+
Optional<Map<String, Object>> asMap = e.asMap();
46+
if (asMap.isPresent()) {
47+
this.agenticScope.writeStates(asMap.get());
48+
} else {
49+
// Update the agenticScope with the event body, so agents can use the event data as input
50+
Object javaObj = e.asJavaObject();
51+
if (javaObj instanceof CloudEvent) {
52+
try {
53+
this.agenticScope.writeStates(
54+
mapper.readValue(
55+
Objects.requireNonNull(((CloudEvent) javaObj).getData()).toString(),
56+
new TypeReference<>() {}));
57+
} catch (JsonProcessingException ex) {
58+
throw new IllegalArgumentException(
59+
"Unable to parse CloudEvent, data must be a valid JSON", ex);
60+
}
61+
} else if (javaObj instanceof CloudEventData) {
62+
try {
63+
this.agenticScope.writeStates(
64+
mapper.readValue(
65+
Objects.requireNonNull(((CloudEventData) javaObj)).toBytes(),
66+
new TypeReference<>() {}));
67+
} catch (IOException ex) {
68+
throw new IllegalArgumentException(
69+
"Unable to parse CloudEventData, data must be a valid JSON", ex);
70+
}
71+
} else {
72+
this.agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, javaObj);
73+
}
74+
}
75+
76+
// add to the collection
77+
return super.add(e);
4178
}
4279

4380
@Override

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,31 @@
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
2323
import io.serverlessworkflow.impl.WorkflowModelFactory;
2424
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor;
25-
import io.serverlessworkflow.impl.expressions.func.JavaModel;
2625
import java.time.OffsetDateTime;
2726
import java.util.Map;
2827

2928
class AgenticModelFactory implements WorkflowModelFactory {
3029

31-
private static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input";
30+
static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input";
3231
private final AgenticScopeRegistryAssessor scopeRegistryAssessor =
3332
new AgenticScopeRegistryAssessor();
3433

35-
private AgenticModel asAgenticModel(Object value) {
36-
// TODO: fetch memoryId from the object based on known premises
37-
final AgenticScope agenticScope = this.scopeRegistryAssessor.getAgenticScope();
38-
if (value != null) {
39-
agenticScope.writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
40-
}
41-
return new AgenticModel(agenticScope);
42-
}
43-
44-
/**
45-
* Applies any change to the model after running as task. We will always set it to a @AgenticScope
46-
* object since @AgentExecutor is always adding the output to the agenticScope. We just have to
47-
* make sure that agenticScope is always passed to the next input task.
48-
*
49-
* @param prev the global AgenticScope object getting updated by the workflow context
50-
* @param obj the same AgenticScope object updated by the AgentExecutor
51-
* @return the workflow context model holding the agenticScope object.
52-
*/
5334
@Override
35+
@SuppressWarnings("unchecked")
5436
public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
55-
// We ignore `obj` since it's already included in `prev` within the agenticScope instance
56-
return prev;
37+
// TODO: we shouldn't update the state if the previous task was an agent call since under the
38+
// hood, the agent already updated it.
39+
if (prev instanceof AgenticModel agenticModel) {
40+
this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope());
41+
}
42+
43+
if (obj instanceof Map) {
44+
this.scopeRegistryAssessor.getAgenticScope().writeStates((Map<String, Object>) obj);
45+
} else {
46+
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, obj);
47+
}
48+
49+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), obj);
5750
}
5851

5952
@Override
@@ -66,60 +59,60 @@ public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {
6659

6760
@Override
6861
public WorkflowModelCollection createCollection() {
69-
throw new UnsupportedOperationException();
62+
return new AgenticModelCollection(this.scopeRegistryAssessor.getAgenticScope());
7063
}
7164

72-
// TODO: all these methods can use agenticScope as long as we have access to the `outputName`
73-
7465
@Override
7566
public WorkflowModel from(boolean value) {
76-
return asAgenticModel(value);
67+
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
68+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
7769
}
7870

7971
@Override
8072
public WorkflowModel from(Number value) {
81-
return asAgenticModel(value);
73+
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
74+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
8275
}
8376

8477
@Override
8578
public WorkflowModel from(String value) {
86-
return asAgenticModel(value);
79+
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
80+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
8781
}
8882

8983
@Override
9084
public WorkflowModel from(CloudEvent ce) {
91-
// TODO: serialize the CE into the AgenticScope
92-
return new JavaModel(ce);
85+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), ce);
9386
}
9487

9588
@Override
9689
public WorkflowModel from(CloudEventData ce) {
97-
// TODO: serialize the CE data into the AgenticScope
98-
return new JavaModel(ce);
90+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), ce);
9991
}
10092

10193
@Override
10294
public WorkflowModel from(OffsetDateTime value) {
103-
return asAgenticModel(value);
95+
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
96+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
10497
}
10598

10699
@Override
107100
public WorkflowModel from(Map<String, Object> map) {
108-
final AgenticScope agenticScope = this.scopeRegistryAssessor.getAgenticScope();
109-
agenticScope.writeStates(map);
110-
return new AgenticModel(agenticScope);
101+
this.scopeRegistryAssessor.getAgenticScope().writeStates(map);
102+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), map);
111103
}
112104

113105
@Override
114106
public WorkflowModel fromNull() {
115-
return asAgenticModel(null);
107+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), null);
116108
}
117109

118110
@Override
119111
public WorkflowModel fromOther(Object value) {
120-
if (value instanceof AgenticScope) {
121-
return new AgenticModel((AgenticScope) value);
112+
if (value instanceof AgenticScope scope) {
113+
return new AgenticModel(scope, scope.state());
122114
}
123-
return asAgenticModel(value);
115+
this.scopeRegistryAssessor.getAgenticScope().writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
116+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), value);
124117
}
125118
}

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl.expressions.agentic.langchain4j;
1717

1818
import dev.langchain4j.agentic.internal.AgenticScopeOwner;
19+
import dev.langchain4j.agentic.scope.AgenticScope;
1920
import dev.langchain4j.agentic.scope.AgenticScopeRegistry;
2021
import dev.langchain4j.agentic.scope.DefaultAgenticScope;
2122
import java.util.Objects;
@@ -27,7 +28,7 @@ public class AgenticScopeRegistryAssessor implements AgenticScopeOwner {
2728
private final AtomicReference<AgenticScopeRegistry> agenticScopeRegistry =
2829
new AtomicReference<>();
2930
private final String agentId;
30-
private DefaultAgenticScope agenticScope;
31+
private AgenticScope agenticScope;
3132
private Object memoryId;
3233

3334
public AgenticScopeRegistryAssessor(String agentId) {
@@ -44,7 +45,7 @@ public void setMemoryId(Object memoryId) {
4445
this.memoryId = memoryId;
4546
}
4647

47-
public DefaultAgenticScope getAgenticScope() {
48+
public AgenticScope getAgenticScope() {
4849
if (agenticScope != null) {
4950
return agenticScope;
5051
}
@@ -57,6 +58,10 @@ public DefaultAgenticScope getAgenticScope() {
5758
return this.agenticScope;
5859
}
5960

61+
public void setAgenticScope(AgenticScope agenticScope) {
62+
this.agenticScope = agenticScope;
63+
}
64+
6065
@Override
6166
public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) {
6267
this.agenticScope = agenticScope;

experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelCollection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public boolean isEmpty() {
4646

4747
@Override
4848
public boolean contains(Object o) {
49-
throw new UnsupportedOperationException();
49+
throw new UnsupportedOperationException("contains() is not supported yet");
5050
}
5151

5252
private class ModelIterator implements Iterator<WorkflowModel> {
@@ -80,12 +80,12 @@ public Iterator<WorkflowModel> iterator() {
8080

8181
@Override
8282
public Object[] toArray() {
83-
throw new UnsupportedOperationException();
83+
throw new UnsupportedOperationException("toArray is not supported yet");
8484
}
8585

8686
@Override
8787
public <T> T[] toArray(T[] a) {
88-
throw new UnsupportedOperationException();
88+
throw new UnsupportedOperationException("toArray is not supported yet");
8989
}
9090

9191
@Override
@@ -100,7 +100,7 @@ public boolean remove(Object o) {
100100

101101
@Override
102102
public boolean containsAll(Collection<?> c) {
103-
throw new UnsupportedOperationException();
103+
throw new UnsupportedOperationException("containsAll is not supported yet");
104104
}
105105

106106
@Override
@@ -119,7 +119,7 @@ public boolean removeAll(Collection<?> c) {
119119

120120
@Override
121121
public boolean retainAll(Collection<?> c) {
122-
throw new UnsupportedOperationException();
122+
throw new UnsupportedOperationException("retainAll() is not supported yet");
123123
}
124124

125125
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModelFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.time.OffsetDateTime;
2424
import java.util.Map;
2525

26-
class JavaModelFactory implements WorkflowModelFactory {
26+
public class JavaModelFactory implements WorkflowModelFactory {
2727
private final JavaModel TrueModel = new JavaModel(Boolean.TRUE);
2828
private final JavaModel FalseModel = new JavaModel(Boolean.FALSE);
2929
private final JavaModel NullModel = new JavaModel(null);

fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,20 +115,20 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
115115
}
116116

117117
// invoke
118-
return executeWorkflow(currentCognisphere(method, args), method, args);
118+
return executeWorkflow(currentAgenticScope(method, args), method, args);
119119
}
120120

121-
private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method, Object[] args) {
121+
private Object executeWorkflow(AgenticScope agenticScope, Method method, Object[] args) {
122122
writeAgenticScopeState(agenticScope, method, args);
123123

124124
try (WorkflowApplication app = workflowApplicationBuilder.build()) {
125125
// TODO improve result handling
126-
DefaultAgenticScope output =
126+
AgenticScope output =
127127
app.workflowDefinition(workflow)
128128
.instance(agenticScope)
129129
.start()
130130
.get()
131-
.as(DefaultAgenticScope.class)
131+
.as(AgenticScope.class)
132132
.orElseThrow(
133133
() ->
134134
new IllegalArgumentException(
@@ -149,7 +149,7 @@ private Object executeWorkflow(DefaultAgenticScope agenticScope, Method method,
149149
}
150150
}
151151

152-
private DefaultAgenticScope currentCognisphere(Method method, Object[] args) {
152+
private AgenticScope currentAgenticScope(Method method, Object[] args) {
153153
Object memoryId = memoryId(method, args);
154154
this.agenticScopeRegistryAssessor.setMemoryId(memoryId);
155155
return this.agenticScopeRegistryAssessor.getAgenticScope();

0 commit comments

Comments
 (0)