diff --git a/examples/events/pom.xml b/examples/events/pom.xml index e1461497..15678a1a 100644 --- a/examples/events/pom.xml +++ b/examples/events/pom.xml @@ -15,6 +15,10 @@ + + io.serverlessworkflow + serverlessworkflow-api + io.serverlessworkflow serverlessworkflow-impl-jackson diff --git a/examples/simpleGet/pom.xml b/examples/simpleGet/pom.xml index f8d853b3..656dacec 100644 --- a/examples/simpleGet/pom.xml +++ b/examples/simpleGet/pom.xml @@ -14,6 +14,10 @@ + + io.serverlessworkflow + serverlessworkflow-api + io.serverlessworkflow serverlessworkflow-impl-jackson @@ -23,8 +27,8 @@ serverlessworkflow-impl-http - org.glassfish.jersey.media - jersey-media-json-jackson + org.glassfish.jersey.media + jersey-media-json-jackson org.glassfish.jersey.core diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java index 397f2183..8f696ef1 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -28,13 +28,12 @@ import dev.langchain4j.memory.chat.MessageWindowChatMemory; import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventBuilder; -import io.serverlessworkflow.api.types.EventFilter; -import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.InMemoryEvents; import java.net.URI; import java.time.OffsetDateTime; import java.util.Map; @@ -103,25 +102,15 @@ void chat_bot() { .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished")))) .build(); - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - app.eventConsumer() - .register( - app.eventConsumer() - .listen( - new EventFilter() - .withWith(new EventProperties().withType("org.acme.chatbot.reply")), - app), - ce -> replyEvents.add((CloudEvent) ce)); - - app.eventConsumer() - .register( - app.eventConsumer() - .listen( - new EventFilter() - .withWith(new EventProperties().withType("org.acme.chatbot.finished")), - app), - ce -> finishedEvents.add((CloudEvent) ce)); + InMemoryEvents eventBroker = new InMemoryEvents(); + eventBroker.register("org.acme.chatbot.reply", ce -> replyEvents.add((CloudEvent) ce)); + eventBroker.register("org.acme.chatbot.finished", ce -> finishedEvents.add((CloudEvent) ce)); + try (WorkflowApplication app = + WorkflowApplication.builder() + .withEventConsumer(eventBroker) + .withEventPublisher(eventBroker) + .build()) { final WorkflowInstance waitingInstance = app.workflowDefinition(listenWorkflow).instance(Map.of()); final CompletableFuture runningModel = waitingInstance.start(); @@ -130,12 +119,12 @@ void chat_bot() { assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); // Publish the events - app.eventPublisher().publish(newMessageEvent("Hello World!")); + eventBroker.publish(newMessageEvent("Hello World!")); CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS); assertNotNull(reply); // Empty message completes the workflow - app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize")); + eventBroker.publish(newMessageEvent("", "org.acme.chatbot.finalize")); CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS); assertNotNull(finished); assertThat(finishedEvents).isEmpty(); @@ -145,6 +134,7 @@ void chat_bot() { } catch (InterruptedException e) { fail(e.getMessage()); + } finally { } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index e1cb5838..bc70f8d2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -32,6 +32,7 @@ import io.serverlessworkflow.impl.resources.StaticResource; import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -58,7 +59,8 @@ public class WorkflowApplication implements AutoCloseable { private final ExecutorServiceFactory executorFactory; private final RuntimeDescriptorFactory runtimeDescriptorFactory; private final EventConsumer eventConsumer; - private final EventPublisher eventPublisher; + private final Collection eventPublishers; + private final boolean lifeCycleCEPublishingEnabled; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -72,7 +74,8 @@ private WorkflowApplication(Builder builder) { this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet(); this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; - this.eventPublisher = builder.eventPublisher; + this.eventPublishers = builder.eventPublishers; + this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; } public TaskExecutorFactory taskFactory() { @@ -99,8 +102,8 @@ public Collection listeners() { return listeners; } - public EventPublisher eventPublisher() { - return eventPublisher; + public Collection eventPublishers() { + return eventPublishers; } public WorkflowIdFactory idFactory() { @@ -142,9 +145,10 @@ public SchemaValidator getValidator(SchemaInline inline) { private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory(); private EventConsumer eventConsumer; - private EventPublisher eventPublisher; + private Collection eventPublishers = new ArrayList<>(); private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); + private boolean lifeCycleCEPublishingEnabled = true; private Builder() {} @@ -168,6 +172,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { return this; } + public Builder disableLifeCycleCEPublishing() { + this.lifeCycleCEPublishingEnabled = false; + return this; + } + public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) { this.executorFactory = executorFactory; return this; @@ -193,10 +202,13 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) { return this; } - public Builder withEventHandler( - EventPublisher eventPublisher, EventConsumer eventConsumer) { + public Builder withEventConsumer(EventConsumer eventConsumer) { this.eventConsumer = eventConsumer; - this.eventPublisher = eventPublisher; + return this; + } + + public Builder withEventPublisher(EventPublisher eventPublisher) { + this.eventPublishers.add(eventPublisher); return this; } @@ -219,10 +231,19 @@ public WorkflowApplication build() { .findFirst() .orElseGet(() -> DefaultTaskExecutorFactory.get()); } - if (eventConsumer == null && eventPublisher == null) { - InMemoryEvents inMemory = new InMemoryEvents(executorFactory); - eventPublisher = inMemory; - eventConsumer = inMemory; + ServiceLoader.load(EventPublisher.class).forEach(e -> eventPublishers.add(e)); + if (eventConsumer == null) { + eventConsumer = + ServiceLoader.load(EventConsumer.class) + .findFirst() + .orElseGet( + () -> { + InMemoryEvents inMemory = new InMemoryEvents(executorFactory); + if (eventPublishers.isEmpty()) { + eventPublishers.add(inMemory); + } + return inMemory; + }); } return new WorkflowApplication(this); } @@ -242,8 +263,11 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) { @Override public void close() { safeClose(executorFactory); - safeClose(eventPublisher); + for (EventPublisher eventPublisher : eventPublishers) { + safeClose(eventPublisher); + } safeClose(eventConsumer); + for (WorkflowDefinition definition : definitions.values()) { safeClose(definition); } @@ -278,4 +302,8 @@ public EventConsumer eventConsumer() { public ExecutorService executorService() { return executorFactory.get(); } + + public boolean isLifeCycleCEPublishingEnabled() { + return lifeCycleCEPublishingEnabled; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 4c3abd18..796d2e68 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -176,6 +176,10 @@ public boolean suspend() { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get())) { suspended = new CompletableFuture(); + workflowContext.instance().status(WorkflowStatus.SUSPENDED); + publishEvent( + workflowContext, + l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext))); return true; } else { return false; @@ -197,11 +201,11 @@ public boolean resume() { publishEvent( workflowContext, l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask))); - publishEvent( - workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); } else { suspended = null; } + publishEvent( + workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); return true; } else { return false; @@ -216,12 +220,8 @@ public CompletableFuture completedChecks(TaskContext t) { statusLock.lock(); if (suspended != null) { suspendedTask = t; - workflowContext.instance().status(WorkflowStatus.SUSPENDED); publishEvent( workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t))); - publishEvent( - workflowContext, - l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext))); return suspended; } if (cancelled != null) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index 2bcf96ef..f2d6cba3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.DefaultExecutorServiceFactory; import io.serverlessworkflow.impl.ExecutorServiceFactory; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -29,6 +30,10 @@ */ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { + public InMemoryEvents() { + this(new DefaultExecutorServiceFactory()); + } + public InMemoryEvents(ExecutorServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; } @@ -40,7 +45,7 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) { private AtomicReference> allConsumerRef = new AtomicReference<>(); @Override - protected void register(String topicName, Consumer consumer) { + public void register(String topicName, Consumer consumer) { topicMap.put(topicName, consumer); } @@ -77,6 +82,7 @@ protected void unregisterFromAll() { @Override public void close() { + topicMap.clear(); serviceFactory.close(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java index 8e858335..5c9d10ba 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -33,10 +33,12 @@ import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.events.CloudEventUtils; +import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.net.URI; import java.time.OffsetDateTime; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -74,11 +76,13 @@ private EmitExecutor(EmitExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - return workflow - .definition() - .application() - .eventPublisher() - .publish(buildCloudEvent(workflow, taskContext)) + Collection eventPublishers = + workflow.definition().application().eventPublishers(); + CloudEvent ce = buildCloudEvent(workflow, taskContext); + return CompletableFuture.allOf( + eventPublishers.stream() + .map(eventPublisher -> eventPublisher.publish(ce)) + .toArray(size -> new CompletableFuture[size])) .thenApply(v -> taskContext.input()); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index fee07613..ade29db3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -18,13 +18,14 @@ import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref; import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowErrorCEData.error; +import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.core.data.PojoCloudEventData.ToBytes; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.events.CloudEventUtils; -import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskEvent; @@ -41,187 +42,272 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.Set; +import java.util.function.Function; public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener { + private static final String TASK_STARTED = "io.serverlessworkflow.task.started.v1"; + private static final String TASK_COMPLETED = "io.serverlessworkflow.task.completed.v1"; + private static final String TASK_SUSPENDED = "io.serverlessworkflow.task.suspended.v1"; + private static final String TASK_RESUMED = "io.serverlessworkflow.task.resumed.v1"; + private static final String TASK_FAULTED = "io.serverlessworkflow.task.faulted.v1"; + private static final String TASK_CANCELLED = "io.serverlessworkflow.task.cancelled.v1"; + + private static final String WORKFLOW_STARTED = "io.serverlessworkflow.workflow.started.v1"; + private static final String WORKFLOW_COMPLETED = "io.serverlessworkflow.workflow.completed.v1"; + private static final String WORKFLOW_SUSPENDED = "io.serverlessworkflow.workflow.suspended.v1"; + private static final String WORKFLOW_RESUMED = "io.serverlessworkflow.workflow.resumed.v1"; + private static final String WORKFLOW_FAULTED = "io.serverlessworkflow.workflow.faulted.v1"; + private static final String WORKFLOW_CANCELLED = "io.serverlessworkflow.workflow.cancelled.v1"; + + public static Collection getLifeCycleTypes() { + return Set.of( + TASK_STARTED, + TASK_COMPLETED, + TASK_SUSPENDED, + TASK_RESUMED, + TASK_FAULTED, + TASK_CANCELLED, + WORKFLOW_STARTED, + WORKFLOW_COMPLETED, + WORKFLOW_SUSPENDED, + WORKFLOW_RESUMED, + WORKFLOW_FAULTED, + WORKFLOW_CANCELLED); + } + @Override - public void onTaskStarted(TaskStartedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskStarted(TaskStartedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.started.v1") + .withType(TASK_STARTED) .build()); } @Override - public void onTaskCompleted(TaskCompletedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskCompleted(TaskCompletedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new TaskCompletedCEData( id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)), this::convert)) - .withType("io.serverlessworkflow.task.completed.v1") + .withType(TASK_COMPLETED) .build()); } @Override - public void onTaskSuspended(TaskSuspendedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskSuspended(TaskSuspendedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.suspended.v1") + .withType(TASK_SUSPENDED) .build()); } @Override - public void onTaskResumed(TaskResumedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskResumed(TaskResumedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.resumed.v1") + .withType(TASK_RESUMED) .build()); } @Override - public void onTaskCancelled(TaskCancelledEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskCancelled(TaskCancelledEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.cancelled.v1") + .withType(TASK_CANCELLED) .build()); } @Override - public void onTaskFailed(TaskFailedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskFailed(TaskFailedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)), this::convert)) - .withType("io.serverlessworkflow.task.faulted.v1") + .withType(TASK_FAULTED) .build()); } @Override - public void onWorkflowStarted(WorkflowStartedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowStarted(WorkflowStartedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.started.v1") + .withType(WORKFLOW_STARTED) .build()); } @Override - public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowSuspended(WorkflowSuspendedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.suspended.v1") + .withType(WORKFLOW_SUSPENDED) .build()); } @Override - public void onWorkflowCancelled(WorkflowCancelledEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowCancelled(WorkflowCancelledEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.cancelled.v1") + .withType(WORKFLOW_CANCELLED) .build()); } @Override - public void onWorkflowResumed(WorkflowResumedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowResumed(WorkflowResumedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.resumed.v1") + .withType(WORKFLOW_RESUMED) .build()); } @Override - public void onWorkflowCompleted(WorkflowCompletedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowCompleted(WorkflowCompletedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)), this::convert)) - .withType("io.serverlessworkflow.workflow.completed.v1") + .withType(WORKFLOW_COMPLETED) .build()); } @Override - public void onWorkflowFailed(WorkflowFailedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowFailed(WorkflowFailedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)), this::convert)) - .withType("io.serverlessworkflow.workflow.faulted.v1") + .withType(WORKFLOW_FAULTED) .build()); } - protected abstract byte[] convert(WorkflowStartedCEData data); + protected byte[] convert(WorkflowStartedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowSuspendedCEData data); + protected byte[] convert(WorkflowCompletedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowResumedCEData data); + protected byte[] convert(TaskStartedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowCancelledCEData data); + protected byte[] convert(TaskCompletedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowCompletedCEData data); + protected byte[] convert(TaskFailedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskStartedCEData data); + protected byte[] convert(WorkflowFailedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskCompletedCEData data); + protected byte[] convert(WorkflowSuspendedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskFailedCEData data); + protected byte[] convert(WorkflowResumedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskSuspendedCEData data); + protected byte[] convert(WorkflowCancelledCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskCancelledCEData data); + protected byte[] convert(TaskSuspendedCEData data) { + return convertToBytes(data); + } + + protected byte[] convert(TaskCancelledCEData data) { + return convertToBytes(data); + } + + protected byte[] convert(TaskResumedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskResumedCEData data); + protected abstract byte[] convertToBytes(T data); - protected abstract byte[] convert(WorkflowFailedCEData data); + protected void publish(T ev, Function ceFunction) { + WorkflowApplication appl = ev.workflowContext().definition().application(); + if (appl.isLifeCycleCEPublishingEnabled()) { + publish(appl, ceFunction.apply(ev)); + } + } + + /* By default, generated cloud events are published, if user has not disabled them at application level, + * using application event publishers. That might be changed if needed by children by overriding this method + */ + protected void publish(WorkflowApplication application, CloudEvent ce) { + application.eventPublishers().forEach(p -> p.publish(ce)); + } private static CloudEventData cloudEventData(T data, ToBytes toBytes) { return PojoCloudEventData.wrap(data, toBytes); @@ -246,10 +332,6 @@ private static Object output(WorkflowEvent ev) { return from(ev.workflowContext().instanceData().output()); } - private static EventPublisher eventPublisher(WorkflowEvent ev) { - return ev.workflowContext().definition().application().eventPublisher(); - } - private static Object output(TaskEvent ev) { return from(ev.taskContext().output()); } diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java index c186c24f..66a7618a 100644 --- a/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java @@ -18,83 +18,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; -import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; import java.io.UncheckedIOException; public class JacksonLifeCyclePublisher extends AbstractLifeCyclePublisher { @Override - protected byte[] convert(WorkflowStartedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowCompletedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskStartedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskCompletedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskFailedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowFailedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowSuspendedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowResumedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowCancelledCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskSuspendedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskCancelledCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskResumedCEData data) { - return genericConvert(data); - } - - protected byte[] genericConvert(T data) { + protected byte[] convertToBytes(T data) { try { return JsonUtils.mapper().writeValueAsBytes(data); } catch (JsonProcessingException e) { diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 4a6a82b3..33a81db5 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -12,6 +12,10 @@ io.serverlessworkflow serverlessworkflow-impl-jackson + + io.serverlessworkflow + serverlessworkflow-api + io.serverlessworkflow serverlessworkflow-impl-http diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java index 548ba61c..d2ed4407 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java @@ -46,7 +46,7 @@ public class EventDefinitionTest { @BeforeAll static void init() { - appl = WorkflowApplication.builder().build(); + appl = WorkflowApplication.builder().disableLifeCycleCEPublishing().build(); } @ParameterizedTest diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index d6088a2a..cc908b1f 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -26,8 +26,8 @@ import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.events.EventRegistration; -import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; @@ -41,7 +41,6 @@ import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -59,24 +58,23 @@ class LifeCycleEventsTest { private WorkflowApplication appl; private Collection publishedEvents; - private Collection registrations; @BeforeEach void setup() { publishedEvents = new CopyOnWriteArrayList<>(); - appl = WorkflowApplication.builder().build(); - registrations = new ArrayList<>(); - Collection builders = appl.eventConsumer().listenToAll(appl); - - for (EventRegistrationBuilder builder : builders) { - registrations.add( - appl.eventConsumer().register(builder, ce -> publishedEvents.add((CloudEvent) ce))); + InMemoryEvents eventBroker = new InMemoryEvents(); + for (String type : AbstractLifeCyclePublisher.getLifeCycleTypes()) { + eventBroker.register(type, ce -> publishedEvents.add(ce)); } + appl = + WorkflowApplication.builder() + .withEventConsumer(eventBroker) + .withEventPublisher(eventBroker) + .build(); } @AfterEach void close() { - registrations.forEach(r -> appl.eventConsumer().unregister(r)); appl.close(); } @@ -115,9 +113,18 @@ void testSuspendResumeNotWait() .instance(Map.of()); CompletableFuture future = instance.start(); instance.suspend(); + assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); instance.resume(); assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) .isEqualTo(Map.of("name", "Javierito")); + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + WorkflowSuspendedCEData workflowSuspendedEvent = + assertPojoInCE( + "io.serverlessworkflow.workflow.suspended.v1", WorkflowSuspendedCEData.class); + WorkflowResumedCEData workflowResumedEvent = + assertPojoInCE("io.serverlessworkflow.workflow.resumed.v1", WorkflowResumedCEData.class); + assertThat(workflowSuspendedEvent.suspendedAt()) + .isBeforeOrEqualTo(workflowResumedEvent.resumedAt()); } @Test @@ -127,10 +134,10 @@ void testSuspendResumeWait() appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml")) .instance(Map.of()); CompletableFuture future = instance.start(); - instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); - Thread.sleep(550); + instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); + Thread.sleep(550); instance.resume(); assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) .isEqualTo(Map.of("name", "Javierito"));