Skip to content

Commit e808484

Browse files
committed
[Fix #695] Suspending/cancelling/resume
Signed-off-by: fjtirado <[email protected]>
1 parent 86bef90 commit e808484

File tree

7 files changed

+96
-18
lines changed

7 files changed

+96
-18
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public WorkflowPosition position() {
122122
return position;
123123
}
124124

125-
@Override
126125
public Map<String, Object> variables() {
127126
return contextVariables;
128127
}
@@ -132,7 +131,6 @@ public Instant startedAt() {
132131
return startedAt;
133132
}
134133

135-
@Override
136134
public Optional<TaskContext> parent() {
137135
return parentContext;
138136
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import java.time.Instant;
20-
import java.util.Map;
21-
import java.util.Optional;
2220

2321
public interface TaskContextData {
2422

@@ -34,12 +32,8 @@ public interface TaskContextData {
3432

3533
WorkflowPosition position();
3634

37-
Map<String, Object> variables();
38-
3935
Instant startedAt();
4036

41-
Optional<TaskContext> parent();
42-
4337
String taskName();
4438

4539
Instant completedAt();

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@
1919

2020
public interface WorkflowInstance extends WorkflowInstanceData {
2121
CompletableFuture<WorkflowModel> start();
22+
23+
boolean suspend();
24+
25+
boolean cancel();
26+
27+
boolean resume();
2228
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
1919

2020
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2122
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
2223
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2324
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
2425
import java.time.Instant;
2526
import java.util.Optional;
27+
import java.util.concurrent.CancellationException;
2628
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionException;
2730
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
2833

2934
public class WorkflowMutableInstance implements WorkflowInstance {
3035

@@ -36,7 +41,12 @@ public class WorkflowMutableInstance implements WorkflowInstance {
3641
private Instant startedAt;
3742
private Instant completedAt;
3843
private volatile WorkflowModel output;
44+
private Lock resumeLock = new ReentrantLock();
3945
private CompletableFuture<WorkflowModel> completableFuture;
46+
private ResumeInfo resumeInfo;
47+
48+
private static record ResumeInfo(
49+
CompletableFuture<TaskContext> incompletedFuture, TaskContext completedInfo) {}
4050

4151
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
4252
this.id = definition.application().idFactory().get();
@@ -70,7 +80,17 @@ public CompletableFuture<WorkflowModel> start() {
7080
private void whenFailed(WorkflowModel result, Throwable ex) {
7181
completedAt = Instant.now();
7282
if (ex != null) {
73-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED);
83+
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
84+
}
85+
}
86+
87+
private void handleException(Throwable ex) {
88+
if (ex instanceof CancellationException) {
89+
status.set(WorkflowStatus.CANCELLED);
90+
publishEvent(
91+
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
92+
} else {
93+
status.set(WorkflowStatus.FAULTED);
7494
publishEvent(
7595
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
7696
}
@@ -146,4 +166,51 @@ public String toString() {
146166
+ completedAt
147167
+ "]";
148168
}
169+
170+
@Override
171+
public boolean suspend() {
172+
try {
173+
resumeLock.lock();
174+
return status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.SUSPENDED);
175+
} finally {
176+
resumeLock.unlock();
177+
}
178+
}
179+
180+
@Override
181+
public boolean resume() {
182+
try {
183+
resumeLock.lock();
184+
if (resumeInfo != null) {
185+
status.set(WorkflowStatus.RUNNING);
186+
resumeInfo.incompletedFuture().complete(resumeInfo.completedInfo());
187+
resumeInfo = null;
188+
return true;
189+
} else {
190+
return false;
191+
}
192+
} finally {
193+
resumeLock.unlock();
194+
}
195+
}
196+
197+
public CompletableFuture<TaskContext> checkSuspended(TaskContext t) {
198+
try {
199+
resumeLock.lock();
200+
if (status.get() == WorkflowStatus.SUSPENDED) {
201+
CompletableFuture<TaskContext> incompletedFuture = new CompletableFuture<>();
202+
resumeInfo = new ResumeInfo(incompletedFuture, t);
203+
return incompletedFuture;
204+
} else {
205+
return CompletableFuture.completedFuture(t);
206+
}
207+
} finally {
208+
resumeLock.unlock();
209+
}
210+
}
211+
212+
@Override
213+
public boolean cancel() {
214+
return completableFuture.cancel(true);
215+
}
149216
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ public enum WorkflowStatus {
2121
WAITING,
2222
COMPLETED,
2323
FAULTED,
24-
CANCELLED
24+
CANCELLED,
25+
SUSPENDED
2526
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.serverlessworkflow.impl.WorkflowPosition;
3535
import io.serverlessworkflow.impl.WorkflowPredicate;
3636
import io.serverlessworkflow.impl.WorkflowStatus;
37+
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
3738
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
3839
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
3940
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
@@ -43,7 +44,9 @@
4344
import java.util.Iterator;
4445
import java.util.Map;
4546
import java.util.Optional;
47+
import java.util.concurrent.CancellationException;
4648
import java.util.concurrent.CompletableFuture;
49+
import java.util.concurrent.CompletionException;
4750

4851
public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {
4952

@@ -204,12 +207,13 @@ public CompletableFuture<TaskContext> apply(
204207
.whenComplete(
205208
(t, e) -> {
206209
if (e != null) {
207-
publishEvent(
210+
handleException(
208211
workflowContext,
209-
l ->
210-
l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
212+
taskContext,
213+
e instanceof CompletionException ? e.getCause() : e);
211214
}
212215
})
216+
.thenCompose(t -> workflowContext.instance().checkSuspended(t))
213217
.thenApply(
214218
t -> {
215219
outputProcessor.ifPresent(
@@ -235,6 +239,19 @@ public CompletableFuture<TaskContext> apply(
235239
}
236240
}
237241

242+
private void handleException(
243+
WorkflowContext workflowContext, TaskContext taskContext, Throwable e) {
244+
if (e instanceof CancellationException) {
245+
publishEvent(
246+
workflowContext,
247+
l -> l.onTaskCancelled(new TaskCancelledEvent(workflowContext, taskContext)));
248+
} else {
249+
publishEvent(
250+
workflowContext,
251+
l -> l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
252+
}
253+
}
254+
238255
protected abstract TransitionInfo getSkipTransition();
239256

240257
protected abstract CompletableFuture<TaskContext> execute(

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2222
import java.io.PrintWriter;
2323
import java.io.StringWriter;
24-
import java.util.concurrent.CompletionException;
2524

2625
public record WorkflowErrorCEData(
2726
String type, Integer status, String instance, String title, String detail) {
@@ -35,10 +34,6 @@ public static WorkflowErrorCEData error(WorkflowFailedEvent ev) {
3534
}
3635

3736
private static WorkflowErrorCEData error(Throwable cause) {
38-
39-
if (cause instanceof CompletionException) {
40-
cause = cause.getCause();
41-
}
4237
return cause instanceof WorkflowException ex ? error(ex) : commonError(cause);
4338
}
4439

0 commit comments

Comments
 (0)