Skip to content

[Fix #695] Suspending/cancelling/resume #696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public WorkflowPosition position() {
return position;
}

@Override
public Map<String, Object> variables() {
return contextVariables;
}
Expand All @@ -132,7 +131,6 @@ public Instant startedAt() {
return startedAt;
}

@Override
public Optional<TaskContext> parent() {
return parentContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import io.serverlessworkflow.api.types.TaskBase;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

public interface TaskContextData {

Expand All @@ -34,12 +32,8 @@ public interface TaskContextData {

WorkflowPosition position();

Map<String, Object> variables();

Instant startedAt();

Optional<TaskContext> parent();

String taskName();

Instant completedAt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@

public interface WorkflowInstance extends WorkflowInstanceData {
CompletableFuture<WorkflowModel> start();

boolean suspend();

boolean cancel();

boolean resume();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;

import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class WorkflowMutableInstance implements WorkflowInstance {

Expand All @@ -36,7 +41,11 @@ public class WorkflowMutableInstance implements WorkflowInstance {
private Instant startedAt;
private Instant completedAt;
private volatile WorkflowModel output;
private Lock statusLock = new ReentrantLock();
private CompletableFuture<WorkflowModel> completableFuture;
private CompletableFuture<TaskContext> suspended;
private TaskContext suspendedTask;
private CompletableFuture<TaskContext> cancelled;

WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
this.id = definition.application().idFactory().get();
Expand Down Expand Up @@ -70,7 +79,17 @@ public CompletableFuture<WorkflowModel> start() {
private void whenFailed(WorkflowModel result, Throwable ex) {
completedAt = Instant.now();
if (ex != null) {
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED);
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
}
}

private void handleException(Throwable ex) {
if (ex instanceof CancellationException) {
status.set(WorkflowStatus.CANCELLED);
publishEvent(
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
} else {
status.set(WorkflowStatus.FAULTED);
publishEvent(
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
}
Expand Down Expand Up @@ -146,4 +165,76 @@ public String toString() {
+ completedAt
+ "]";
}

@Override
public boolean suspend() {
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
suspended = new CompletableFuture<TaskContext>();
return true;
} else {
return false;
}
} finally {
statusLock.unlock();
}
}

@Override
public boolean resume() {
try {
statusLock.lock();
if (suspended != null) {
if (suspendedTask != null) {
CompletableFuture<TaskContext> toBeCompleted = suspended;
suspended = null;
toBeCompleted.complete(suspendedTask);
} else {
suspended = null;
}
return true;
} else {
return false;
}
} finally {
statusLock.unlock();
}
}

public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
try {
statusLock.lock();
if (suspended != null) {
suspendedTask = t;
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
return suspended;
}
if (cancelled != null) {
cancelled = new CompletableFuture<TaskContext>();
workflowContext.instance().status(WorkflowStatus.CANCELLED);
cancelled.completeExceptionally(
new CancellationException("Task " + t.taskName() + " has been cancelled"));
return cancelled;
}
} finally {
statusLock.unlock();
}
return CompletableFuture.completedFuture(t);
}

@Override
public boolean cancel() {
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
cancelled = new CompletableFuture<TaskContext>();
return true;
} else {
return false;
}
} finally {
statusLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum WorkflowStatus {
WAITING,
COMPLETED,
FAULTED,
CANCELLED
CANCELLED,
SUSPENDED
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.serverlessworkflow.impl.WorkflowPosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.WorkflowStatus;
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
Expand All @@ -43,7 +44,9 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {

Expand Down Expand Up @@ -201,13 +204,16 @@ public CompletableFuture<TaskContext> apply(
return t;
})
.thenCompose(t -> execute(workflowContext, t))
.thenCompose(t -> workflowContext.instance().completedChecks(t))
.whenComplete(
(t, e) -> {
if (e != null) {
publishEvent(
handleException(
workflowContext,
l ->
l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
taskContext,
e instanceof CompletionException ? e.getCause() : e);
} else {
workflowContext.instance().status(WorkflowStatus.RUNNING);
}
})
.thenApply(
Expand Down Expand Up @@ -235,6 +241,19 @@ public CompletableFuture<TaskContext> apply(
}
}

private void handleException(
WorkflowContext workflowContext, TaskContext taskContext, Throwable e) {
if (e instanceof CancellationException) {
publishEvent(
workflowContext,
l -> l.onTaskCancelled(new TaskCancelledEvent(workflowContext, taskContext)));
} else {
publishEvent(
workflowContext,
l -> l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
}
}

protected abstract TransitionInfo getSkipTransition();

protected abstract CompletableFuture<TaskContext> execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ protected CompletableFuture<WorkflowModel> internalExecute(
processCe(converter.apply(ce), output, workflow, taskContext, future)))
.thenApply(
v -> {
workflow.instance().status(WorkflowStatus.RUNNING);
registrations.forEach(reg -> eventConsumer.unregister(reg));
return output;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ protected CompletableFuture<WorkflowModel> internalExecute(
((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING);
return new CompletableFuture<WorkflowModel>()
.completeOnTimeout(taskContext.output(), millisToWait.toMillis(), TimeUnit.MILLISECONDS)
.thenApply(
node -> {
workflow.instance().status(WorkflowStatus.RUNNING);
return node;
});
.thenApply(this::complete);
}

private WorkflowModel complete(WorkflowModel model) {
return model;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletionException;

public record WorkflowErrorCEData(
String type, Integer status, String instance, String title, String detail) {
Expand All @@ -35,10 +34,6 @@ public static WorkflowErrorCEData error(WorkflowFailedEvent ev) {
}

private static WorkflowErrorCEData error(Throwable cause) {

if (cause instanceof CompletionException) {
cause = cause.getCause();
}
return cause instanceof WorkflowException ex ? error(ex) : commonError(cause);
}

Expand Down
Loading