Skip to content

Commit 4547503

Browse files
committed
Fix IT; Add EventDataFunction
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent dd38a9f commit 4547503

File tree

4 files changed

+232
-204
lines changed

4 files changed

+232
-204
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.serverlessworkflow.api.types.func;
2+
3+
import io.serverlessworkflow.api.types.EventData;
4+
import java.util.Objects;
5+
import java.util.function.Function;
6+
7+
public class EventDataFunction extends EventData {
8+
9+
public <T, V> EventData withFunction(Function<T, V> value) {
10+
setObject(value);
11+
return this;
12+
}
13+
14+
public <T, V> EventData withFunction(Function<T, V> value, Class<T> argClass) {
15+
Objects.requireNonNull(argClass);
16+
setObject(new TypedFunction<>(value, argClass));
17+
return this;
18+
}
19+
}

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java

Lines changed: 91 additions & 204 deletions
Original file line numberDiff line numberDiff line change
@@ -15,228 +15,115 @@
1515
*/
1616
package io.serverlessworkflow.fluent.agentic;
1717

18-
import static org.assertj.core.api.Assertions.assertThat;
19-
import static org.junit.jupiter.api.Assertions.assertEquals;
20-
import static org.junit.jupiter.api.Assertions.assertFalse;
21-
import static org.junit.jupiter.api.Assertions.assertNotNull;
22-
import static org.junit.jupiter.api.Assertions.assertTrue;
23-
import static org.junit.jupiter.api.Assertions.fail;
24-
import static org.mockito.Mockito.spy;
18+
import java.util.Map;
19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.LinkedBlockingQueue;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.jupiter.api.Test;
2525

2626
import dev.langchain4j.agentic.AgenticServices;
27-
import dev.langchain4j.agentic.scope.AgenticScope;
2827
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
2928
import io.cloudevents.CloudEvent;
30-
import io.cloudevents.core.v1.CloudEventBuilder;
3129
import io.serverlessworkflow.api.types.EventFilter;
3230
import io.serverlessworkflow.api.types.EventProperties;
3331
import io.serverlessworkflow.api.types.Workflow;
3432
import io.serverlessworkflow.impl.WorkflowApplication;
3533
import io.serverlessworkflow.impl.WorkflowInstance;
3634
import io.serverlessworkflow.impl.WorkflowModel;
3735
import io.serverlessworkflow.impl.WorkflowStatus;
38-
import java.net.URI;
39-
import java.time.OffsetDateTime;
40-
import java.util.Map;
41-
import java.util.Optional;
42-
import java.util.UUID;
43-
import java.util.concurrent.BlockingQueue;
44-
import java.util.concurrent.CompletableFuture;
45-
import java.util.concurrent.LinkedBlockingQueue;
46-
import java.util.concurrent.TimeUnit;
47-
import org.junit.jupiter.api.Disabled;
48-
import org.junit.jupiter.api.Test;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
import static org.junit.jupiter.api.Assertions.assertEquals;
39+
import static org.junit.jupiter.api.Assertions.assertNotNull;
40+
import static org.junit.jupiter.api.Assertions.fail;
41+
import static org.mockito.Mockito.spy;
4942

