Skip to content

Commit 9a1ea45

Browse files
committed
[Fix #744] Improving EventPublisher/EventConsumer
There might be more that one EventPublisher EventPublishers are either added programatically or through service loader. There will be only one consumer. If user want to handler more than one consuming event broker, he should provide an specific implementation that deals with them. The one consumer is added programatically (priortiy) or default to service loader. If not eventConsumer was provides, InMemoryEvents is used as event consumer and publisher. ChatBotIT and LifeCycleEventTest has been changed accordingly. Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 6d23850 commit 9a1ea45

File tree

6 files changed

+124
-71
lines changed

6 files changed

+124
-71
lines changed

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@
2828
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
2929
import io.cloudevents.CloudEvent;
3030
import io.cloudevents.core.v1.CloudEventBuilder;
31-
import io.serverlessworkflow.api.types.EventFilter;
32-
import io.serverlessworkflow.api.types.EventProperties;
3331
import io.serverlessworkflow.api.types.Workflow;
3432
import io.serverlessworkflow.impl.WorkflowApplication;
3533
import io.serverlessworkflow.impl.WorkflowInstance;
3634
import io.serverlessworkflow.impl.WorkflowModel;
3735
import io.serverlessworkflow.impl.WorkflowStatus;
36+
import io.serverlessworkflow.impl.events.InMemoryEvents;
3837
import java.net.URI;
3938
import java.time.OffsetDateTime;
4039
import java.util.Map;
@@ -103,25 +102,15 @@ void chat_bot() {
103102
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
104103
.build();
105104

106-
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
107-
app.eventConsumer()
108-
.register(
109-
app.eventConsumer()
110-
.listen(
111-
new EventFilter()
112-
.withWith(new EventProperties().withType("org.acme.chatbot.reply")),
113-
app),
114-
ce -> replyEvents.add((CloudEvent) ce));
115-
116-
app.eventConsumer()
117-
.register(
118-
app.eventConsumer()
119-
.listen(
120-
new EventFilter()
121-
.withWith(new EventProperties().withType("org.acme.chatbot.finished")),
122-
app),
123-
ce -> finishedEvents.add((CloudEvent) ce));
105+
InMemoryEvents eventBroker = new InMemoryEvents();
106+
eventBroker.register("org.acme.chatbot.reply", ce -> replyEvents.add((CloudEvent) ce));
107+
eventBroker.register("org.acme.chatbot.finished", ce -> finishedEvents.add((CloudEvent) ce));
124108

109+
try (WorkflowApplication app =
110+
WorkflowApplication.builder()
111+
.withEventConsumer(eventBroker)
112+
.withEventPublisher(eventBroker)
113+
.build()) {
125114
final WorkflowInstance waitingInstance =
126115
app.workflowDefinition(listenWorkflow).instance(Map.of());
127116
final CompletableFuture<WorkflowModel> runningModel = waitingInstance.start();
@@ -130,12 +119,12 @@ void chat_bot() {
130119
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
131120

132121
// Publish the events
133-
app.eventPublisher().publish(newMessageEvent("Hello World!"));
122+
eventBroker.publish(newMessageEvent("Hello World!"));
134123
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
135124
assertNotNull(reply);
136125

137126
// Empty message completes the workflow
138-
app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize"));
127+
eventBroker.publish(newMessageEvent("", "org.acme.chatbot.finalize"));
139128
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
140129
assertNotNull(finished);
141130
assertThat(finishedEvents).isEmpty();
@@ -145,6 +134,7 @@ void chat_bot() {
145134

146135
} catch (InterruptedException e) {
147136
fail(e.getMessage());
137+
} finally {
148138
}
149139
}
150140

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.serverlessworkflow.impl.resources.StaticResource;
3333
import io.serverlessworkflow.impl.schema.SchemaValidator;
3434
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
35+
import java.util.ArrayList;
3536
import java.util.Collection;
3637
import java.util.Collections;
3738
import java.util.Map;
@@ -58,7 +59,7 @@ public class WorkflowApplication implements AutoCloseable {
5859
private final ExecutorServiceFactory executorFactory;
5960
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
6061
private final EventConsumer<?, ?> eventConsumer;
61-
private final EventPublisher eventPublisher;
62+
private final Collection<EventPublisher> eventPublishers;
6263
private final boolean lifeCycleCEPublishingEnabled;
6364

6465
private WorkflowApplication(Builder builder) {
@@ -73,7 +74,7 @@ private WorkflowApplication(Builder builder) {
7374
this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
7475
this.definitions = new ConcurrentHashMap<>();
7576
this.eventConsumer = builder.eventConsumer;
76-
this.eventPublisher = builder.eventPublisher;
77+
this.eventPublishers = builder.eventPublishers;
7778
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
7879
}
7980

@@ -101,8 +102,8 @@ public Collection<WorkflowExecutionListener> listeners() {
101102
return listeners;
102103
}
103104

104-
public EventPublisher eventPublisher() {
105-
return eventPublisher;
105+
public Collection<EventPublisher> eventPublishers() {
106+
return eventPublishers;
106107
}
107108

108109
public WorkflowIdFactory idFactory() {
@@ -144,7 +145,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
144145
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
145146
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
146147
private EventConsumer<?, ?> eventConsumer;
147-
private EventPublisher eventPublisher;
148+
private Collection<EventPublisher> eventPublishers = new ArrayList<>();
148149
private RuntimeDescriptorFactory descriptorFactory =
149150
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
150151
private boolean lifeCycleCEPublishingEnabled = true;
@@ -201,10 +202,13 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) {
201202
return this;
202203
}
203204

204-
public Builder withEventHandler(
205-
EventPublisher eventPublisher, EventConsumer<?, ?> eventConsumer) {
205+
public Builder withEventConsumer(EventConsumer<?, ?> eventConsumer) {
206206
this.eventConsumer = eventConsumer;
207-
this.eventPublisher = eventPublisher;
207+
return this;
208+
}
209+
210+
public Builder withEventPublisher(EventPublisher eventPublisher) {
211+
this.eventPublishers.add(eventPublisher);
208212
return this;
209213
}
210214

@@ -227,10 +231,19 @@ public WorkflowApplication build() {
227231
.findFirst()
228232
.orElseGet(() -> DefaultTaskExecutorFactory.get());
229233
}
230-
if (eventConsumer == null && eventPublisher == null) {
231-
InMemoryEvents inMemory = new InMemoryEvents(executorFactory);
232-
eventPublisher = inMemory;
233-
eventConsumer = inMemory;
234+
ServiceLoader.load(EventPublisher.class).forEach(e -> eventPublishers.add(e));
235+
if (eventConsumer == null) {
236+
eventConsumer =
237+
ServiceLoader.load(EventConsumer.class)
238+
.findFirst()
239+
.orElseGet(
240+
() -> {
241+
InMemoryEvents inMemory = new InMemoryEvents(executorFactory);
242+
if (eventPublishers.isEmpty()) {
243+
eventPublishers.add(inMemory);
244+
}
245+
return inMemory;
246+
});
234247
}
235248
return new WorkflowApplication(this);
236249
}
@@ -250,8 +263,11 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {
250263
@Override
251264
public void close() {
252265
safeClose(executorFactory);
253-
safeClose(eventPublisher);
266+
for (EventPublisher eventPublisher : eventPublishers) {
267+
safeClose(eventPublisher);
268+
}
254269
safeClose(eventConsumer);
270+
255271
for (WorkflowDefinition definition : definitions.values()) {
256272
safeClose(definition);
257273
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
1920
import io.serverlessworkflow.impl.ExecutorServiceFactory;
2021
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
@@ -29,6 +30,10 @@
2930
*/
3031
public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher {
3132

33+
public InMemoryEvents() {
34+
this(new DefaultExecutorServiceFactory());
35+
}
36+
3237
public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
3338
this.serviceFactory = serviceFactory;
3439
}
@@ -40,7 +45,7 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
4045
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
4146

4247
@Override
43-
protected void register(String topicName, Consumer<CloudEvent> consumer) {
48+
public void register(String topicName, Consumer<CloudEvent> consumer) {
4449
topicMap.put(topicName, consumer);
4550
}
4651

@@ -77,6 +82,7 @@ protected void unregisterFromAll() {
7782

7883
@Override
7984
public void close() {
85+
topicMap.clear();
8086
serviceFactory.close();
8187
}
8288
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@
3333
import io.serverlessworkflow.impl.WorkflowUtils;
3434
import io.serverlessworkflow.impl.WorkflowValueResolver;
3535
import io.serverlessworkflow.impl.events.CloudEventUtils;
36+
import io.serverlessworkflow.impl.events.EventPublisher;
3637
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
3738
import io.serverlessworkflow.impl.resources.ResourceLoader;
3839
import java.net.URI;
3940
import java.time.OffsetDateTime;
41+
import java.util.ArrayList;
42+
import java.util.Collection;
4043
import java.util.Map;
4144
import java.util.Optional;
4245
import java.util.concurrent.CompletableFuture;
@@ -74,11 +77,15 @@ private EmitExecutor(EmitExecutorBuilder builder) {
7477
@Override
7578
protected CompletableFuture<WorkflowModel> internalExecute(
7679
WorkflowContext workflow, TaskContext taskContext) {
77-
return workflow
78-
.definition()
79-
.application()
80-
.eventPublisher()
81-
.publish(buildCloudEvent(workflow, taskContext))
80+
Collection<EventPublisher> eventPublishers =
81+
workflow.definition().application().eventPublishers();
82+
CloudEvent ce = buildCloudEvent(workflow, taskContext);
83+
Collection<CompletableFuture<Void>> allCompletables = new ArrayList<>();
84+
for (EventPublisher eventPublisher : eventPublishers) {
85+
allCompletables.add(eventPublisher.publish(ce));
86+
}
87+
return CompletableFuture.allOf(
88+
allCompletables.toArray(new CompletableFuture[allCompletables.size()]))
8289
.thenApply(v -> taskContext.input());
8390
}
8491

0 commit comments

Comments
 (0)