@@ -60,7 +60,8 @@ void chat_bot() {
6060 // .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
6161 .outputName ("conversation" )
6262 .build ());
63- BlockingQueue <CloudEvent > publishedEvents = new LinkedBlockingQueue <>();
63+ BlockingQueue <CloudEvent > replyEvents = new LinkedBlockingQueue <>();
64+ BlockingQueue <CloudEvent > finishedEvents = new LinkedBlockingQueue <>();
6465
6566 // 1. listen to an event containing `message` key in the body
6667 // 2. if contains, call the agent, if not end the workflow
@@ -71,7 +72,13 @@ void chat_bot() {
7172 t ->
7273 t .listen (
7374 l ->
74- l .until (message -> "" .equals (message .get ("userInput" )), Map .class )
75+ l .until (
76+ message ->
77+ !message
78+ .getOrDefault ("userInput" , "" )
79+ .toString ()
80+ .isEmpty (),
81+ Map .class )
7582 .any (
7683 c ->
7784 c .with (event -> event .type ("org.acme.chatbot.request" )))
@@ -87,7 +94,16 @@ void chat_bot() {
8794 e ->
8895 e .type (
8996 "org.acme.chatbot.reply" ))))))
90- .emit (emit -> emit .event (e -> e .type ("org.acme.chatbot.finished" ))))
97+ .emit (
98+ emit ->
99+ emit .when (
100+ message ->
101+ message
102+ .getOrDefault ("userInput" , "" )
103+ .toString ()
104+ .isEmpty (),
105+ Map .class )
106+ .event (e -> e .type ("org.acme.chatbot.finished" ))))
91107 .build ();
92108
93109 try (WorkflowApplication app = WorkflowApplication .builder ().build ()) {
@@ -98,7 +114,7 @@ void chat_bot() {
98114 new EventFilter ()
99115 .withWith (new EventProperties ().withType ("org.acme.chatbot.reply" )),
100116 app ),
101- ce -> publishedEvents .add ((CloudEvent ) ce ));
117+ ce -> replyEvents .add ((CloudEvent ) ce ));
102118
103119 app .eventConsumer ()
104120 .register (
@@ -107,7 +123,7 @@ void chat_bot() {
107123 new EventFilter ()
108124 .withWith (new EventProperties ().withType ("org.acme.chatbot.finished" )),
109125 app ),
110- ce -> publishedEvents .add ((CloudEvent ) ce ));
126+ ce -> finishedEvents .add ((CloudEvent ) ce ));
111127
112128 final WorkflowInstance waitingInstance =
113129 app .workflowDefinition (listenWorkflow ).instance (Map .of ());
@@ -118,12 +134,12 @@ void chat_bot() {
118134
119135 // Publish the event
120136 app .eventPublisher ().publish (newMessageEvent ("Hello World!" ));
121- CloudEvent reply = publishedEvents .poll (60 , TimeUnit .SECONDS );
137+ CloudEvent reply = replyEvents .poll (60 , TimeUnit .SECONDS );
122138 assertNotNull (reply );
123139
124140 // Empty message completes the workflow
125141 app .eventPublisher ().publish (newMessageEvent ("" ));
126- CloudEvent finished = publishedEvents .poll (60 , TimeUnit .SECONDS );
142+ CloudEvent finished = finishedEvents .poll (60 , TimeUnit .SECONDS );
127143 assertNotNull (finished );
128144
129145 assertThat (runningModel ).isCompleted ();
@@ -158,7 +174,7 @@ void mixed_workflow() {
158174 AgenticServices .agentBuilder (Agents .ChatBot .class )
159175 .chatModel (Models .BASE_MODEL )
160176 .chatMemoryProvider (memoryId -> MessageWindowChatMemory .withMaxMessages (10 ))
161- .outputName ("message " )
177+ .outputName ("userInput " )
162178 .build ());
163179
164180 final Workflow mixedWorkflow =
@@ -170,7 +186,7 @@ void mixed_workflow() {
170186 callJ .function (
171187 input -> {
172188 System .out .println (input );
173- return Map .of ("message " , input );
189+ return Map .of ("userInput " , input );
174190 },
175191 String .class ))
176192 .agent (chatBot )
0 commit comments