diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index cb24cafc..4c3abd18 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -18,10 +18,14 @@ import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; +import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.Instant; import java.util.Optional; import java.util.concurrent.CancellationException; @@ -190,6 +194,11 @@ public boolean resume() { CompletableFuture toBeCompleted = suspended; suspended = null; toBeCompleted.complete(suspendedTask); + publishEvent( + workflowContext, + l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask))); + publishEvent( + workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); } else { suspended = null; } @@ -208,6 +217,11 @@ public CompletableFuture completedChecks(TaskContext t) { if (suspended != null) { suspendedTask = t; workflowContext.instance().status(WorkflowStatus.SUSPENDED); + publishEvent( + workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t))); + publishEvent( + workflowContext, + l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext))); return suspended; } if (cancelled != null) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index b3a30305..fee07613 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -25,15 +25,21 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.events.CloudEventUtils; import io.serverlessworkflow.impl.events.EventPublisher; +import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.OffsetDateTime; public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener { @@ -65,6 +71,45 @@ public void onTaskCompleted(TaskCompletedEvent ev) { .build()); } + @Override + public void onTaskSuspended(TaskSuspendedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType("io.serverlessworkflow.task.suspended.v1") + .build()); + } + + @Override + public void onTaskResumed(TaskResumedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType("io.serverlessworkflow.task.resumed.v1") + .build()); + } + + @Override + public void onTaskCancelled(TaskCancelledEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType("io.serverlessworkflow.task.cancelled.v1") + .build()); + } + @Override public void onTaskFailed(TaskFailedEvent ev) { eventPublisher(ev) @@ -90,6 +135,44 @@ public void onWorkflowStarted(WorkflowStartedEvent ev) { .build()); } + @Override + public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType("io.serverlessworkflow.workflow.suspended.v1") + .build()); + } + + @Override + public void onWorkflowCancelled(WorkflowCancelledEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType("io.serverlessworkflow.workflow.cancelled.v1") + .build()); + } + + @Override + public void onWorkflowResumed(WorkflowResumedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) + .withType("io.serverlessworkflow.workflow.resumed.v1") + .build()); + } + @Override public void onWorkflowCompleted(WorkflowCompletedEvent ev) { eventPublisher(ev) @@ -118,6 +201,12 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) { protected abstract byte[] convert(WorkflowStartedCEData data); + protected abstract byte[] convert(WorkflowSuspendedCEData data); + + protected abstract byte[] convert(WorkflowResumedCEData data); + + protected abstract byte[] convert(WorkflowCancelledCEData data); + protected abstract byte[] convert(WorkflowCompletedCEData data); protected abstract byte[] convert(TaskStartedCEData data); @@ -126,6 +215,12 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) { protected abstract byte[] convert(TaskFailedCEData data); + protected abstract byte[] convert(TaskSuspendedCEData data); + + protected abstract byte[] convert(TaskCancelledCEData data); + + protected abstract byte[] convert(TaskResumedCEData data); + protected abstract byte[] convert(WorkflowFailedCEData data); private static CloudEventData cloudEventData(T data, ToBytes toBytes) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java new file mode 100644 index 00000000..efa63c16 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskCancelledCEData( + String workflow, + String task, + WorkflowDefinitionCEData definition, + OffsetDateTime cancelledAt) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java new file mode 100644 index 00000000..eef3606b --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskResumedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java new file mode 100644 index 00000000..4a376073 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskSuspendedCEData( + String workflow, + String task, + WorkflowDefinitionCEData definition, + OffsetDateTime suspendedAt) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java new file mode 100644 index 00000000..7eb9bd62 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowCancelledCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime cancelledAt) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java new file mode 100644 index 00000000..eb040d06 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowResumedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java new file mode 100644 index 00000000..5b091839 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowSuspendedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime suspendedAt) {} diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java index be209869..c186c24f 100644 --- a/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java @@ -18,12 +18,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; import java.io.UncheckedIOException; public class JacksonLifeCyclePublisher extends AbstractLifeCyclePublisher { @@ -58,6 +64,36 @@ protected byte[] convert(WorkflowFailedCEData data) { return genericConvert(data); } + @Override + protected byte[] convert(WorkflowSuspendedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(WorkflowResumedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(WorkflowCancelledCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(TaskSuspendedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(TaskCancelledCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(TaskResumedCEData data) { + return genericConvert(data); + } + protected byte[] genericConvert(T data) { try { return JsonUtils.mapper().writeValueAsBytes(data); diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index a9d3502c..e420e0df 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -28,12 +28,18 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.events.EventRegistration; import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowErrorCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -128,6 +134,17 @@ void testSuspendResumeWait() assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) .isEqualTo(Map.of("name", "Javierito")); assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + TaskSuspendedCEData taskSuspendedEvent = + assertPojoInCE("io.serverlessworkflow.task.suspended.v1", TaskSuspendedCEData.class); + WorkflowSuspendedCEData workflowSuspendedEvent = + assertPojoInCE( + "io.serverlessworkflow.workflow.suspended.v1", WorkflowSuspendedCEData.class); + TaskResumedCEData taskResumedEvent = + assertPojoInCE("io.serverlessworkflow.task.resumed.v1", TaskResumedCEData.class); + WorkflowResumedCEData workflowResumedEvent = + assertPojoInCE("io.serverlessworkflow.workflow.resumed.v1", WorkflowResumedCEData.class); + assertThat(workflowSuspendedEvent.suspendedAt()).isBefore(workflowResumedEvent.resumedAt()); + assertThat(taskSuspendedEvent.suspendedAt()).isBefore(taskResumedEvent.resumedAt()); } @Test @@ -140,6 +157,13 @@ void testCancel() throws IOException, InterruptedException { assertThat(catchThrowableOfType(ExecutionException.class, () -> future.get().asMap())) .isNotNull(); assertThat(instance.status()).isEqualTo(WorkflowStatus.CANCELLED); + TaskCancelledCEData taskCancelledEvent = + assertPojoInCE("io.serverlessworkflow.task.cancelled.v1", TaskCancelledCEData.class); + WorkflowCancelledCEData workflowCancelledEvent = + assertPojoInCE( + "io.serverlessworkflow.workflow.cancelled.v1", WorkflowCancelledCEData.class); + assertThat(taskCancelledEvent.cancelledAt()) + .isBeforeOrEqualTo(workflowCancelledEvent.cancelledAt()); } @Test