5043
public class ChatBotIT {
5144

52-
@Test
53-
@SuppressWarnings("unchecked")
54-
@Disabled("Figuring out event processing")
55-
void chat_bot() {
56-
Agents.ChatBot chatBot =
57-
spy(
58-
AgenticServices.agentBuilder(Agents.ChatBot.class)
59-
.chatModel(Models.BASE_MODEL)
60-
// .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
61-
.outputName("conversation")
62-
.build());
63-
BlockingQueue<CloudEvent> replyEvents = new LinkedBlockingQueue<>();
64-
BlockingQueue<CloudEvent> finishedEvents = new LinkedBlockingQueue<>();
65-
66-
// 1. listen to an event containing `message` key in the body
67-
// 2. if contains, call the agent, if not end the workflow
68-
// 3. After replying to the chat, return
69-
final Workflow listenWorkflow =
70-
AgentWorkflowBuilder.workflow("chat-bot")
71-
.tasks(
72-
t ->
73-
t.listen(
74-
l ->
75-
l.to(
76-
to ->
77-
to.any(
78-
c ->
79-
c.with(
80-
event ->
81-
event.type(
82-
"org.acme.chatbot.request")))
83-
.until(
84-
until ->
85-
until.one(
86-
one ->
87-
one.with(
88-
e ->
89-
e.type(
90-
"org.acme.chatbot.finalize")))))
91-
.forEach(
92-
f ->
93-
f.tasks(
94-
tasks ->
95-
tasks
45+
@Test
46+
@SuppressWarnings("unchecked")
47+
void chat_bot() {
48+
Agents.ChatBot chatBot =
49+
spy(
50+
AgenticServices.agentBuilder(Agents.ChatBot.class)
51+
.chatModel(Models.BASE_MODEL)
52+
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
53+
.outputName("conversation")
54+
.build());
55+
BlockingQueue<CloudEvent> replyEvents = new LinkedBlockingQueue<>();
56+
BlockingQueue<CloudEvent> finishedEvents = new LinkedBlockingQueue<>();
57+
58+
// 1. listen to an event containing `message` key in the body
59+
// 2. if contains, call the agent, if not end the workflow
60+
// 3. After replying to the chat, return
61+
final Workflow listenWorkflow =
62+
AgentWorkflowBuilder.workflow("chat-bot")
63+
.tasks(t -> t.listen(l ->
64+
l.to(to -> to.any(c -> c.with(event -> event.type("org.acme.chatbot.request")))
65+
.until(until -> until.one(one -> one.with(e -> e.type("org.acme.chatbot.finalize")))))
66+
.forEach(f -> f.tasks(tasks -> tasks
9667
.agent(chatBot)
97-
.emit(
98-
emit ->
99-
emit.event(
100-
e ->
101-
e.type(
102-
"org.acme.chatbot.reply"))))))
103-
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
104-
.build();
105-
106-
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
107-
app.eventConsumer()
108-
.register(
109-
app.eventConsumer()
110-
.listen(
111-
new EventFilter()
112-
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
113-
app),
114-
ce -> replyEvents.add((CloudEvent) ce));
115-
116-
app.eventConsumer()
117-
.register(
118-
app.eventConsumer()
119-
.listen(
120-
new EventFilter()
121-
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
122-
app),
123-
ce -> finishedEvents.add((CloudEvent) ce));
124-
125-
final WorkflowInstance waitingInstance =
126-
app.workflowDefinition(listenWorkflow).instance(Map.of());
127-
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
128-
129-
// The workflow is just waiting for the event
130-
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
131-
132-
// Publish the events
133-
app.eventPublisher().publish(newMessageEvent("Hello World!"));
134-
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
135-
assertNotNull(reply);
136-
137-
// Empty message completes the workflow
138-
app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize"));
139-
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
140-
assertNotNull(finished);
141-
assertThat(finishedEvents).isEmpty();
142-
143-
assertThat(runningModel).isCompleted();
144-
assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status());
145-
146-
} catch (InterruptedException e) {
147-
fail(e.getMessage());
148-
}
149-
}
150-
151-
/**
152-
* In this test we validate a workflow mixed with agents and regular Java calls
153-
*
154-
* <p>
155-
*
156-
* <ol>
157-
* <li>The first function prints the message input and converts the data into a Map for the
158-
* agent ingestion
159-
* <li>Internally, our factories will add the output to a new AgenticScope since under the hood,
160-
* we are call `as(AgenticScope)`
161-
* <li>The agent is then called with a scope with a state as `message="input"`
162-
* <li>The agent updates the state automatically in the AgenticScope and returns the message as
163-
* a string, this string is then served to the next task
164-
* <li>The next task process the agent response and returns it ending the workflow. Meanwhile,
165-
* the AgenticScope is always updated with the latest result from the given task.
166-
* </ol>
167-
*/
168-
@Test
169-
void mixed_workflow() {
170-
Agents.ChatBot chatBot =
171-
spy(
172-
AgenticServices.agentBuilder(Agents.ChatBot.class)
173-
.chatModel(Models.BASE_MODEL)
174-
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
175-
.outputName("userInput")
176-
.build());
177-
178-
final Workflow mixedWorkflow =
179-
AgentWorkflowBuilder.workflow("chat-bot")
180-
.tasks(
181-
t ->
182-
t.callFn(
183-
callJ ->
184-
callJ.function(
185-
input -> {
186-
System.out.println(input);
187-
return Map.of("userInput", input);
188-
},
189-
String.class))
190-
.agent(chatBot)
191-
.callFn(
192-
callJ ->
193-
callJ.function(
194-
input -> {
195-
System.out.println(input);
196-
// Here, we are return a simple string so the internal
197-
// AgenticScope will add it to the default `input` key
198-
// If we want to really manipulate it, we could return a
199-
// Map<>(message, input)
200-
return "I've changed the input [" + input + "]";
201-
},
202-
String.class)))
203-
.build();
204-
205-
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
206-
WorkflowModel model =
207-
app.workflowDefinition(mixedWorkflow).instance("Hello World!").start().join();
208-
209-
Optional<String> resultAsString = model.as(String.class);
210-
211-
assertTrue(resultAsString.isPresent());
212-
assertFalse(resultAsString.get().isEmpty());
213-
assertTrue(resultAsString.get().contains("changed the input"));
214-
215-
Optional<AgenticScope> resultAsScope = model.as(AgenticScope.class);
216-
217-
assertTrue(resultAsScope.isPresent());
218-
assertFalse(resultAsScope.get().readState("input").toString().isEmpty());
219-
assertTrue(resultAsScope.get().readState("input").toString().contains("changed the input"));
68+
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply").data(".conversation"))))))
69+
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
70+
.build();
71+
72+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
73+
app.eventConsumer()
74+
.register(
75+
app.eventConsumer()
76+
.listen(
77+
new EventFilter()
78+
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
79+
app),
80+
ce -> replyEvents.add((CloudEvent) ce));
81+
82+
app.eventConsumer()
83+
.register(
84+
app.eventConsumer()
85+
.listen(
86+
new EventFilter()
87+
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
88+
app),
89+
ce -> finishedEvents.add((CloudEvent) ce));
90+
91+
final WorkflowInstance waitingInstance =
92+
app.workflowDefinition(listenWorkflow).instance(Map.of());
93+
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
94+
95+
// The workflow is just waiting for the event
96+
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
97+
98+
// Publish the events
99+
app.eventPublisher().publish(newRequestMessage("Hi! Can you tell me a good duck joke?"));
100+
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
101+
assertNotNull(reply);
102+
103+
app.eventPublisher().publish(newRequestMessage("Oh I didn't like this one, please tell me another."));
104+
reply = replyEvents.poll(60, TimeUnit.SECONDS);
105+
assertNotNull(reply);
106+
107+
// Empty message completes the workflow
108+
app.eventPublisher().publish(newFinalizeMessage());
109+
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
110+
assertNotNull(finished);
111+
assertThat(finishedEvents).isEmpty();
112+
113+
assertThat(runningModel).isCompleted();
114+
assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status());
115+
116+
} catch (InterruptedException e) {
117+
fail(e.getMessage());
118+
}
220119
}
221-
}
222120

223-
private CloudEvent newMessageEvent(String message) {
224-
return newMessageEvent(message, null);
225-
}
226121

227-
private CloudEvent newMessageEvent(String message, String type) {
228-
if (type == null || type.isEmpty()) {
229-
type = "org.acme.chatbot.request";
122+
private CloudEvent newRequestMessage(String message) {
123+
return CloudEventsTestBuilder.newMessage(String.format("{\"userInput\": \"%s\"}", message), "org.acme.chatbot.request");
230124
}
231125

232-
return new CloudEventBuilder()
233-
.withData(String.format("{\"userInput\": \"%s\"}", message).getBytes())
234-
.withType(type)
235-
.withId(UUID.randomUUID().toString())
236-
.withDataContentType("application/json")
237-
.withSource(URI.create("test://localhost"))
238-
.withSubject("A chatbot message")
239-
.withTime(OffsetDateTime.now())
240-
.build();
241-
}
126+
private CloudEvent newFinalizeMessage() {
127+
return CloudEventsTestBuilder.newMessage("", "org.acme.chatbot.finalize");
128+
}
242129
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.serverlessworkflow.fluent.agentic;
2+
3+
import java.net.URI;
4+
import java.time.OffsetDateTime;
5+
import java.util.UUID;
6+
7+
import io.cloudevents.CloudEvent;
8+
import io.cloudevents.core.v1.CloudEventBuilder;
9+
10+
public final class CloudEventsTestBuilder {
11+
12+
private CloudEventsTestBuilder(){
13+
14+
}
15+
16+
public static CloudEvent newMessage(String data, String type) {
17+
if (data == null) {
18+
data = "";
19+
}
20+
return new CloudEventBuilder()
21+
.withData(data.getBytes())
22+
.withType(type)
23+
.withId(UUID.randomUUID().toString())
24+
.withDataContentType("application/json")
25+
.withSource(URI.create("test://localhost"))
26+
.withSubject("A chatbot message")
27+
.withTime(OffsetDateTime.now())
28+
.build();
29+
}
30+
31+
}

0 commit comments

Comments
 (0)