Skip to content

Commit 521f8f7

Browse files
committed
[Fix #695] Suspending/cancelling/resume
1 parent 86bef90 commit 521f8f7

File tree

7 files changed

+101
-18
lines changed

7 files changed

+101
-18
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public WorkflowPosition position() {
122122
return position;
123123
}
124124

125-
@Override
126125
public Map<String, Object> variables() {
127126
return contextVariables;
128127
}
@@ -132,7 +131,6 @@ public Instant startedAt() {
132131
return startedAt;
133132
}
134133

135-
@Override
136134
public Optional<TaskContext> parent() {
137135
return parentContext;
138136
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import java.time.Instant;
20-
import java.util.Map;
21-
import java.util.Optional;
2220

2321
public interface TaskContextData {
2422

@@ -34,12 +32,8 @@ public interface TaskContextData {
3432

3533
WorkflowPosition position();
3634

37-
Map<String, Object> variables();
38-
3935
Instant startedAt();
4036

41-
Optional<TaskContext> parent();
42-
4337
String taskName();
4438

4539
Instant completedAt();

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@
1919

2020
public interface WorkflowInstance extends WorkflowInstanceData {
2121
CompletableFuture<WorkflowModel> start();
22+
23+
boolean suspend();
24+
25+
boolean cancel();
26+
27+
boolean resume();
2228
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
1919

2020
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2122
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
2223
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2324
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
2425
import java.time.Instant;
2526
import java.util.Optional;
27+
import java.util.concurrent.CancellationException;
2628
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionException;
2730
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
2833

2934
public class WorkflowMutableInstance implements WorkflowInstance {
3035

@@ -36,7 +41,12 @@ public class WorkflowMutableInstance implements WorkflowInstance {
3641
private Instant startedAt;
3742
private Instant completedAt;
3843
private volatile WorkflowModel output;
44+
private Lock resumeLock = new ReentrantLock();
3945
private CompletableFuture<WorkflowModel> completableFuture;
46+
private ResumeInfo resumeInfo;
47+
48+
private static record ResumeInfo(
49+
CompletableFuture<TaskContext> incompletedFuture, TaskContext completedInfo) {}
4050

4151
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
4252
this.id = definition.application().idFactory().get();
@@ -70,7 +80,17 @@ public CompletableFuture<WorkflowModel> start() {
7080
private void whenFailed(WorkflowModel result, Throwable ex) {
7181
completedAt = Instant.now();
7282
if (ex != null) {
73-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED);
83+
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
84+
}
85+
}
86+
87+
private void handleException(Throwable ex) {
88+
if (ex instanceof CancellationException) {
89+
status.set(WorkflowStatus.CANCELLED);
90+
publishEvent(
91+
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
92+
} else {
93+
status.set(WorkflowStatus.FAULTED);
7494
publishEvent(
7595
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
7696
}
@@ -146,4 +166,56 @@ public String toString() {
146166
+ completedAt
147167
+ "]";
148168
}
169+
170+
@Override
171+
public boolean suspend() {
172+
try {
173+
resumeLock.lock();
174+
WorkflowStatus currentStatus = status.get();
175+
if (currentStatus == WorkflowStatus.RUNNING || currentStatus == WorkflowStatus.WAITING) {
176+
status.set(WorkflowStatus.SUSPENDED);
177+
return true;
178+
} else {
179+
return false;
180+
}
181+
} finally {
182+
resumeLock.unlock();
183+
}
184+
}
185+
186+
@Override
187+
public boolean resume() {
188+
try {
189+
resumeLock.lock();
190+
if (resumeInfo != null) {
191+
resumeInfo.incompletedFuture().complete(resumeInfo.completedInfo());
192+
resumeInfo = null;
193+
return true;
194+
} else {
195+
return false;
196+
}
197+
} finally {
198+
resumeLock.unlock();
199+
}
200+
}
201+
202+
public CompletableFuture<TaskContext> checkSuspended(TaskContext t) {
203+
try {
204+
resumeLock.lock();
205+
if (status.get() == WorkflowStatus.SUSPENDED) {
206+
CompletableFuture<TaskContext> incompletedFuture = new CompletableFuture<>();
207+
resumeInfo = new ResumeInfo(incompletedFuture, t);
208+
return incompletedFuture;
209+
} else {
210+
return CompletableFuture.completedFuture(t);
211+
}
212+
} finally {
213+
resumeLock.unlock();
214+
}
215+
}
216+
217+
@Override
218+
public boolean cancel() {
219+
return completableFuture.cancel(true);
220+
}
149221
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ public enum WorkflowStatus {
2121
WAITING,
2222
COMPLETED,
2323
FAULTED,
24-
CANCELLED
24+
CANCELLED,
25+
SUSPENDED
2526
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.serverlessworkflow.impl.WorkflowPosition;
3535
import io.serverlessworkflow.impl.WorkflowPredicate;
3636
import io.serverlessworkflow.impl.WorkflowStatus;
37+
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
3738
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
3839
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
3940
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
@@ -43,7 +44,9 @@
4344
import java.util.Iterator;
4445
import java.util.Map;
4546
import java.util.Optional;
47+
import java.util.concurrent.CancellationException;
4648
import java.util.concurrent.CompletableFuture;
49+
import java.util.concurrent.CompletionException;
4750

4851
public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {
4952

@@ -204,12 +207,13 @@ public CompletableFuture<TaskContext> apply(
204207
.whenComplete(
205208
(t, e) -> {
206209
if (e != null) {
207-
publishEvent(
210+
handleException(
208211
workflowContext,
209-
l ->
210-
l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
212+
taskContext,
213+
e instanceof CompletionException ? e.getCause() : e);
211214
}
212215
})
216+
.thenCompose(t -> workflowContext.instance().checkSuspended(t))
213217
.thenApply(
214218
t -> {
215219
outputProcessor.ifPresent(
@@ -235,6 +239,19 @@ public CompletableFuture<TaskContext> apply(
235239
}
236240
}
237241

242+
private void handleException(
243+
WorkflowContext workflowContext, TaskContext taskContext, Throwable e) {
244+
if (e instanceof CancellationException) {
245+
publishEvent(
246+
workflowContext,
247+
l -> l.onTaskCancelled(new TaskCancelledEvent(workflowContext, taskContext)));
248+
} else {
249+
publishEvent(
250+
workflowContext,
251+
l -> l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
252+
}
253+
}
254+
238255
protected abstract TransitionInfo getSkipTransition();
239256

240257
protected abstract CompletableFuture<TaskContext> execute(

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2222
import java.io.PrintWriter;
2323
import java.io.StringWriter;
24-
import java.util.concurrent.CompletionException;
2524

2625
public record WorkflowErrorCEData(
2726
String type, Integer status, String instance, String title, String detail) {
@@ -35,10 +34,6 @@ public static WorkflowErrorCEData error(WorkflowFailedEvent ev) {
3534
}
3635

3736
private static WorkflowErrorCEData error(Throwable cause) {
38-
39-
if (cause instanceof CompletionException) {
40-
cause = cause.getCause();
41-
}
4237
return cause instanceof WorkflowException ex ? error(ex) : commonError(cause);
4338
}
4439

0 commit comments

Comments
 (0)