Skip to content

Commit 9f7b967

Browse files
Add the ChatBot Human-in-the-loop example (#703)
* Introduce ChatBot example Signed-off-by: Ricardo Zanini <[email protected]> * Refactor agentic to hold scope and current workflow context Signed-off-by: Ricardo Zanini <[email protected]> * Small refactor Signed-off-by: Ricardo Zanini <[email protected]> * Adding until to listen Signed-off-by: fjtirado <[email protected]> * Adjusting model to ingest CE Signed-off-by: Ricardo Zanini <[email protected]> * Add listen tasks Signed-off-by: Ricardo Zanini <[email protected]> * Format and headers Signed-off-by: Ricardo Zanini <[email protected]> * Adjusting the chatBotIT to add when handlers to emit Signed-off-by: Ricardo Zanini <[email protected]> * Refactor workflow to run until receive a finalized message Signed-off-by: Ricardo Zanini <[email protected]> --------- Signed-off-by: Ricardo Zanini <[email protected]> Signed-off-by: fjtirado <[email protected]> Co-authored-by: fjtirado <[email protected]>
1 parent be83be3 commit 9f7b967

File tree

43 files changed

+1219
-230
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1219
-230
lines changed

experimental/agentic/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<version>8.0.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>serverlessworkflow-experimental-agentic</artifactId>
10-
<name>ServelessWorkflow:: Experimental:: Agentic</name>
10+
<name>Serverless Workflow :: Experimental :: Agentic</name>
1111
<dependencies>
1212
<dependency>
1313
<groupId>io.serverlessworkflow</groupId>

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModel.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,33 @@
2424

2525
class AgenticModel extends JavaModel {
2626

27-
AgenticModel(AgenticScope agenticScope) {
28-
super(agenticScope);
27+
private final AgenticScope agenticScope;
28+
29+
AgenticModel(AgenticScope agenticScope, Object object) {
30+
super(object);
31+
this.agenticScope = agenticScope;
2932
}
3033

31-
@Override
32-
public void setObject(Object obj) {
33-
super.setObject(obj);
34+
public AgenticScope getAgenticScope() {
35+
return agenticScope;
3436
}
3537

3638
@Override
3739
public Collection<WorkflowModel> asCollection() {
38-
throw new UnsupportedOperationException("Not supported yet.");
40+
throw new UnsupportedOperationException("asCollection() is not supported yet.");
3941
}
4042

4143
@Override
4244
public Optional<Map<String, Object>> asMap() {
43-
return Optional.of(((AgenticScope) object).state());
45+
return Optional.of(this.agenticScope.state());
4446
}
4547

4648
@Override
4749
public <T> Optional<T> as(Class<T> clazz) {
4850
if (AgenticScope.class.isAssignableFrom(clazz)) {
49-
return Optional.of(clazz.cast(object));
51+
return Optional.of(clazz.cast(this.agenticScope));
5052
} else if (Map.class.isAssignableFrom(clazz)) {
51-
return Optional.of(clazz.cast(((AgenticScope) object).state()));
53+
return asMap().map(clazz::cast);
5254
} else {
5355
return super.as(clazz);
5456
}

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelCollection.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,37 @@
1919
import dev.langchain4j.agentic.scope.ResultWithAgenticScope;
2020
import io.serverlessworkflow.impl.WorkflowModel;
2121
import io.serverlessworkflow.impl.expressions.func.JavaModelCollection;
22-
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.Map;
2324
import java.util.Optional;
2425

25-
class AgenticModelCollection extends JavaModelCollection {
26+
public class AgenticModelCollection extends JavaModelCollection {
2627

2728
private final AgenticScope agenticScope;
29+
private final AgenticScopeCloudEventsHandler ceHandler;
2830

29-
AgenticModelCollection(Collection<?> object, AgenticScope agenticScope) {
30-
super(object);
31-
this.agenticScope = agenticScope;
32-
}
33-
34-
AgenticModelCollection(AgenticScope agenticScope) {
31+
AgenticModelCollection(AgenticScope agenticScope, AgenticScopeCloudEventsHandler ceHandler) {
32+
super(Collections.emptyList());
3533
this.agenticScope = agenticScope;
34+
this.ceHandler = ceHandler;
3635
}
3736

3837
@Override
39-
protected WorkflowModel nextItem(Object obj) {
40-
return new AgenticModel((AgenticScope) obj);
38+
public boolean add(WorkflowModel e) {
39+
Optional<Map<String, Object>> asMap = e.asMap();
40+
if (asMap.isPresent() && !asMap.get().isEmpty()) {
41+
this.agenticScope.writeStates(asMap.get());
42+
return super.add(e);
43+
}
44+
45+
// Update the agenticScope with the event body, so agents can use the event data as input
46+
Object value = e.asJavaObject();
47+
if (!ceHandler.writeStateIfCloudEvent(this.agenticScope, value)) {
48+
this.agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
49+
}
50+
51+
// add to the collection
52+
return super.add(e);
4153
}
4254

4355
@Override
@@ -46,6 +58,8 @@ public <T> Optional<T> as(Class<T> clazz) {
4658
return Optional.of(clazz.cast(agenticScope));
4759
} else if (ResultWithAgenticScope.class.isAssignableFrom(clazz)) {
4860
return Optional.of(clazz.cast(new ResultWithAgenticScope<>(agenticScope, object)));
61+
} else if (Map.class.isAssignableFrom(clazz)) {
62+
return Optional.of(clazz.cast(agenticScope.state()));
4963
} else {
5064
return super.as(clazz);
5165
}

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,40 @@
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
2323
import io.serverlessworkflow.impl.WorkflowModelFactory;
2424
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor;
25-
import io.serverlessworkflow.impl.expressions.func.JavaModel;
2625
import java.time.OffsetDateTime;
2726
import java.util.Map;
2827

2928
class AgenticModelFactory implements WorkflowModelFactory {
3029

31-
/**
32-
* Applies any change to the model after running as task. We will always set it to a @AgenticScope
33-
* object since @AgentExecutor is always adding the output to the agenticScope. We just have to
34-
* make sure that agenticScope is always passed to the next input task.
35-
*
36-
* @param prev the global AgenticScope object getting updated by the workflow context
37-
* @param obj the same AgenticScope object updated by the AgentExecutor
38-
* @return the workflow context model holding the agenticScope object.
39-
*/
30+
static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input";
31+
private final AgenticScopeRegistryAssessor scopeRegistryAssessor =
32+
new AgenticScopeRegistryAssessor();
33+
private final AgenticScopeCloudEventsHandler scopeCloudEventsHandler =
34+
new AgenticScopeCloudEventsHandler();
35+
36+
@SuppressWarnings("unchecked")
37+
private AgenticModel newAgenticModel(Object state) {
38+
if (state == null) {
39+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), null);
40+
}
41+
42+
if (state instanceof Map) {
43+
this.scopeRegistryAssessor.writeStates((Map<String, Object>) state);
44+
} else {
45+
this.scopeRegistryAssessor.writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, state);
46+
}
47+
48+
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), state);
49+
}
50+
4051
@Override
4152
public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
42-
// We ignore `obj` since it's already included in `prev` within the agenticScope instance
43-
return prev;
53+
// TODO: we shouldn't update the state if the previous task was an agent call since under the
54+
// hood, the agent already updated it.
55+
if (prev instanceof AgenticModel agenticModel) {
56+
this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope());
57+
}
58+
return newAgenticModel(obj);
4459
}
4560

4661
@Override
@@ -53,58 +68,55 @@ public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {
5368

5469
@Override
5570
public WorkflowModelCollection createCollection() {
56-
throw new UnsupportedOperationException();
71+
return new AgenticModelCollection(
72+
this.scopeRegistryAssessor.getAgenticScope(), scopeCloudEventsHandler);
5773
}
5874

59-
// TODO: all these methods can use agenticScope as long as we have access to the `outputName`
60-
6175
@Override
6276
public WorkflowModel from(boolean value) {
63-
return new JavaModel(value);
77+
return newAgenticModel(value);
6478
}
6579

6680
@Override
6781
public WorkflowModel from(Number value) {
68-
return new JavaModel(value);
82+
return newAgenticModel(value);
6983
}
7084

7185
@Override
7286
public WorkflowModel from(String value) {
73-
return new JavaModel(value);
87+
return newAgenticModel(value);
7488
}
7589

7690
@Override
7791
public WorkflowModel from(CloudEvent ce) {
78-
return new JavaModel(ce);
92+
return from(scopeCloudEventsHandler.extractDataAsMap(ce));
7993
}
8094

8195
@Override
8296
public WorkflowModel from(CloudEventData ce) {
83-
return new JavaModel(ce);
97+
return from(scopeCloudEventsHandler.extractDataAsMap(ce));
8498
}
8599

86100
@Override
87101
public WorkflowModel from(OffsetDateTime value) {
88-
return new JavaModel(value);
102+
return newAgenticModel(value);
89103
}
90104

91105
@Override
92106
public WorkflowModel from(Map<String, Object> map) {
93-
final AgenticScope agenticScope = new AgenticScopeRegistryAssessor().getAgenticScope();
94-
agenticScope.writeStates(map);
95-
return new AgenticModel(agenticScope);
107+
return newAgenticModel(map);
96108
}
97109

98110
@Override
99111
public WorkflowModel fromNull() {
100-
return new JavaModel(null);
112+
return newAgenticModel(null);
101113
}
102114

103115
@Override
104116
public WorkflowModel fromOther(Object value) {
105-
if (value instanceof AgenticScope) {
106-
return new AgenticModel((AgenticScope) value);
117+
if (value instanceof AgenticScope scope) {
118+
return new AgenticModel(scope, scope.state());
107119
}
108-
return new JavaModel(value);
120+
return newAgenticModel(value);
109121
}
110122
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.expressions.agentic;
17+
18+
import com.fasterxml.jackson.core.type.TypeReference;
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import dev.langchain4j.agentic.scope.AgenticScope;
21+
import io.cloudevents.CloudEvent;
22+
import io.cloudevents.CloudEventData;
23+
import java.io.IOException;
24+
import java.util.Map;
25+
26+
public final class AgenticScopeCloudEventsHandler {
27+
28+
private final ObjectMapper mapper = new ObjectMapper();
29+
30+
AgenticScopeCloudEventsHandler() {}
31+
32+
public void writeState(final AgenticScope scope, final CloudEvent cloudEvent) {
33+
if (cloudEvent != null) {
34+
writeState(scope, cloudEvent.getData());
35+
}
36+
}
37+
38+
public void writeState(final AgenticScope scope, final CloudEventData cloudEvent) {
39+
scope.writeStates(extractDataAsMap(cloudEvent));
40+
}
41+
42+
public boolean writeStateIfCloudEvent(final AgenticScope scope, final Object value) {
43+
if (value instanceof CloudEvent) {
44+
writeState(scope, (CloudEvent) value);
45+
return true;
46+
} else if (value instanceof CloudEventData) {
47+
writeState(scope, (CloudEventData) value);
48+
return true;
49+
}
50+
return false;
51+
}
52+
53+
public Map<String, Object> extractDataAsMap(final CloudEventData ce) {
54+
try {
55+
if (ce != null) {
56+
return mapper.readValue(ce.toBytes(), new TypeReference<>() {});
57+
}
58+
} catch (IOException e) {
59+
throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", e);
60+
}
61+
return Map.of();
62+
}
63+
64+
public Map<String, Object> extractDataAsMap(final CloudEvent ce) {
65+
if (ce != null) {
66+
return extractDataAsMap(ce.getData());
67+
}
68+
return Map.of();
69+
}
70+
}

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/langchain4j/AgenticScopeRegistryAssessor.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
package io.serverlessworkflow.impl.expressions.agentic.langchain4j;
1717

1818
import dev.langchain4j.agentic.internal.AgenticScopeOwner;
19+
import dev.langchain4j.agentic.scope.AgenticScope;
1920
import dev.langchain4j.agentic.scope.AgenticScopeRegistry;
2021
import dev.langchain4j.agentic.scope.DefaultAgenticScope;
22+
import java.util.Map;
2123
import java.util.Objects;
2224
import java.util.UUID;
2325
import java.util.concurrent.atomic.AtomicReference;
@@ -27,7 +29,7 @@ public class AgenticScopeRegistryAssessor implements AgenticScopeOwner {
2729
private final AtomicReference<AgenticScopeRegistry> agenticScopeRegistry =
2830
new AtomicReference<>();
2931
private final String agentId;
30-
private DefaultAgenticScope agenticScope;
32+
private AgenticScope agenticScope;
3133
private Object memoryId;
3234

3335
public AgenticScopeRegistryAssessor(String agentId) {
@@ -44,7 +46,7 @@ public void setMemoryId(Object memoryId) {
4446
this.memoryId = memoryId;
4547
}
4648

47-
public DefaultAgenticScope getAgenticScope() {
49+
public AgenticScope getAgenticScope() {
4850
if (agenticScope != null) {
4951
return agenticScope;
5052
}
@@ -57,9 +59,21 @@ public DefaultAgenticScope getAgenticScope() {
5759
return this.agenticScope;
5860
}
5961

62+
public void setAgenticScope(AgenticScope agenticScope) {
63+
this.agenticScope = Objects.requireNonNull(agenticScope, "AgenticScope cannot be null");
64+
}
65+
66+
public void writeState(String key, Object value) {
67+
this.getAgenticScope().writeState(key, value);
68+
}
69+
70+
public void writeStates(Map<String, Object> states) {
71+
this.getAgenticScope().writeStates(states);
72+
}
73+
6074
@Override
6175
public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) {
62-
this.agenticScope = agenticScope;
76+
this.setAgenticScope(agenticScope);
6377
return this;
6478
}
6579

experimental/lambda/src/main/java/io/serverlessworkflow/impl/expressions/func/JavaModel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public Optional<Number> asNumber() {
6565

6666
@Override
6767
public Optional<Map<String, Object>> asMap() {
68+
6869
return object instanceof Map ? Optional.of((Map<String, Object>) object) : Optional.empty();
6970
}
7071

0 commit comments

Comments
 (0)