|
15 | 15 | */
|
16 | 16 | package io.serverlessworkflow.fluent.agentic;
|
17 | 17 |
|
18 |
| -import java.util.Map; |
19 |
| -import java.util.concurrent.BlockingQueue; |
20 |
| -import java.util.concurrent.CompletableFuture; |
21 |
| -import java.util.concurrent.LinkedBlockingQueue; |
22 |
| -import java.util.concurrent.TimeUnit; |
23 |
| - |
24 |
| -import org.junit.jupiter.api.Test; |
| 18 | +import static org.assertj.core.api.Assertions.assertThat; |
| 19 | +import static org.junit.jupiter.api.Assertions.assertEquals; |
| 20 | +import static org.junit.jupiter.api.Assertions.assertNotNull; |
| 21 | +import static org.junit.jupiter.api.Assertions.fail; |
| 22 | +import static org.mockito.Mockito.spy; |
25 | 23 |
|
26 | 24 | import dev.langchain4j.agentic.AgenticServices;
|
27 | 25 | import dev.langchain4j.memory.chat.MessageWindowChatMemory;
|
|
33 | 31 | import io.serverlessworkflow.impl.WorkflowInstance;
|
34 | 32 | import io.serverlessworkflow.impl.WorkflowModel;
|
35 | 33 | import io.serverlessworkflow.impl.WorkflowStatus;
|
36 |
| - |
37 |
| -import static org.assertj.core.api.Assertions.assertThat; |
38 |
| -import static org.junit.jupiter.api.Assertions.assertEquals; |
39 |
| -import static org.junit.jupiter.api.Assertions.assertNotNull; |
40 |
| -import static org.junit.jupiter.api.Assertions.fail; |
41 |
| -import static org.mockito.Mockito.spy; |
| 34 | +import java.util.Map; |
| 35 | +import java.util.concurrent.BlockingQueue; |
| 36 | +import java.util.concurrent.CompletableFuture; |
| 37 | +import java.util.concurrent.LinkedBlockingQueue; |
| 38 | +import java.util.concurrent.TimeUnit; |
| 39 | +import org.junit.jupiter.api.Test; |
42 | 40 |
|
43 | 41 | public class ChatBotIT {
|
44 | 42 |
|
45 |
| - @Test |
46 |
| - @SuppressWarnings("unchecked") |
47 |
| - void chat_bot() { |
48 |
| - Agents.ChatBot chatBot = |
49 |
| - spy( |
50 |
| - AgenticServices.agentBuilder(Agents.ChatBot.class) |
51 |
| - .chatModel(Models.BASE_MODEL) |
52 |
| - .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) |
53 |
| - .outputName("conversation") |
54 |
| - .build()); |
55 |
| - BlockingQueue<CloudEvent> replyEvents = new LinkedBlockingQueue<>(); |
56 |
| - BlockingQueue<CloudEvent> finishedEvents = new LinkedBlockingQueue<>(); |
57 |
| - |
58 |
| - // 1. listen to an event containing `message` key in the body |
59 |
| - // 2. if contains, call the agent, if not end the workflow |
60 |
| - // 3. After replying to the chat, return |
61 |
| - final Workflow listenWorkflow = |
62 |
| - AgentWorkflowBuilder.workflow("chat-bot") |
63 |
| - .tasks(t -> t.listen(l -> |
64 |
| - l.to(to -> to.any(c -> c.with(event -> event.type("org.acme.chatbot.request"))) |
65 |
| - .until(until -> until.one(one -> one.with(e -> e.type("org.acme.chatbot.finalize"))))) |
66 |
| - .forEach(f -> f.tasks(tasks -> tasks |
| 43 | + @Test |
| 44 | + @SuppressWarnings("unchecked") |
| 45 | + void chat_bot() { |
| 46 | + Agents.ChatBot chatBot = |
| 47 | + spy( |
| 48 | + AgenticServices.agentBuilder(Agents.ChatBot.class) |
| 49 | + .chatModel(Models.BASE_MODEL) |
| 50 | + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) |
| 51 | + .outputName("conversation") |
| 52 | + .build()); |
| 53 | + BlockingQueue<CloudEvent> replyEvents = new LinkedBlockingQueue<>(); |
| 54 | + BlockingQueue<CloudEvent> finishedEvents = new LinkedBlockingQueue<>(); |
| 55 | + |
| 56 | + // 1. listen to an event containing `message` key in the body |
| 57 | + // 2. if contains, call the agent, if not end the workflow |
| 58 | + // 3. After replying to the chat, return |
| 59 | + final Workflow listenWorkflow = |
| 60 | + AgentWorkflowBuilder.workflow("chat-bot") |
| 61 | + .tasks( |
| 62 | + t -> |
| 63 | + t.listen( |
| 64 | + l -> |
| 65 | + l.to( |
| 66 | + to -> |
| 67 | + to.any( |
| 68 | + c -> |
| 69 | + c.with( |
| 70 | + event -> |
| 71 | + event.type( |
| 72 | + "org.acme.chatbot.request"))) |
| 73 | + .until( |
| 74 | + until -> |
| 75 | + until.one( |
| 76 | + one -> |
| 77 | + one.with( |
| 78 | + e -> |
| 79 | + e.type( |
| 80 | + "org.acme.chatbot.finalize"))))) |
| 81 | + .forEach( |
| 82 | + f -> |
| 83 | + f.tasks( |
| 84 | + tasks -> |
| 85 | + tasks |
67 | 86 | .agent(chatBot)
|
68 |
| - .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.reply").data(".conversation")))))) |
69 |
| - .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished")))) |
70 |
| - .build(); |
71 |
| - |
72 |
| - try (WorkflowApplication app = WorkflowApplication.builder().build()) { |
73 |
| - app.eventConsumer() |
74 |
| - .register( |
75 |
| - app.eventConsumer() |
76 |
| - .listen( |
77 |
| - new EventFilter() |
78 |
| - .withWith(new EventProperties().withType("org.acme.chatbot.reply")), |
79 |
| - app), |
80 |
| - ce -> replyEvents.add((CloudEvent) ce)); |
81 |
| - |
82 |
| - app.eventConsumer() |
83 |
| - .register( |
84 |
| - app.eventConsumer() |
85 |
| - .listen( |
86 |
| - new EventFilter() |
87 |
| - .withWith(new EventProperties().withType("org.acme.chatbot.finished")), |
88 |
| - app), |
89 |
| - ce -> finishedEvents.add((CloudEvent) ce)); |
90 |
| - |
91 |
| - final WorkflowInstance waitingInstance = |
92 |
| - app.workflowDefinition(listenWorkflow).instance(Map.of()); |
93 |
| - final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start(); |
94 |
| - |
95 |
| - // The workflow is just waiting for the event |
96 |
| - assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); |
97 |
| - |
98 |
| - // Publish the events |
99 |
| - app.eventPublisher().publish(newRequestMessage("Hi! Can you tell me a good duck joke?")); |
100 |
| - CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS); |
101 |
| - assertNotNull(reply); |
102 |
| - |
103 |
| - app.eventPublisher().publish(newRequestMessage("Oh I didn't like this one, please tell me another.")); |
104 |
| - reply = replyEvents.poll(60, TimeUnit.SECONDS); |
105 |
| - assertNotNull(reply); |
106 |
| - |
107 |
| - // Empty message completes the workflow |
108 |
| - app.eventPublisher().publish(newFinalizeMessage()); |
109 |
| - CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS); |
110 |
| - assertNotNull(finished); |
111 |
| - assertThat(finishedEvents).isEmpty(); |
112 |
| - |
113 |
| - assertThat(runningModel).isCompleted(); |
114 |
| - assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status()); |
115 |
| - |
116 |
| - } catch (InterruptedException e) { |
117 |
| - fail(e.getMessage()); |
118 |
| - } |
| 87 | + .emit( |
| 88 | + emit -> |
| 89 | + emit.event( |
| 90 | + e -> |
| 91 | + e.type( |
| 92 | + "org.acme.chatbot.reply") |
| 93 | + .data( |
| 94 | + ".conversation")))))) |
| 95 | + .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished")))) |
| 96 | + .build(); |
| 97 | + |
| 98 | + try (WorkflowApplication app = WorkflowApplication.builder().build()) { |
| 99 | + app.eventConsumer() |
| 100 | + .register( |
| 101 | + app.eventConsumer() |
| 102 | + .listen( |
| 103 | + new EventFilter() |
| 104 | + .withWith(new EventProperties().withType("org.acme.chatbot.reply")), |
| 105 | + app), |
| 106 | + ce -> replyEvents.add((CloudEvent) ce)); |
| 107 | + |
| 108 | + app.eventConsumer() |
| 109 | + .register( |
| 110 | + app.eventConsumer() |
| 111 | + .listen( |
| 112 | + new EventFilter() |
| 113 | + .withWith(new EventProperties().withType("org.acme.chatbot.finished")), |
| 114 | + app), |
| 115 | + ce -> finishedEvents.add((CloudEvent) ce)); |
| 116 | + |
| 117 | + final WorkflowInstance waitingInstance = |
| 118 | + app.workflowDefinition(listenWorkflow).instance(Map.of()); |
| 119 | + final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start(); |
| 120 | + |
| 121 | + // The workflow is just waiting for the event |
| 122 | + assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); |
| 123 | + |
| 124 | + // Publish the events |
| 125 | + app.eventPublisher().publish(newRequestMessage("Hi! Can you tell me a good duck joke?")); |
| 126 | + CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS); |
| 127 | + assertNotNull(reply); |
| 128 | + |
| 129 | + app.eventPublisher() |
| 130 | + .publish(newRequestMessage("Oh I didn't like this one, please tell me another.")); |
| 131 | + reply = replyEvents.poll(60, TimeUnit.SECONDS); |
| 132 | + assertNotNull(reply); |
| 133 | + |
| 134 | + // Empty message completes the workflow |
| 135 | + app.eventPublisher().publish(newFinalizeMessage()); |
| 136 | + CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS); |
| 137 | + assertNotNull(finished); |
| 138 | + assertThat(finishedEvents).isEmpty(); |
| 139 | + |
| 140 | + assertThat(runningModel).isCompleted(); |
| 141 | + assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status()); |
| 142 | + |
| 143 | + } catch (InterruptedException e) { |
| 144 | + fail(e.getMessage()); |
119 | 145 | }
|
| 146 | + } |
120 | 147 |
|
| 148 | + private CloudEvent newRequestMessage(String message) { |
| 149 | + return CloudEventsTestBuilder.newMessage( |
| 150 | + String.format("{\"userInput\": \"%s\"}", message), "org.acme.chatbot.request"); |
| 151 | + } |
121 | 152 |
|
122 |
| - private CloudEvent newRequestMessage(String message) { |
123 |
| - return CloudEventsTestBuilder.newMessage(String.format("{\"userInput\": \"%s\"}", message), "org.acme.chatbot.request"); |
124 |
| - } |
125 |
| - |
126 |
| - private CloudEvent newFinalizeMessage() { |
127 |
| - return CloudEventsTestBuilder.newMessage("", "org.acme.chatbot.finalize"); |
128 |
| - } |
| 153 | + private CloudEvent newFinalizeMessage() { |
| 154 | + return CloudEventsTestBuilder.newMessage("", "org.acme.chatbot.finalize"); |
| 155 | + } |
129 | 156 | }
|
0 commit comments