Skip to content

Commit 057c792

Browse files
committed
[Fix #695] Suspending/cancelling/resume
Signed-off-by: fjtirado <[email protected]>
1 parent 86bef90 commit 057c792

File tree

9 files changed

+163
-46
lines changed

9 files changed

+163
-46
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public WorkflowPosition position() {
122122
return position;
123123
}
124124

125-
@Override
126125
public Map<String, Object> variables() {
127126
return contextVariables;
128127
}
@@ -132,7 +131,6 @@ public Instant startedAt() {
132131
return startedAt;
133132
}
134133

135-
@Override
136134
public Optional<TaskContext> parent() {
137135
return parentContext;
138136
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import java.time.Instant;
20-
import java.util.Map;
21-
import java.util.Optional;
2220

2321
public interface TaskContextData {
2422

@@ -34,12 +32,8 @@ public interface TaskContextData {
3432

3533
WorkflowPosition position();
3634

37-
Map<String, Object> variables();
38-
3935
Instant startedAt();
4036

41-
Optional<TaskContext> parent();
42-
4337
String taskName();
4438

4539
Instant completedAt();

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@
1919

2020
public interface WorkflowInstance extends WorkflowInstanceData {
2121
CompletableFuture<WorkflowModel> start();
22+
23+
boolean suspend();
24+
25+
boolean cancel();
26+
27+
boolean resume();
2228
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
1919

2020
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2122
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
2223
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2324
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
2425
import java.time.Instant;
2526
import java.util.Optional;
27+
import java.util.concurrent.CancellationException;
2628
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionException;
2730
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
2833

2934
public class WorkflowMutableInstance implements WorkflowInstance {
3035

@@ -36,7 +41,12 @@ public class WorkflowMutableInstance implements WorkflowInstance {
3641
private Instant startedAt;
3742
private Instant completedAt;
3843
private volatile WorkflowModel output;
44+
private Lock resumeLock = new ReentrantLock();
3945
private CompletableFuture<WorkflowModel> completableFuture;
46+
private ResumeInfo resumeInfo;
47+
48+
private static record ResumeInfo(
49+
CompletableFuture<TaskContext> incompletedFuture, TaskContext completedInfo) {}
4050

4151
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
4252
this.id = definition.application().idFactory().get();
@@ -70,7 +80,17 @@ public CompletableFuture<WorkflowModel> start() {
7080
private void whenFailed(WorkflowModel result, Throwable ex) {
7181
completedAt = Instant.now();
7282
if (ex != null) {
73-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED);
83+
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
84+
}
85+
}
86+
87+
private void handleException(Throwable ex) {
88+
if (ex instanceof CancellationException) {
89+
status.set(WorkflowStatus.CANCELLED);
90+
publishEvent(
91+
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
92+
} else {
93+
status.set(WorkflowStatus.FAULTED);
7494
publishEvent(
7595
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
7696
}
@@ -146,4 +166,57 @@ public String toString() {
146166
+ completedAt
147167
+ "]";
148168
}
169+
170+
@Override
171+
public boolean suspend() {
172+
try {
173+
resumeLock.lock();
174+
WorkflowStatus currentStatus = status.get();
175+
if (currentStatus == WorkflowStatus.RUNNING || currentStatus == WorkflowStatus.WAITING) {
176+
status.set(WorkflowStatus.SUSPENDED);
177+
return true;
178+
} else {
179+
return false;
180+
}
181+
} finally {
182+
resumeLock.unlock();
183+
}
184+
}
185+
186+
@Override
187+
public boolean resume() {
188+
try {
189+
resumeLock.lock();
190+
if (resumeInfo != null) {
191+
status.set(WorkflowStatus.RUNNING);
192+
resumeInfo.incompletedFuture().complete(resumeInfo.completedInfo());
193+
resumeInfo = null;
194+
return true;
195+
} else {
196+
return false;
197+
}
198+
} finally {
199+
resumeLock.unlock();
200+
}
201+
}
202+
203+
public CompletableFuture<TaskContext> checkSuspended(TaskContext t) {
204+
try {
205+
resumeLock.lock();
206+
if (status.get() == WorkflowStatus.SUSPENDED) {
207+
CompletableFuture<TaskContext> incompletedFuture = new CompletableFuture<>();
208+
resumeInfo = new ResumeInfo(incompletedFuture, t);
209+
return incompletedFuture;
210+
} else {
211+
return CompletableFuture.completedFuture(t);
212+
}
213+
} finally {
214+
resumeLock.unlock();
215+
}
216+
}
217+
218+
@Override
219+
public boolean cancel() {
220+
return completableFuture.cancel(true);
221+
}
149222
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ public enum WorkflowStatus {
2121
WAITING,
2222
COMPLETED,
2323
FAULTED,
24-
CANCELLED
24+
CANCELLED,
25+
SUSPENDED
2526
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.serverlessworkflow.impl.WorkflowPosition;
3535
import io.serverlessworkflow.impl.WorkflowPredicate;
3636
import io.serverlessworkflow.impl.WorkflowStatus;
37+
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
3738
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
3839
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
3940
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
@@ -43,7 +44,9 @@
4344
import java.util.Iterator;
4445
import java.util.Map;
4546
import java.util.Optional;
47+
import java.util.concurrent.CancellationException;
4648
import java.util.concurrent.CompletableFuture;
49+
import java.util.concurrent.CompletionException;
4750

4851
public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {
4952

@@ -204,12 +207,13 @@ public CompletableFuture<TaskContext> apply(
204207
.whenComplete(
205208
(t, e) -> {
206209
if (e != null) {
207-
publishEvent(
210+
handleException(
208211
workflowContext,
209-
l ->
210-
l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
212+
taskContext,
213+
e instanceof CompletionException ? e.getCause() : e);
211214
}
212215
})
216+
.thenCompose(t -> workflowContext.instance().checkSuspended(t))
213217
.thenApply(
214218
t -> {
215219
outputProcessor.ifPresent(
@@ -235,6 +239,19 @@ public CompletableFuture<TaskContext> apply(
235239
}
236240
}
237241

242+
private void handleException(
243+
WorkflowContext workflowContext, TaskContext taskContext, Throwable e) {
244+
if (e instanceof CancellationException) {
245+
publishEvent(
246+
workflowContext,
247+
l -> l.onTaskCancelled(new TaskCancelledEvent(workflowContext, taskContext)));
248+
} else {
249+
publishEvent(
250+
workflowContext,
251+
l -> l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
252+
}
253+
}
254+
238255
protected abstract TransitionInfo getSkipTransition();
239256

240257
protected abstract CompletableFuture<TaskContext> execute(

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2222
import java.io.PrintWriter;
2323
import java.io.StringWriter;
24-
import java.util.concurrent.CompletionException;
2524

2625
public record WorkflowErrorCEData(
2726
String type, Integer status, String instance, String title, String detail) {
@@ -35,10 +34,6 @@ public static WorkflowErrorCEData error(WorkflowFailedEvent ev) {
3534
}
3635

3736
private static WorkflowErrorCEData error(Throwable cause) {
38-
39-
if (cause instanceof CompletionException) {
40-
cause = cause.getCause();
41-
}
4237
return cause instanceof WorkflowException ex ? error(ex) : commonError(cause);
4338
}
4439

impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.catchThrowableOfType;
1920

2021
import io.cloudevents.CloudEvent;
2122
import io.cloudevents.core.data.PojoCloudEventData;
2223
import io.serverlessworkflow.api.WorkflowReader;
24+
import io.serverlessworkflow.impl.events.EventRegistration;
2325
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2426
import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData;
2527
import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData;
@@ -32,42 +34,36 @@
3234
import java.util.Collection;
3335
import java.util.Map;
3436
import java.util.Optional;
35-
import org.junit.jupiter.api.AfterAll;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.TimeoutException;
3641
import org.junit.jupiter.api.AfterEach;
37-
import org.junit.jupiter.api.BeforeAll;
3842
import org.junit.jupiter.api.BeforeEach;
3943
import org.junit.jupiter.api.Test;
4044

4145
class LifeCycleEventsTest {
4246

43-
private static WorkflowApplication appl;
44-
private static Collection<CloudEvent> publishedEvents;
45-
46-
@BeforeAll
47-
static void init() {
48-
appl = WorkflowApplication.builder().build();
49-
appl.eventConsumer()
50-
.listenToAll(appl)
51-
.forEach(
52-
v ->
53-
appl.eventConsumer()
54-
.register(
55-
(EventRegistrationBuilder) v, ce -> publishedEvents.add((CloudEvent) ce)));
56-
}
57-
58-
@AfterAll
59-
static void cleanup() {
60-
appl.close();
61-
}
47+
private WorkflowApplication appl;
48+
private Collection<CloudEvent> publishedEvents;
49+
private Collection<EventRegistration> registrations;
6250

6351
@BeforeEach
6452
void setup() {
6553
publishedEvents = new ArrayList<>();
54+
appl = WorkflowApplication.builder().build();
55+
registrations = new ArrayList<>();
56+
Collection<EventRegistrationBuilder> builders = appl.eventConsumer().listenToAll(appl);
57+
58+
for (EventRegistrationBuilder builder : builders) {
59+
registrations.add(
60+
appl.eventConsumer().register(builder, ce -> publishedEvents.add((CloudEvent) ce)));
61+
}
6662
}
6763

6864
@AfterEach
6965
void close() {
70-
publishedEvents = new ArrayList<>();
66+
registrations.forEach(r -> appl.eventConsumer().unregister(r));
7167
}
7268

7369
@Test
@@ -77,23 +73,48 @@ void simpleWorkflow() throws IOException {
7773
.instance(Map.of())
7874
.start()
7975
.join();
76+
WorkflowStartedCEData workflowStartedEvent =
77+
assertPojoInCE("io.serverlessworkflow.workflow.started.v1", WorkflowStartedCEData.class);
78+
TaskStartedCEData taskStartedEvent =
79+
assertPojoInCE("io.serverlessworkflow.task.started.v1", TaskStartedCEData.class);
80+
TaskCompletedCEData taskCompletedEvent =
81+
assertPojoInCE("io.serverlessworkflow.task.completed.v1", TaskCompletedCEData.class);
8082
WorkflowCompletedCEData workflowCompletedEvent =
8183
assertPojoInCE(
8284
"io.serverlessworkflow.workflow.completed.v1", WorkflowCompletedCEData.class);
8385
assertThat(workflowCompletedEvent.output()).isEqualTo(model.asJavaObject());
84-
WorkflowStartedCEData workflowStartedEvent =
85-
assertPojoInCE("io.serverlessworkflow.workflow.started.v1", WorkflowStartedCEData.class);
8686
assertThat(workflowStartedEvent.startedAt()).isBefore(workflowCompletedEvent.completedAt());
87-
TaskCompletedCEData taskCompletedEvent =
88-
assertPojoInCE("io.serverlessworkflow.task.completed.v1", TaskCompletedCEData.class);
8987
assertThat(taskCompletedEvent.output()).isEqualTo(model.asJavaObject());
9088
assertThat(taskCompletedEvent.completedAt()).isBefore(workflowCompletedEvent.completedAt());
91-
TaskStartedCEData taskStartedEvent =
92-
assertPojoInCE("io.serverlessworkflow.task.started.v1", TaskStartedCEData.class);
9389
assertThat(taskStartedEvent.startedAt()).isAfter(workflowStartedEvent.startedAt());
9490
assertThat(taskStartedEvent.startedAt()).isBefore(taskCompletedEvent.completedAt());
9591
}
9692

93+
@Test
94+
void suspendResume() throws IOException, ExecutionException, InterruptedException {
95+
WorkflowInstance instance =
96+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml"))
97+
.instance(Map.of());
98+
CompletableFuture<WorkflowModel> future = instance.start();
99+
instance.suspend();
100+
assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED);
101+
instance.resume();
102+
assertThat(future.get().asMap().orElseThrow()).isEqualTo(Map.of("name", "Javierito"));
103+
}
104+
105+
@Test
106+
void suspendResumeTimeout()
107+
throws IOException, ExecutionException, InterruptedException, TimeoutException {
108+
WorkflowInstance instance =
109+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml"))
110+
.instance(Map.of());
111+
CompletableFuture<WorkflowModel> future = instance.start();
112+
instance.suspend();
113+
assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED);
114+
catchThrowableOfType(
115+
TimeoutException.class, () -> future.get(500, TimeUnit.MILLISECONDS).asMap());
116+
}
117+
97118
@Test
98119
void testError() throws IOException {
99120
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("raise-inline.yaml"))
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: simple-expression
5+
version: '0.1.0'
6+
do:
7+
- waitABit:
8+
wait:
9+
milliseconds: 500
10+
- useExpression:
11+
set:
12+
name : Javierito

0 commit comments

Comments
 (0)