Skip to content

Commit 9eb6b60

Browse files
committed
[Fix #782] More changes after testing
Signed-off-by: fjtirado <[email protected]>
1 parent aac9cda commit 9eb6b60

27 files changed

+398
-132
lines changed

impl/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ As shown in previous examples, to start a new workflow instance, first a [Workfl
213213

214214
Once started, and before it completes, a workflow instance execution can be suspended or cancelled. Once cancelled, a workflow instance is done, while a suspended one might be resumed.
215215

216+
## Persistent
217+
218+
Workflow progress might be recorded into DB. See [details](persistence/README.md)
219+
216220
## Fluent Java DSL
217221

218222
Prefer building workflows programmatically with type-safe builders and recipes?

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public CompletableFuture<WorkflowModel> start() {
6262
return startExecution(
6363
() -> {
6464
startedAt = Instant.now();
65-
status.set(WorkflowStatus.RUNNING);
6665
publishEvent(
6766
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
6867
});
@@ -73,6 +72,7 @@ protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnabl
7372
if (future != null) {
7473
return future;
7574
}
75+
status.set(WorkflowStatus.RUNNING);
7676
runnable.run();
7777
future =
7878
TaskExecutorHelper.processTaskList(

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,12 @@ public static String toString(UriTemplate template) {
144144
}
145145

146146
public static void safeClose(AutoCloseable closeable) {
147-
try {
148-
closeable.close();
149-
} catch (Exception ex) {
150-
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
147+
if (closeable != null) {
148+
try {
149+
closeable.close();
150+
} catch (Exception ex) {
151+
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
152+
}
151153
}
152154
}
153155
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,11 @@ public CompletableFuture<TaskContext> apply(
194194
TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task);
195195
workflowContext.instance().restoreContext(workflowContext, taskContext);
196196
CompletableFuture<TaskContext> completable = CompletableFuture.completedFuture(taskContext);
197-
if (taskContext.isCompleted() && !TaskExecutorHelper.isActive(workflowContext)) {
197+
if (!TaskExecutorHelper.isActive(workflowContext)) {
198198
return completable;
199-
}
200-
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
199+
} else if (taskContext.isCompleted()) {
200+
return executeNext(completable, workflowContext);
201+
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
201202
return executeNext(
202203
completable
203204
.thenCompose(workflowContext.instance()::suspendedCheck)

impl/persistence/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://github.com/serverlessworkflow/sdk-java)
2+
3+
# Serverless Workflow Specification — Java SDK (Reference Implementation)- Persistence
4+
5+
Workflow persistence aim is to be able to restore workflow instances execution in the event of a JVM stop. To do that, progress of every running instance is persisted into the underlying DB by using life cycle events. Later on, when a new JVM is instantiated, the application is expected to manually start those instances that are not longer being executed by any other JVM, using the information previously stored.
6+
7+
Currently, persistence structure has been layout for key-value store dbs, plus one concrete implementation using [H2 MVStore](mvstore/README.md). Next step will do the same for relational dbs (table layout plus concrete implementation using Postgresql).
8+
9+
Map of key values has been given precedence because, when persisting the status of a running workflow instance, the number of writes are usually large, while read only operations are only performed when the JVM starts up. This give a performance edge for this kind of db over relational ones.
10+
11+
---
12+
13+
*Questions or ideas? PRs and issues welcome!*

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultBufferFactory.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@ public class DefaultBufferFactory implements WorkflowBufferFactory {
2424

2525
private final Collection<CustomObjectMarshaller> marshallers;
2626

27+
private static class DefaultBufferFactoryHolder {
28+
private static DefaultBufferFactory instance =
29+
new DefaultBufferFactory(
30+
ServiceLoader.load(CustomObjectMarshaller.class).stream()
31+
.map(ServiceLoader.Provider::get)
32+
.toList());
33+
}
34+
2735
public static DefaultBufferFactory factory() {
28-
return new DefaultBufferFactory(
29-
ServiceLoader.load(CustomObjectMarshaller.class).stream()
30-
.map(ServiceLoader.Provider::get)
31-
.toList());
36+
return DefaultBufferFactoryHolder.instance;
3237
}
3338

34-
public DefaultBufferFactory(Collection<CustomObjectMarshaller> marshallers) {
39+
protected DefaultBufferFactory(Collection<CustomObjectMarshaller> marshallers) {
3540
this.marshallers = marshallers;
3641
}
3742

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowApplication;
19+
import io.serverlessworkflow.impl.WorkflowApplication.Builder;
20+
21+
public class PersistenceApplicationBuilder {
22+
23+
public static PersistenceApplicationBuilder builder(
24+
WorkflowApplication.Builder builder, PersistenceInstanceWriter writer) {
25+
return new PersistenceApplicationBuilder(builder, writer);
26+
}
27+
28+
private final PersistenceInstanceWriter writer;
29+
private final WorkflowApplication.Builder appBuilder;
30+
31+
protected PersistenceApplicationBuilder(Builder appBuilder, PersistenceInstanceWriter writer) {
32+
this.appBuilder = appBuilder;
33+
this.writer = writer;
34+
}
35+
36+
public WorkflowApplication build() {
37+
appBuilder.withListener(new WorkflowPersistenceListener(writer));
38+
return appBuilder.build();
39+
}
40+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
20+
public abstract class PersistenceInstanceHandlers implements AutoCloseable {
21+
22+
protected final PersistenceInstanceWriter writer;
23+
protected final PersistenceInstanceReader reader;
24+
25+
protected PersistenceInstanceHandlers(
26+
PersistenceInstanceWriter writer, PersistenceInstanceReader reader) {
27+
this.writer = writer;
28+
this.reader = reader;
29+
}
30+
31+
public PersistenceInstanceWriter writer() {
32+
return writer;
33+
}
34+
35+
public PersistenceInstanceReader reader() {
36+
return reader;
37+
}
38+
39+
@Override
40+
public void close() {
41+
safeClose(writer);
42+
safeClose(reader);
43+
}
44+
}
Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121
import java.util.Map;
2222
import java.util.Optional;
2323

24-
public interface WorkflowPersistenceRestorer extends AutoCloseable {
25-
Map<String, WorkflowInstance> restoreAll(WorkflowDefinition definition);
24+
public interface PersistenceInstanceReader extends AutoCloseable {
25+
Map<String, WorkflowInstance> readAll(WorkflowDefinition definition);
2626

27-
Map<String, WorkflowInstance> restore(
28-
WorkflowDefinition definition, Collection<String> instanceIds);
27+
Map<String, WorkflowInstance> read(WorkflowDefinition definition, Collection<String> instanceIds);
2928

30-
Optional<WorkflowInstance> restore(WorkflowDefinition definition, String instanceId);
29+
Optional<WorkflowInstance> read(WorkflowDefinition definition, String instanceId);
3130

3231
@Override
3332
default void close() {}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import io.serverlessworkflow.impl.TaskContextData;
1919
import io.serverlessworkflow.impl.WorkflowContextData;
2020

21-
public interface WorkflowPersistenceWriter extends AutoCloseable {
21+
public interface PersistenceInstanceWriter extends AutoCloseable {
2222

2323
void started(WorkflowContextData workflowContext);
2424

0 commit comments

Comments
 (0)