Skip to content

Commit 8e1b72a

Browse files
authored
[Fix #782] Adding MVStore persistence (#783)
* [Fix #782] Adding MVStore persistence Signed-off-by: fjtirado <[email protected]> * [Fix #782] Refining approach Signed-off-by: fjtirado <[email protected]> * [Fix #782] Fixes after manual testing Signed-off-by: fjtirado <[email protected]> * [Fix #782] Moving tests around Signed-off-by: fjtirado <[email protected]> * [Fix #782] More changes after testing Signed-off-by: fjtirado <[email protected]> * [Fix #782] Added suspend support Signed-off-by: fjtirado <[email protected]> * [Fix #782] Adding transaction support Signed-off-by: fjtirado <[email protected]> --------- Signed-off-by: fjtirado <[email protected]>
1 parent fee8305 commit 8e1b72a

File tree

60 files changed

+2484
-45
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+2484
-45
lines changed

impl/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ As shown in previous examples, to start a new workflow instance, first a [Workfl
213213

214214
Once started, and before it completes, a workflow instance execution can be suspended or cancelled. Once cancelled, a workflow instance is done, while a suspended one might be resumed.
215215

216+
## Persistence
217+
218+
Workflow progress might be recorded into DB. See [details](persistence/README.md)
219+
216220
## Fluent Java DSL
217221

218222
Prefer building workflows programmatically with type-safe builders and recipes?

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class TaskContext implements TaskContextData {
3737
private WorkflowModel rawOutput;
3838
private Instant completedAt;
3939
private TransitionInfo transition;
40+
private boolean completed;
4041

4142
public TaskContext(
4243
WorkflowModel input,
@@ -109,6 +110,7 @@ public WorkflowModel rawOutput() {
109110

110111
public TaskContext output(WorkflowModel output) {
111112
this.output = output;
113+
this.completed = true;
112114
return this;
113115
}
114116

@@ -159,6 +161,10 @@ public TaskContext transition(TransitionInfo transition) {
159161
return this;
160162
}
161163

164+
public boolean isCompleted() {
165+
return completed;
166+
}
167+
162168
@Override
163169
public String toString() {
164170
return "TaskContext [position="

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
1820
import io.serverlessworkflow.api.types.SchemaInline;
1921
import io.serverlessworkflow.api.types.Workflow;
2022
import io.serverlessworkflow.impl.events.EventConsumer;
@@ -39,13 +41,9 @@
3941
import java.util.concurrent.ConcurrentHashMap;
4042
import java.util.concurrent.ExecutorService;
4143
import java.util.stream.Collectors;
42-
import org.slf4j.Logger;
43-
import org.slf4j.LoggerFactory;
4444

4545
public class WorkflowApplication implements AutoCloseable {
4646

47-
private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);
48-
4947
private final TaskExecutorFactory taskFactory;
5048
private final ExpressionFactory exprFactory;
5149
private final ResourceLoaderFactory resourceLoaderFactory;
@@ -271,14 +269,11 @@ public void close() {
271269
safeClose(definition);
272270
}
273271
definitions.clear();
274-
}
275272

276-
private void safeClose(AutoCloseable closeable) {
277-
try {
278-
closeable.close();
279-
} catch (Exception ex) {
280-
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
273+
for (WorkflowExecutionListener listener : listeners) {
274+
safeClose(listener);
281275
}
276+
listeners.clear();
282277
}
283278

284279
public WorkflowPositionFactory positionFactory() {

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.serverlessworkflow.impl.resources.ResourceLoader;
2626
import io.serverlessworkflow.impl.schema.SchemaValidator;
2727
import java.nio.file.Path;
28+
import java.util.HashMap;
29+
import java.util.Map;
2830
import java.util.Optional;
2931

3032
public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
@@ -37,6 +39,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3739
private final WorkflowApplication application;
3840
private final TaskExecutor<?> taskExecutor;
3941
private final ResourceLoader resourceLoader;
42+
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
4043

4144
private WorkflowDefinition(
4245
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -70,7 +73,9 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
7073
}
7174

7275
public WorkflowInstance instance(Object input) {
73-
return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input));
76+
WorkflowModel inputModel = application.modelFactory().fromAny(input);
77+
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
78+
return new WorkflowMutableInstance(this, application().idFactory().get(), inputModel);
7479
}
7580

7681
Optional<SchemaValidator> inputSchemaValidator() {
@@ -107,8 +112,14 @@ public ResourceLoader resourceLoader() {
107112
return resourceLoader;
108113
}
109114

110-
@Override
111-
public void close() {
112-
// TODO close resourcers hold for uncompleted process instances, if any
115+
public TaskExecutor<?> taskExecutor(String jsonPointer) {
116+
return executors.get(jsonPointer);
117+
}
118+
119+
public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> taskExecutor) {
120+
executors.put(position.jsonPointer(), taskExecutor);
113121
}
122+
123+
@Override
124+
public void close() {}
114125
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,32 +38,43 @@
3838
public class WorkflowMutableInstance implements WorkflowInstance {
3939

4040
protected final AtomicReference<WorkflowStatus> status;
41-
private final String id;
42-
private final WorkflowModel input;
41+
protected final String id;
42+
protected final WorkflowModel input;
43+
44+
protected final WorkflowContext workflowContext;
45+
protected Instant startedAt;
46+
47+
protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
48+
protected Instant completedAt;
4349

44-
private WorkflowContext workflowContext;
45-
private Instant startedAt;
46-
private Instant completedAt;
47-
private volatile WorkflowModel output;
4850
private Lock statusLock = new ReentrantLock();
49-
private CompletableFuture<WorkflowModel> completableFuture;
5051
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5152

52-
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
53-
this.id = definition.application().idFactory().get();
53+
protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
54+
this.id = id;
5455
this.input = input;
5556
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
56-
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
5757
this.workflowContext = new WorkflowContext(definition, this);
5858
}
5959

6060
@Override
6161
public CompletableFuture<WorkflowModel> start() {
62-
this.startedAt = Instant.now();
63-
this.status.set(WorkflowStatus.RUNNING);
64-
publishEvent(
65-
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
66-
this.completableFuture =
62+
return startExecution(
63+
() -> {
64+
startedAt = Instant.now();
65+
publishEvent(
66+
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
67+
});
68+
}
69+
70+
protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnable) {
71+
CompletableFuture<WorkflowModel> future = futureRef.get();
72+
if (future != null) {
73+
return future;
74+
}
75+
status.set(WorkflowStatus.RUNNING);
76+
runnable.run();
77+
future =
6778
TaskExecutorHelper.processTaskList(
6879
workflowContext.definition().startTask(),
6980
workflowContext,
@@ -75,7 +86,8 @@ public CompletableFuture<WorkflowModel> start() {
7586
.orElse(input))
7687
.whenComplete(this::whenFailed)
7788
.thenApply(this::whenSuccess);
78-
return completableFuture;
89+
futureRef.set(future);
90+
return future;
7991
}
8092

8193
private void whenFailed(WorkflowModel result, Throwable ex) {
@@ -94,7 +106,7 @@ private void handleException(Throwable ex) {
94106
}
95107

96108
private WorkflowModel whenSuccess(WorkflowModel node) {
97-
output =
109+
WorkflowModel output =
98110
workflowContext
99111
.definition()
100112
.outputFilter()
@@ -103,7 +115,8 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
103115
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
104116
status.set(WorkflowStatus.COMPLETED);
105117
publishEvent(
106-
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
118+
workflowContext,
119+
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
107120
return output;
108121
}
109122

@@ -134,11 +147,13 @@ public WorkflowStatus status() {
134147

135148
@Override
136149
public WorkflowModel output() {
137-
return output;
150+
CompletableFuture<WorkflowModel> future = futureRef.get();
151+
return future != null ? future.join() : null;
138152
}
139153

140154
@Override
141155
public <T> T outputAs(Class<T> clazz) {
156+
WorkflowModel output = output();
142157
return output != null
143158
? output
144159
.as(clazz)
@@ -171,8 +186,7 @@ public boolean suspend() {
171186
try {
172187
statusLock.lock();
173188
if (TaskExecutorHelper.isActive(status.get()) && suspended == null) {
174-
suspended = new ConcurrentHashMap<>();
175-
status.set(WorkflowStatus.SUSPENDED);
189+
internalSuspend();
176190
publishEvent(
177191
workflowContext,
178192
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
@@ -185,6 +199,11 @@ public boolean suspend() {
185199
}
186200
}
187201

202+
protected final void internalSuspend() {
203+
suspended = new ConcurrentHashMap<>();
204+
status.set(WorkflowStatus.SUSPENDED);
205+
}
206+
188207
@Override
189208
public boolean resume() {
190209
try {
@@ -253,4 +272,6 @@ public boolean cancel() {
253272
statusLock.unlock();
254273
}
255274
}
275+
276+
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
256277
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@
2828
import java.net.URI;
2929
import java.util.Map;
3030
import java.util.Optional;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
public class WorkflowUtils {
3335

3436
private WorkflowUtils() {}
3537

38+
private static final Logger logger = LoggerFactory.getLogger(WorkflowUtils.class);
39+
3640
public static Optional<SchemaValidator> getSchemaValidator(
3741
SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) {
3842
if (schema != null) {
@@ -138,4 +142,14 @@ public static String toString(UriTemplate template) {
138142
URI uri = template.getLiteralUri();
139143
return uri != null ? uri.toString() : template.getLiteralUriTemplate();
140144
}
145+
146+
public static void safeClose(AutoCloseable closeable) {
147+
if (closeable != null) {
148+
try {
149+
closeable.close();
150+
} catch (Exception ex) {
151+
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
152+
}
153+
}
154+
}
141155
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,13 @@ public abstract static class AbstractTaskExecutorBuilder<
7878
protected final WorkflowApplication application;
7979
protected final Workflow workflow;
8080
protected final ResourceLoader resourceLoader;
81+
private final WorkflowDefinition definition;
8182

8283
private V instance;
8384

8485
protected AbstractTaskExecutorBuilder(
8586
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
87+
this.definition = definition;
8688
this.workflow = definition.workflow();
8789
this.taskName = position.last().toString();
8890
this.position = position;
@@ -147,6 +149,7 @@ public V build() {
147149
if (instance == null) {
148150
instance = buildInstance();
149151
buildTransition(instance);
152+
definition.addTaskExecutor(position, instance);
150153
}
151154
return instance;
152155
}
@@ -189,11 +192,13 @@ private CompletableFuture<TaskContext> executeNext(
189192
public CompletableFuture<TaskContext> apply(
190193
WorkflowContext workflowContext, Optional<TaskContext> parentContext, WorkflowModel input) {
191194
TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task);
195+
workflowContext.instance().restoreContext(workflowContext, taskContext);
192196
CompletableFuture<TaskContext> completable = CompletableFuture.completedFuture(taskContext);
193197
if (!TaskExecutorHelper.isActive(workflowContext)) {
194198
return completable;
195-
}
196-
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
199+
} else if (taskContext.isCompleted()) {
200+
return executeNext(completable, workflowContext);
201+
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
197202
return executeNext(
198203
completable
199204
.thenCompose(workflowContext.instance()::suspendedCheck)
@@ -256,6 +261,10 @@ private void handleException(
256261
}
257262
}
258263

264+
public WorkflowPosition position() {
265+
return position;
266+
}
267+
259268
protected abstract TransitionInfo getSkipTransition();
260269

261270
protected abstract CompletableFuture<TaskContext> execute(

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,18 @@
1616
package io.serverlessworkflow.impl.lifecycle;
1717

1818
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import io.serverlessworkflow.impl.WorkflowModel;
1920

2021
public class WorkflowCompletedEvent extends WorkflowEvent {
2122

22-
public WorkflowCompletedEvent(WorkflowContextData workflow) {
23+
private WorkflowModel output;
24+
25+
public WorkflowCompletedEvent(WorkflowContextData workflow, WorkflowModel output) {
2326
super(workflow);
27+
this.output = output;
28+
}
29+
30+
public WorkflowModel output() {
31+
return output;
2432
}
2533
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.lifecycle;
1717

18-
public interface WorkflowExecutionListener {
18+
public interface WorkflowExecutionListener extends AutoCloseable {
1919

2020
default void onWorkflowStarted(WorkflowStartedEvent ev) {}
2121

@@ -42,4 +42,7 @@ default void onTaskSuspended(TaskSuspendedEvent ev) {}
4242
default void onTaskResumed(TaskResumedEvent ev) {}
4343

4444
default void onTaskRetried(TaskRetriedEvent ev) {}
45+
46+
@Override
47+
default void close() {}
4548
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ public void onWorkflowCompleted(WorkflowCompletedEvent event) {
225225
builder()
226226
.withData(
227227
cloudEventData(
228-
new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)),
228+
new WorkflowCompletedCEData(
229+
id(ev), ref(ev), ev.eventDate(), from(event.output())),
229230
this::convert))
230231
.withType(WORKFLOW_COMPLETED)
231232
.build());
@@ -328,10 +329,6 @@ private static String pos(TaskEvent ev) {
328329
return ev.taskContext().position().jsonPointer();
329330
}
330331

331-
private static Object output(WorkflowEvent ev) {
332-
return from(ev.workflowContext().instanceData().output());
333-
}
334-
335332
private static Object output(TaskEvent ev) {
336333
return from(ev.taskContext().output());
337334
}

0 commit comments

Comments
 (0)