Skip to content

Commit cfcc4a2

Browse files
committed
Add FuncEventData
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 4547503 commit cfcc4a2

22 files changed

+701
-372
lines changed

experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.serverlessworkflow.api.types.func;
217

318
import io.serverlessworkflow.api.types.EventData;

fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentListenTaskBuilder.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,29 @@
1818
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
1919
import io.serverlessworkflow.api.types.ListenTask;
2020
import io.serverlessworkflow.api.types.func.UntilPredicate;
21-
import io.serverlessworkflow.fluent.spec.ListenTaskBuilder;
21+
import io.serverlessworkflow.fluent.func.FuncListenToBuilder;
22+
import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder;
2223
import java.util.function.Predicate;
2324

24-
public class AgentListenTaskBuilder extends ListenTaskBuilder<AgentTaskItemListBuilder> {
25+
public class AgentListenTaskBuilder
26+
extends AbstractListenTaskBuilder<AgentTaskItemListBuilder, FuncListenToBuilder> {
2527

2628
private UntilPredicate untilPredicate;
2729

2830
public AgentListenTaskBuilder() {
2931
super(new AgentTaskItemListBuilder());
3032
}
3133

34+
@Override
35+
protected AgentListenTaskBuilder self() {
36+
return this;
37+
}
38+
39+
@Override
40+
protected FuncListenToBuilder newEventConsumptionStrategyBuilder() {
41+
return new FuncListenToBuilder();
42+
}
43+
3244
public <T> AgentListenTaskBuilder until(Predicate<T> predicate, Class<T> predClass) {
3345
untilPredicate = new UntilPredicate().withPredicate(predicate, predClass);
3446
return this;

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java

Lines changed: 120 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@
1515
*/
1616
package io.serverlessworkflow.fluent.agentic;
1717

18-
import java.util.Map;
19-
import java.util.concurrent.BlockingQueue;
20-
import java.util.concurrent.CompletableFuture;
21-
import java.util.concurrent.LinkedBlockingQueue;
22-
import java.util.concurrent.TimeUnit;
23-
24-
import org.junit.jupiter.api.Test;
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertNotNull;
21+
import static org.junit.jupiter.api.Assertions.fail;
22+
import static org.mockito.Mockito.spy;
2523

2624
import dev.langchain4j.agentic.AgenticServices;
2725
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
@@ -33,97 +31,126 @@
3331
import io.serverlessworkflow.impl.WorkflowInstance;
3432
import io.serverlessworkflow.impl.WorkflowModel;
3533
import io.serverlessworkflow.impl.WorkflowStatus;
36-
37-
import static org.assertj.core.api.Assertions.assertThat;
38-
import static org.junit.jupiter.api.Assertions.assertEquals;
39-
import static org.junit.jupiter.api.Assertions.assertNotNull;
40-
import static org.junit.jupiter.api.Assertions.fail;
41-
import static org.mockito.Mockito.spy;
34+
import java.util.Map;
35+
import java.util.concurrent.BlockingQueue;
36+
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.LinkedBlockingQueue;
38+
import java.util.concurrent.TimeUnit;
39+
import org.junit.jupiter.api.Test;
4240

4341
public class ChatBotIT {
4442

45-
@Test
46-
@SuppressWarnings("unchecked")
47-
void chat_bot() {
48-
Agents.ChatBot chatBot =
49-
spy(
50-
AgenticServices.agentBuilder(Agents.ChatBot.class)
51-
.chatModel(Models.BASE_MODEL)
52-
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
53-
.outputName("conversation")
54-
.build());
55-
BlockingQueue<CloudEvent> replyEvents = new LinkedBlockingQueue<>();
56-
BlockingQueue<CloudEvent> finishedEvents = new LinkedBlockingQueue<>();
57-
58-
// 1. listen to an event containing `message` key in the body
59-
// 2. if contains, call the agent, if not end the workflow
60-
// 3. After replying to the chat, return
61-
final Workflow listenWorkflow =
62-
AgentWorkflowBuilder.workflow("chat-bot")
63-
.tasks(t -> t.listen(l ->
64-
l.to(to -> to.any(c -> c.with(event -> event.type("org.acme.chatbot.request")))
65-
.until(until -> until.one(one -> one.with(e -> e.type("org.acme.chatbot.finalize")))))
66-
.forEach(f -> f.tasks(tasks -> tasks
43+
@Test
44+
@SuppressWarnings("unchecked")
45+
void chat_bot() {
46+
Agents.ChatBot chatBot =
47+
spy(
48+
AgenticServices.agentBuilder(Agents.ChatBot.class)
49+
.chatModel(Models.BASE_MODEL)
50+
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
51+
.outputName("conversation")
52+
.build());
53+
BlockingQueue<CloudEvent> replyEvents = new LinkedBlockingQueue<>();
54+
BlockingQueue<CloudEvent> finishedEvents = new LinkedBlockingQueue<>();
55+
56+
// 1. listen to an event containing `message` key in the body
57+
// 2. if contains, call the agent, if not end the workflow
58+
// 3. After replying to the chat, return
59+
final Workflow listenWorkflow =
60+
AgentWorkflowBuilder.workflow("chat-bot")
61+
.tasks(
62+
t ->
63+
t.listen(
64+
l ->
65+
l.to(
66+
to ->
67+
to.any(
68+
c ->
69+
c.with(
70+
event ->
71+
event.type(
72+
"org.acme.chatbot.request")))
73+
.until(
74+
until ->
75+
until.one(
76+
one ->
77+
one.with(
78+
e ->
79+
e.type(
80+
"org.acme.chatbot.finalize")))))
81+
.forEach(
82+
f ->
83+
f.tasks(
84+
tasks ->
85+
tasks
6786
.agent(chatBot)
68-
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply").data(".conversation"))))))
69-
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
70-
.build();
71-
72-
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
73-
app.eventConsumer()
74-
.register(
75-
app.eventConsumer()
76-
.listen(
77-
new EventFilter()
78-
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
79-
app),
80-
ce -> replyEvents.add((CloudEvent) ce));
81-
82-
app.eventConsumer()
83-
.register(
84-
app.eventConsumer()
85-
.listen(
86-
new EventFilter()
87-
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
88-
app),
89-
ce -> finishedEvents.add((CloudEvent) ce));
90-
91-
final WorkflowInstance waitingInstance =
92-
app.workflowDefinition(listenWorkflow).instance(Map.of());
93-
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
94-
95-
// The workflow is just waiting for the event
96-
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
97-
98-
// Publish the events
99-
app.eventPublisher().publish(newRequestMessage("Hi! Can you tell me a good duck joke?"));
100-
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
101-
assertNotNull(reply);
102-
103-
app.eventPublisher().publish(newRequestMessage("Oh I didn't like this one, please tell me another."));
104-
reply = replyEvents.poll(60, TimeUnit.SECONDS);
105-
assertNotNull(reply);
106-
107-
// Empty message completes the workflow
108-
app.eventPublisher().publish(newFinalizeMessage());
109-
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
110-
assertNotNull(finished);
111-
assertThat(finishedEvents).isEmpty();
112-
113-
assertThat(runningModel).isCompleted();
114-
assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status());
115-
116-
} catch (InterruptedException e) {
117-
fail(e.getMessage());
118-
}
87+
.emit(
88+
emit ->
89+
emit.event(
90+
e ->
91+
e.type(
92+
"org.acme.chatbot.reply")
93+
.data(
94+
".conversation"))))))
95+
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
96+
.build();
97+
98+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
99+
app.eventConsumer()
100+
.register(
101+
app.eventConsumer()
102+
.listen(
103+
new EventFilter()
104+
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
105+
app),
106+
ce -> replyEvents.add((CloudEvent) ce));
107+
108+
app.eventConsumer()
109+
.register(
110+
app.eventConsumer()
111+
.listen(
112+
new EventFilter()
113+
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
114+
app),
115+
ce -> finishedEvents.add((CloudEvent) ce));
116+
117+
final WorkflowInstance waitingInstance =
118+
app.workflowDefinition(listenWorkflow).instance(Map.of());
119+
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
120+
121+
// The workflow is just waiting for the event
122+
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
123+
124+
// Publish the events
125+
app.eventPublisher().publish(newRequestMessage("Hi! Can you tell me a good duck joke?"));
126+
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
127+
assertNotNull(reply);
128+
129+
app.eventPublisher()
130+
.publish(newRequestMessage("Oh I didn't like this one, please tell me another."));
131+
reply = replyEvents.poll(60, TimeUnit.SECONDS);
132+
assertNotNull(reply);
133+
134+
// Empty message completes the workflow
135+
app.eventPublisher().publish(newFinalizeMessage());
136+
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
137+
assertNotNull(finished);
138+
assertThat(finishedEvents).isEmpty();
139+
140+
assertThat(runningModel).isCompleted();
141+
assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status());
142+
143+
} catch (InterruptedException e) {
144+
fail(e.getMessage());
119145
}
146+
}
120147

148+
private CloudEvent newRequestMessage(String message) {
149+
return CloudEventsTestBuilder.newMessage(
150+
String.format("{\"userInput\": \"%s\"}", message), "org.acme.chatbot.request");
151+
}
121152

122-
private CloudEvent newRequestMessage(String message) {
123-
return CloudEventsTestBuilder.newMessage(String.format("{\"userInput\": \"%s\"}", message), "org.acme.chatbot.request");
124-
}
125-
126-
private CloudEvent newFinalizeMessage() {
127-
return CloudEventsTestBuilder.newMessage("", "org.acme.chatbot.finalize");
128-
}
153+
private CloudEvent newFinalizeMessage() {
154+
return CloudEventsTestBuilder.newMessage("", "org.acme.chatbot.finalize");
155+
}
129156
}
Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,42 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.serverlessworkflow.fluent.agentic;
217

18+
import io.cloudevents.CloudEvent;
19+
import io.cloudevents.core.v1.CloudEventBuilder;
320
import java.net.URI;
421
import java.time.OffsetDateTime;
522
import java.util.UUID;
623

7-
import io.cloudevents.CloudEvent;
8-
import io.cloudevents.core.v1.CloudEventBuilder;
9-
1024
public final class CloudEventsTestBuilder {
1125

12-
private CloudEventsTestBuilder(){
13-
14-
}
26+
private CloudEventsTestBuilder() {}
1527

16-
public static CloudEvent newMessage(String data, String type) {
17-
if (data == null) {
18-
data = "";
19-
}
20-
return new CloudEventBuilder()
21-
.withData(data.getBytes())
22-
.withType(type)
23-
.withId(UUID.randomUUID().toString())
24-
.withDataContentType("application/json")
25-
.withSource(URI.create("test://localhost"))
26-
.withSubject("A chatbot message")
27-
.withTime(OffsetDateTime.now())
28-
.build();
28+
public static CloudEvent newMessage(String data, String type) {
29+
if (data == null) {
30+
data = "";
2931
}
30-
32+
return new CloudEventBuilder()
33+
.withData(data.getBytes())
34+
.withType(type)
35+
.withId(UUID.randomUUID().toString())
36+
.withDataContentType("application/json")
37+
.withSource(URI.create("test://localhost"))
38+
.withSubject("A chatbot message")
39+
.withTime(OffsetDateTime.now())
40+
.build();
41+
}
3142
}

0 commit comments

Comments
 (0)