Skip to content

Commit 5a9cc62

Browse files
committed
[Fix #782] Added suspend support
1 parent ca799b9 commit 5a9cc62

File tree

7 files changed

+39
-15
lines changed

7 files changed

+39
-15
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ public boolean suspend() {
182182
try {
183183
statusLock.lock();
184184
if (TaskExecutorHelper.isActive(status.get()) && suspended == null) {
185-
suspended = new ConcurrentHashMap<>();
186-
status.set(WorkflowStatus.SUSPENDED);
185+
internalSuspend();
187186
publishEvent(
188187
workflowContext,
189188
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
@@ -196,6 +195,11 @@ public boolean suspend() {
196195
}
197196
}
198197

198+
protected final void internalSuspend() {
199+
suspended = new ConcurrentHashMap<>();
200+
status.set(WorkflowStatus.SUSPENDED);
201+
}
202+
199203
@Override
200204
public boolean resume() {
201205
try {

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.serverlessworkflow.impl.WorkflowDefinition;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowMutableInstance;
23+
import io.serverlessworkflow.impl.WorkflowStatus;
2324
import io.serverlessworkflow.impl.executors.TransitionInfo;
2425
import java.util.concurrent.CompletableFuture;
2526

@@ -37,6 +38,9 @@ public CompletableFuture<WorkflowModel> start() {
3738
return startExecution(
3839
() -> {
3940
startedAt = info.startedAt();
41+
if (info.status() == WorkflowStatus.SUSPENDED) {
42+
internalSuspend();
43+
}
4044
});
4145
}
4246

16 KB
Binary file not shown.

impl/test/db-samples/suspended.db

12 KB
Binary file not shown.

impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.serverlessworkflow.impl.WorkflowApplication;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22+
import io.serverlessworkflow.impl.WorkflowInstance;
2223
import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder;
2324
import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers;
2425
import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers;
@@ -31,9 +32,14 @@
3132
public class DBGenerator {
3233

3334
public static void main(String[] args) throws IOException {
34-
Files.deleteIfExists(Path.of("test.db"));
35+
runInstance("db-samples/running.db", false);
36+
runInstance("db-samples/suspended.db", true);
37+
}
38+
39+
private static void runInstance(String dbName, boolean suspend) throws IOException {
40+
Files.deleteIfExists(Path.of(dbName));
3541
try (PersistenceInstanceHandlers factories =
36-
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
42+
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName))
3743
.build();
3844
WorkflowApplication application =
3945
PersistenceApplicationBuilder.builder(
@@ -43,7 +49,11 @@ public static void main(String[] args) throws IOException {
4349
WorkflowDefinition definition =
4450
application.workflowDefinition(
4551
readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml"));
46-
definition.instance(Map.of()).start();
52+
WorkflowInstance instance = definition.instance(Map.of());
53+
instance.start();
54+
if (suspend) {
55+
instance.suspend();
56+
}
4757
}
4858
}
4959
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,24 @@ void testWaitingInstance() throws IOException {
4444

4545
WorkflowInstance instance = definition.instance(Map.of());
4646
instance.start();
47-
assertThat(taskCounter.taskCounter(instance.id()).orElseThrow().completed()).isEqualTo(1);
47+
assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(1);
4848
}
4949
}
5050

5151
@Test
5252
void testRestoreWaitingInstance() throws IOException {
53+
runIt("db-samples/running.db", WorkflowStatus.WAITING);
54+
}
55+
56+
@Test
57+
void testRestoreSuspendedInstance() throws IOException {
58+
runIt("db-samples/suspended.db", WorkflowStatus.SUSPENDED);
59+
}
60+
61+
private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException {
5362
TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener();
5463
try (PersistenceInstanceHandlers handlers =
55-
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
64+
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName))
5665
.build();
5766
WorkflowApplication application =
5867
PersistenceApplicationBuilder.builder(
@@ -71,9 +80,8 @@ void testRestoreWaitingInstance() throws IOException {
7180
.singleElement()
7281
.satisfies(
7382
instance -> {
74-
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
75-
assertThat(taskCounter.taskCounter(instance.id()).orElseThrow().completed())
76-
.isEqualTo(0);
83+
assertThat(instance.status()).isEqualTo(expectedStatus);
84+
assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0);
7785
});
7886
}
7987
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.serverlessworkflow.impl.lifecycle.WorkflowEvent;
2121
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
2222
import java.util.Map;
23-
import java.util.Optional;
2423
import java.util.concurrent.ConcurrentHashMap;
2524

2625
public class TaskCounterPerInstanceListener implements WorkflowExecutionListener {
@@ -53,12 +52,11 @@ public void onTaskStarted(TaskStartedEvent ev) {
5352
}
5453

5554
private TaskCounter taskCounter(WorkflowEvent ev) {
56-
return taskCounter.computeIfAbsent(
57-
ev.workflowContext().instanceData().id(), k -> new TaskCounter());
55+
return taskCounter(ev.workflowContext().instanceData().id());
5856
}
5957

60-
public Optional<TaskCounter> taskCounter(String instanceId) {
61-
return Optional.ofNullable(taskCounter.get(instanceId));
58+
public TaskCounter taskCounter(String instanceId) {
59+
return taskCounter.computeIfAbsent(instanceId, k -> new TaskCounter());
6260
}
6361

6462
public void onTaskCompleted(TaskCompletedEvent ev) {

0 commit comments

Comments
 (0)