Skip to content

Commit 3ab34c0

Browse files
committed
[Fix #782] Added suspend support
Signed-off-by: fjtirado <[email protected]>
1 parent ca799b9 commit 3ab34c0

File tree

7 files changed

+50
-22
lines changed

7 files changed

+50
-22
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,21 @@ public WorkflowStatus status() {
147147

148148
@Override
149149
public WorkflowModel output() {
150-
return futureRef.get().join();
150+
CompletableFuture<WorkflowModel> future = futureRef.get();
151+
return future != null ? future.join() : null;
151152
}
152153

153154
@Override
154155
public <T> T outputAs(Class<T> clazz) {
155-
return output()
156-
.as(clazz)
157-
.orElseThrow(
158-
() ->
159-
new IllegalArgumentException(
160-
"Output " + output() + " cannot be converted to class " + clazz));
156+
WorkflowModel output = output();
157+
return output != null
158+
? output
159+
.as(clazz)
160+
.orElseThrow(
161+
() ->
162+
new IllegalArgumentException(
163+
"Output " + output + " cannot be converted to class " + clazz))
164+
: null;
161165
}
162166

163167
public void status(WorkflowStatus state) {
@@ -182,8 +186,7 @@ public boolean suspend() {
182186
try {
183187
statusLock.lock();
184188
if (TaskExecutorHelper.isActive(status.get()) && suspended == null) {
185-
suspended = new ConcurrentHashMap<>();
186-
status.set(WorkflowStatus.SUSPENDED);
189+
internalSuspend();
187190
publishEvent(
188191
workflowContext,
189192
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
@@ -196,6 +199,11 @@ public boolean suspend() {
196199
}
197200
}
198201

202+
protected final void internalSuspend() {
203+
suspended = new ConcurrentHashMap<>();
204+
status.set(WorkflowStatus.SUSPENDED);
205+
}
206+
199207
@Override
200208
public boolean resume() {
201209
try {

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.serverlessworkflow.impl.WorkflowDefinition;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowMutableInstance;
23+
import io.serverlessworkflow.impl.WorkflowStatus;
2324
import io.serverlessworkflow.impl.executors.TransitionInfo;
2425
import java.util.concurrent.CompletableFuture;
2526

@@ -37,6 +38,9 @@ public CompletableFuture<WorkflowModel> start() {
3738
return startExecution(
3839
() -> {
3940
startedAt = info.startedAt();
41+
if (info.status() == WorkflowStatus.SUSPENDED) {
42+
internalSuspend();
43+
}
4044
});
4145
}
4246

16 KB
Binary file not shown.

impl/test/db-samples/suspended.db

12 KB
Binary file not shown.

impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.serverlessworkflow.impl.WorkflowApplication;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22+
import io.serverlessworkflow.impl.WorkflowInstance;
2223
import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder;
2324
import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers;
2425
import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers;
@@ -31,9 +32,14 @@
3132
public class DBGenerator {
3233

3334
public static void main(String[] args) throws IOException {
34-
Files.deleteIfExists(Path.of("test.db"));
35+
runInstance("db-samples/running.db", false);
36+
runInstance("db-samples/suspended.db", true);
37+
}
38+
39+
private static void runInstance(String dbName, boolean suspend) throws IOException {
40+
Files.deleteIfExists(Path.of(dbName));
3541
try (PersistenceInstanceHandlers factories =
36-
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
42+
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName))
3743
.build();
3844
WorkflowApplication application =
3945
PersistenceApplicationBuilder.builder(
@@ -43,7 +49,11 @@ public static void main(String[] args) throws IOException {
4349
WorkflowDefinition definition =
4450
application.workflowDefinition(
4551
readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml"));
46-
definition.instance(Map.of()).start();
52+
WorkflowInstance instance = definition.instance(Map.of());
53+
instance.start();
54+
if (suspend) {
55+
instance.suspend();
56+
}
4757
}
4858
}
4959
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,24 @@ void testWaitingInstance() throws IOException {
4444

4545
WorkflowInstance instance = definition.instance(Map.of());
4646
instance.start();
47-
assertThat(taskCounter.taskCounter(instance.id()).orElseThrow().completed()).isEqualTo(1);
47+
assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(1);
4848
}
4949
}
5050

5151
@Test
5252
void testRestoreWaitingInstance() throws IOException {
53+
runIt("db-samples/running.db", WorkflowStatus.WAITING);
54+
}
55+
56+
@Test
57+
void testRestoreSuspendedInstance() throws IOException {
58+
runIt("db-samples/suspended.db", WorkflowStatus.SUSPENDED);
59+
}
60+
61+
private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException {
5362
TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener();
5463
try (PersistenceInstanceHandlers handlers =
55-
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
64+
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName))
5665
.build();
5766
WorkflowApplication application =
5867
PersistenceApplicationBuilder.builder(
@@ -71,9 +80,8 @@ void testRestoreWaitingInstance() throws IOException {
7180
.singleElement()
7281
.satisfies(
7382
instance -> {
74-
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
75-
assertThat(taskCounter.taskCounter(instance.id()).orElseThrow().completed())
76-
.isEqualTo(0);
83+
assertThat(instance.status()).isEqualTo(expectedStatus);
84+
assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0);
7785
});
7886
}
7987
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.serverlessworkflow.impl.lifecycle.WorkflowEvent;
2121
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
2222
import java.util.Map;
23-
import java.util.Optional;
2423
import java.util.concurrent.ConcurrentHashMap;
2524

2625
public class TaskCounterPerInstanceListener implements WorkflowExecutionListener {
@@ -53,12 +52,11 @@ public void onTaskStarted(TaskStartedEvent ev) {
5352
}
5453

5554
private TaskCounter taskCounter(WorkflowEvent ev) {
56-
return taskCounter.computeIfAbsent(
57-
ev.workflowContext().instanceData().id(), k -> new TaskCounter());
55+
return taskCounter(ev.workflowContext().instanceData().id());
5856
}
5957

60-
public Optional<TaskCounter> taskCounter(String instanceId) {
61-
return Optional.ofNullable(taskCounter.get(instanceId));
58+
public TaskCounter taskCounter(String instanceId) {
59+
return taskCounter.computeIfAbsent(instanceId, k -> new TaskCounter());
6260
}
6361

6462
public void onTaskCompleted(TaskCompletedEvent ev) {

0 commit comments

Comments
 (0)