From 396029dedce51756430b2c425f9139d86b353ffa Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 11 Aug 2025 20:07:15 +0200 Subject: [PATCH] [Fix #695] Suspending/cancelling/resume Signed-off-by: fjtirado --- .../serverlessworkflow/impl/TaskContext.java | 2 - .../impl/TaskContextData.java | 6 - .../impl/WorkflowInstance.java | 6 + .../impl/WorkflowMutableInstance.java | 93 ++++++++++++- .../impl/WorkflowStatus.java | 3 +- .../impl/executors/AbstractTaskExecutor.java | 25 +++- .../impl/executors/ListenExecutor.java | 1 - .../impl/executors/WaitExecutor.java | 10 +- .../lifecycle/ce/WorkflowErrorCEData.java | 5 - .../impl/LifeCycleEventsTest.java | 129 ++++++++++++------ impl/jackson/src/test/resources/wait-set.yaml | 12 ++ 11 files changed, 230 insertions(+), 62 deletions(-) create mode 100644 impl/jackson/src/test/resources/wait-set.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 6ede1194..152a3f61 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -122,7 +122,6 @@ public WorkflowPosition position() { return position; } - @Override public Map variables() { return contextVariables; } @@ -132,7 +131,6 @@ public Instant startedAt() { return startedAt; } - @Override public Optional parent() { return parentContext; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java index e4e19b62..cbe70fc5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java @@ -17,8 +17,6 @@ import io.serverlessworkflow.api.types.TaskBase; import java.time.Instant; -import java.util.Map; -import java.util.Optional; public interface TaskContextData { @@ -34,12 +32,8 @@ public interface TaskContextData { WorkflowPosition position(); - Map variables(); - Instant startedAt(); - Optional parent(); - String taskName(); Instant completedAt(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index e8b8e219..3d17108e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -19,4 +19,10 @@ public interface WorkflowInstance extends WorkflowInstanceData { CompletableFuture start(); + + boolean suspend(); + + boolean cancel(); + + boolean resume(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index cbc472c4..cb24cafc 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -18,13 +18,18 @@ import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; import java.time.Instant; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class WorkflowMutableInstance implements WorkflowInstance { @@ -36,7 +41,11 @@ public class WorkflowMutableInstance implements WorkflowInstance { private Instant startedAt; private Instant completedAt; private volatile WorkflowModel output; + private Lock statusLock = new ReentrantLock(); private CompletableFuture completableFuture; + private CompletableFuture suspended; + private TaskContext suspendedTask; + private CompletableFuture cancelled; WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) { this.id = definition.application().idFactory().get(); @@ -70,7 +79,17 @@ public CompletableFuture start() { private void whenFailed(WorkflowModel result, Throwable ex) { completedAt = Instant.now(); if (ex != null) { - status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED); + handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex); + } + } + + private void handleException(Throwable ex) { + if (ex instanceof CancellationException) { + status.set(WorkflowStatus.CANCELLED); + publishEvent( + workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext))); + } else { + status.set(WorkflowStatus.FAULTED); publishEvent( workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex))); } @@ -146,4 +165,76 @@ public String toString() { + completedAt + "]"; } + + @Override + public boolean suspend() { + try { + statusLock.lock(); + if (TaskExecutorHelper.isActive(status.get())) { + suspended = new CompletableFuture(); + return true; + } else { + return false; + } + } finally { + statusLock.unlock(); + } + } + + @Override + public boolean resume() { + try { + statusLock.lock(); + if (suspended != null) { + if (suspendedTask != null) { + CompletableFuture toBeCompleted = suspended; + suspended = null; + toBeCompleted.complete(suspendedTask); + } else { + suspended = null; + } + return true; + } else { + return false; + } + } finally { + statusLock.unlock(); + } + } + + public CompletableFuture completedChecks(TaskContext t) { + try { + statusLock.lock(); + if (suspended != null) { + suspendedTask = t; + workflowContext.instance().status(WorkflowStatus.SUSPENDED); + return suspended; + } + if (cancelled != null) { + cancelled = new CompletableFuture(); + workflowContext.instance().status(WorkflowStatus.CANCELLED); + cancelled.completeExceptionally( + new CancellationException("Task " + t.taskName() + " has been cancelled")); + return cancelled; + } + } finally { + statusLock.unlock(); + } + return CompletableFuture.completedFuture(t); + } + + @Override + public boolean cancel() { + try { + statusLock.lock(); + if (TaskExecutorHelper.isActive(status.get())) { + cancelled = new CompletableFuture(); + return true; + } else { + return false; + } + } finally { + statusLock.unlock(); + } + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java index bc657839..c61306c3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java @@ -21,5 +21,6 @@ public enum WorkflowStatus { WAITING, COMPLETED, FAULTED, - CANCELLED + CANCELLED, + SUSPENDED } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 38e1630b..9f20cd6c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -34,6 +34,7 @@ import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; @@ -43,7 +44,9 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; public abstract class AbstractTaskExecutor implements TaskExecutor { @@ -201,13 +204,16 @@ public CompletableFuture apply( return t; }) .thenCompose(t -> execute(workflowContext, t)) + .thenCompose(t -> workflowContext.instance().completedChecks(t)) .whenComplete( (t, e) -> { if (e != null) { - publishEvent( + handleException( workflowContext, - l -> - l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e))); + taskContext, + e instanceof CompletionException ? e.getCause() : e); + } else { + workflowContext.instance().status(WorkflowStatus.RUNNING); } }) .thenApply( @@ -235,6 +241,19 @@ public CompletableFuture apply( } } + private void handleException( + WorkflowContext workflowContext, TaskContext taskContext, Throwable e) { + if (e instanceof CancellationException) { + publishEvent( + workflowContext, + l -> l.onTaskCancelled(new TaskCancelledEvent(workflowContext, taskContext))); + } else { + publishEvent( + workflowContext, + l -> l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e))); + } + } + protected abstract TransitionInfo getSkipTransition(); protected abstract CompletableFuture execute( diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 588bd1b2..4e842c6d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -238,7 +238,6 @@ protected CompletableFuture internalExecute( processCe(converter.apply(ce), output, workflow, taskContext, future))) .thenApply( v -> { - workflow.instance().status(WorkflowStatus.RUNNING); registrations.forEach(reg -> eventConsumer.unregister(reg)); return output; }); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java index 9e46c1ed..45137d69 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -76,10 +76,10 @@ protected CompletableFuture internalExecute( ((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING); return new CompletableFuture() .completeOnTimeout(taskContext.output(), millisToWait.toMillis(), TimeUnit.MILLISECONDS) - .thenApply( - node -> { - workflow.instance().status(WorkflowStatus.RUNNING); - return node; - }); + .thenApply(this::complete); + } + + private WorkflowModel complete(WorkflowModel model) { + return model; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java index ee277053..776a0185 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java @@ -21,7 +21,6 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.concurrent.CompletionException; public record WorkflowErrorCEData( String type, Integer status, String instance, String title, String detail) { @@ -35,10 +34,6 @@ public static WorkflowErrorCEData error(WorkflowFailedEvent ev) { } private static WorkflowErrorCEData error(Throwable cause) { - - if (cause instanceof CompletionException) { - cause = cause.getCause(); - } return cause instanceof WorkflowException ex ? error(ex) : commonError(cause); } diff --git a/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java b/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java index 53cb792a..56b7bc5d 100644 --- a/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java +++ b/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java @@ -16,10 +16,13 @@ package io.serverlessworkflow.impl; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowableOfType; import io.cloudevents.CloudEvent; import io.cloudevents.core.data.PojoCloudEventData; import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.events.EventRegistration; import io.serverlessworkflow.impl.events.EventRegistrationBuilder; import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; @@ -32,73 +35,128 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; -import org.junit.jupiter.api.AfterAll; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class LifeCycleEventsTest { - private static WorkflowApplication appl; - private static Collection publishedEvents; - - @BeforeAll - static void init() { - appl = WorkflowApplication.builder().build(); - appl.eventConsumer() - .listenToAll(appl) - .forEach( - v -> - appl.eventConsumer() - .register( - (EventRegistrationBuilder) v, ce -> publishedEvents.add((CloudEvent) ce))); - } - - @AfterAll - static void cleanup() { - appl.close(); - } + private WorkflowApplication appl; + private Collection publishedEvents; + private Collection registrations; @BeforeEach void setup() { - publishedEvents = new ArrayList<>(); + publishedEvents = new CopyOnWriteArrayList<>(); + appl = WorkflowApplication.builder().build(); + registrations = new ArrayList<>(); + Collection builders = appl.eventConsumer().listenToAll(appl); + + for (EventRegistrationBuilder builder : builders) { + registrations.add( + appl.eventConsumer().register(builder, ce -> publishedEvents.add((CloudEvent) ce))); + } } @AfterEach void close() { - publishedEvents = new ArrayList<>(); + registrations.forEach(r -> appl.eventConsumer().unregister(r)); } @Test void simpleWorkflow() throws IOException { + WorkflowModel model = appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("simple-expression.yaml")) .instance(Map.of()) .start() .join(); + assertThat(model.asMap()).hasValueSatisfying(m -> assertThat(m).hasSize(3)); + WorkflowStartedCEData workflowStartedEvent = + assertPojoInCE("io.serverlessworkflow.workflow.started.v1", WorkflowStartedCEData.class); + TaskStartedCEData taskStartedEvent = + assertPojoInCE("io.serverlessworkflow.task.started.v1", TaskStartedCEData.class); + TaskCompletedCEData taskCompletedEvent = + assertPojoInCE("io.serverlessworkflow.task.completed.v1", TaskCompletedCEData.class); WorkflowCompletedCEData workflowCompletedEvent = assertPojoInCE( "io.serverlessworkflow.workflow.completed.v1", WorkflowCompletedCEData.class); assertThat(workflowCompletedEvent.output()).isEqualTo(model.asJavaObject()); - WorkflowStartedCEData workflowStartedEvent = - assertPojoInCE("io.serverlessworkflow.workflow.started.v1", WorkflowStartedCEData.class); assertThat(workflowStartedEvent.startedAt()).isBefore(workflowCompletedEvent.completedAt()); - TaskCompletedCEData taskCompletedEvent = - assertPojoInCE("io.serverlessworkflow.task.completed.v1", TaskCompletedCEData.class); assertThat(taskCompletedEvent.output()).isEqualTo(model.asJavaObject()); assertThat(taskCompletedEvent.completedAt()).isBefore(workflowCompletedEvent.completedAt()); - TaskStartedCEData taskStartedEvent = - assertPojoInCE("io.serverlessworkflow.task.started.v1", TaskStartedCEData.class); assertThat(taskStartedEvent.startedAt()).isAfter(workflowStartedEvent.startedAt()); assertThat(taskStartedEvent.startedAt()).isBefore(taskCompletedEvent.completedAt()); } + @Test + void testSuspendResumeNotWait() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + WorkflowInstance instance = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml")) + .instance(Map.of()); + CompletableFuture future = instance.start(); + instance.suspend(); + instance.resume(); + assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) + .isEqualTo(Map.of("name", "Javierito")); + } + + @Test + void testSuspendResumeWait() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + WorkflowInstance instance = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml")) + .instance(Map.of()); + CompletableFuture future = instance.start(); + instance.suspend(); + assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + Thread.sleep(500); + assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); + instance.resume(); + assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) + .isEqualTo(Map.of("name", "Javierito")); + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + + @Test + void testCancel() throws IOException, InterruptedException { + WorkflowInstance instance = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml")) + .instance(Map.of()); + CompletableFuture future = instance.start(); + instance.cancel(); + assertThat(catchThrowableOfType(ExecutionException.class, () -> future.get().asMap())) + .isNotNull(); + assertThat(instance.status()).isEqualTo(WorkflowStatus.CANCELLED); + } + + @Test + void testSuspendResumeTimeout() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + WorkflowInstance instance = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml")) + .instance(Map.of()); + CompletableFuture future = instance.start(); + instance.suspend(); + assertThat(catchThrowableOfType(TimeoutException.class, () -> future.get(1, TimeUnit.SECONDS))) + .isNotNull(); + } + @Test void testError() throws IOException { - appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("raise-inline.yaml")) - .instance(Map.of()) - .start(); + Workflow workflow = WorkflowReader.readWorkflowFromClasspath("raise-inline.yaml"); + assertThat( + catchThrowableOfType( + CompletionException.class, + () -> appl.workflowDefinition(workflow).instance(Map.of()).start().join())) + .isNotNull(); WorkflowErrorCEData error = assertPojoInCE("io.serverlessworkflow.workflow.faulted.v1", WorkflowFailedCEData.class) .error(); @@ -109,12 +167,7 @@ void testError() throws IOException { } private T assertPojoInCE(String type, Class clazz) { - return assertPojoInCE(type, clazz, 1L); - } - - private T assertPojoInCE(String type, Class clazz, long count) { - assertThat(publishedEvents.stream().filter(ev -> ev.getType().equals(type)).count()) - .isEqualTo(count); + Thread.yield(); Optional event = publishedEvents.stream().filter(ev -> ev.getType().equals(type)).findAny(); assertThat(event) diff --git a/impl/jackson/src/test/resources/wait-set.yaml b/impl/jackson/src/test/resources/wait-set.yaml new file mode 100644 index 00000000..74da694c --- /dev/null +++ b/impl/jackson/src/test/resources/wait-set.yaml @@ -0,0 +1,12 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: simple-expression + version: '0.1.0' +do: + - waitABit: + wait: + milliseconds: 500 + - useExpression: + set: + name : Javierito \ No newline at end of file