Skip to content

Commit 7f407b3

Browse files
committed
[Fix #782] More changes after testing
Signed-off-by: fjtirado <[email protected]>
1 parent aac9cda commit 7f407b3

File tree

7 files changed

+122
-10
lines changed

7 files changed

+122
-10
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public CompletableFuture<WorkflowModel> start() {
6262
return startExecution(
6363
() -> {
6464
startedAt = Instant.now();
65-
status.set(WorkflowStatus.RUNNING);
6665
publishEvent(
6766
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
6867
});
@@ -73,6 +72,7 @@ protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnabl
7372
if (future != null) {
7473
return future;
7574
}
75+
status.set(WorkflowStatus.RUNNING);
7676
runnable.run();
7777
future =
7878
TaskExecutorHelper.processTaskList(

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,11 @@ public CompletableFuture<TaskContext> apply(
194194
TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task);
195195
workflowContext.instance().restoreContext(workflowContext, taskContext);
196196
CompletableFuture<TaskContext> completable = CompletableFuture.completedFuture(taskContext);
197-
if (taskContext.isCompleted() && !TaskExecutorHelper.isActive(workflowContext)) {
197+
if (!TaskExecutorHelper.isActive(workflowContext)) {
198198
return completable;
199-
}
200-
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
199+
} else if (taskContext.isCompleted()) {
200+
return executeNext(completable, workflowContext);
201+
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
201202
return executeNext(
202203
completable
203204
.thenCompose(workflowContext.instance()::suspendedCheck)

impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,22 @@
2222
import io.serverlessworkflow.impl.persistence.bigmap.BytesBigMapApplicationBuilder;
2323
import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore;
2424
import java.io.IOException;
25+
import java.nio.file.Files;
26+
import java.nio.file.Path;
2527
import java.util.Map;
2628

2729
public class DBGenerator {
2830

2931
public static void main(String[] args) throws IOException {
32+
Files.deleteIfExists(Path.of("test.db"));
3033
try (MVStorePersistenceStore store = new MVStorePersistenceStore("test.db");
3134
WorkflowApplication application =
32-
BytesBigMapApplicationBuilder.builder(WorkflowApplication.builder(), store).build()) {
35+
BytesBigMapApplicationBuilder.builder(
36+
WorkflowApplication.builder().withListener(new TraceExecutionListener()), store)
37+
.build()) {
3338
WorkflowDefinition definition =
3439
application.workflowDefinition(
35-
readWorkflowFromClasspath("workflows-samples/listen-to-any.yaml"));
40+
readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml"));
3641
definition.instance(Map.of()).start();
3742
}
3843
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,29 +30,55 @@
3030
import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore;
3131
import java.io.IOException;
3232
import java.util.Collection;
33+
import java.util.Map;
3334
import org.junit.jupiter.api.Test;
3435

3536
public class MvStorePersistenceTest {
3637

3738
@Test
38-
public void testRestoreWaitingInstance() throws IOException {
39+
void testWaitingInstance() throws IOException {
40+
TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener();
41+
try (WorkflowApplication application =
42+
WorkflowApplication.builder().withListener(taskCounter).build()) {
43+
WorkflowDefinition definition =
44+
application.workflowDefinition(
45+
readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml"));
46+
47+
WorkflowInstance instance = definition.instance(Map.of());
48+
instance.start();
49+
assertThat(taskCounter.taskCounter(instance.id()).orElseThrow().completed()).isEqualTo(1);
50+
}
51+
}
52+
53+
@Test
54+
void testRestoreWaitingInstance() throws IOException {
3955
WorkflowBufferFactory bufferFactory = DefaultBufferFactory.factory();
56+
TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener();
4057
try (MVStorePersistenceStore store = new MVStorePersistenceStore("test.db");
4158
WorkflowApplication application =
42-
BytesBigMapApplicationBuilder.builder(WorkflowApplication.builder(), store)
59+
BytesBigMapApplicationBuilder.builder(
60+
WorkflowApplication.builder()
61+
.withListener(taskCounter)
62+
.withListener(new TraceExecutionListener()),
63+
store)
4364
.withFactory(bufferFactory)
4465
.build();
4566
WorkflowPersistenceRestorer restorer =
4667
new BytesBigMapPersistenceRestorer(store, bufferFactory); ) {
4768
WorkflowDefinition definition =
4869
application.workflowDefinition(
49-
readWorkflowFromClasspath("workflows-samples/listen-to-any.yaml"));
70+
readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml"));
5071
Collection<WorkflowInstance> instances = restorer.restoreAll(definition).values();
5172
assertThat(instances).hasSize(1);
5273
instances.forEach(WorkflowInstance::start);
5374
assertThat(instances)
5475
.singleElement()
55-
.satisfies((instance -> assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING)));
76+
.satisfies(
77+
instance -> {
78+
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
79+
assertThat(taskCounter.taskCounter(instance.id()).orElseThrow().completed())
80+
.isEqualTo(0);
81+
});
5682
}
5783
}
5884
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.test;
17+
18+
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
19+
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
20+
import io.serverlessworkflow.impl.lifecycle.WorkflowEvent;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
22+
import java.util.Map;
23+
import java.util.Optional;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
26+
public class TaskCounterPerInstanceListener implements WorkflowExecutionListener {
27+
28+
public static class TaskCounter {
29+
private int started;
30+
private int completed;
31+
32+
public void incStarted() {
33+
started++;
34+
}
35+
36+
public void incCompleted() {
37+
completed++;
38+
}
39+
40+
public int started() {
41+
return started;
42+
}
43+
44+
public int completed() {
45+
return completed;
46+
}
47+
}
48+
49+
private Map<String, TaskCounter> taskCounter = new ConcurrentHashMap<>();
50+
51+
public void onTaskStarted(TaskStartedEvent ev) {
52+
taskCounter(ev).incStarted();
53+
}
54+
55+
private TaskCounter taskCounter(WorkflowEvent ev) {
56+
return taskCounter.computeIfAbsent(
57+
ev.workflowContext().instanceData().id(), k -> new TaskCounter());
58+
}
59+
60+
public Optional<TaskCounter> taskCounter(String instanceId) {
61+
return Optional.ofNullable(taskCounter.get(instanceId));
62+
}
63+
64+
public void onTaskCompleted(TaskCompletedEvent ev) {
65+
taskCounter(ev).incCompleted();
66+
}
67+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: set-listen-to-any
5+
version: '0.1.0'
6+
do:
7+
- doSomethingBeforeEvent:
8+
set:
9+
name: javierito
10+
- callDoctor:
11+
listen:
12+
to:
13+
any: []

impl/test/test.db

-4 KB
Binary file not shown.

0 commit comments

Comments
 (0)