Skip to content

Commit 96ada91

Browse files
committed
[Fix #782] Adding MVStore persistence
Signed-off-by: fjtirado <[email protected]>
1 parent 01f64f8 commit 96ada91

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1154
-265
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,23 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.ForTask;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.ForTaskFunction;
2322
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
2423
import io.serverlessworkflow.api.types.func.TypedFunction;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
2625
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2726
import io.serverlessworkflow.impl.WorkflowPredicate;
2827
import io.serverlessworkflow.impl.WorkflowValueResolver;
2928
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
3029
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
31-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3230
import java.util.Collection;
3331
import java.util.Optional;
3432

3533
public class JavaForExecutorBuilder extends ForExecutorBuilder {
3634

3735
protected JavaForExecutorBuilder(
38-
WorkflowMutablePosition position,
39-
ForTask task,
40-
Workflow workflow,
41-
WorkflowApplication application,
42-
ResourceLoader resourceLoader) {
43-
super(position, task, workflow, application, resourceLoader);
36+
WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) {
37+
super(position, task, definition);
4438
}
4539

4640
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,18 @@
1919

2020
import io.serverlessworkflow.api.types.ListenTask;
2121
import io.serverlessworkflow.api.types.Until;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.UntilPredicate;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028

3129
public class JavaListenExecutorBuilder extends ListenExecutorBuilder {
3230

3331
protected JavaListenExecutorBuilder(
34-
WorkflowMutablePosition position,
35-
ListenTask task,
36-
Workflow workflow,
37-
WorkflowApplication application,
38-
ResourceLoader resourceLoader) {
39-
super(position, task, workflow, application, resourceLoader);
32+
WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) {
33+
super(position, task, definition);
4034
}
4135

4236
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,19 @@
1919

2020
import io.serverlessworkflow.api.types.SwitchCase;
2121
import io.serverlessworkflow.api.types.SwitchTask;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028
import java.util.Optional;
3129

3230
public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {
3331

3432
protected JavaSwitchExecutorBuilder(
35-
WorkflowMutablePosition position,
36-
SwitchTask task,
37-
Workflow workflow,
38-
WorkflowApplication application,
39-
ResourceLoader resourceLoader) {
40-
super(position, task, workflow, application, resourceLoader);
33+
WorkflowMutablePosition position, SwitchTask task, WorkflowDefinition definition) {
34+
super(position, task, definition);
4135
}
4236

4337
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,23 @@
1717

1818
import io.serverlessworkflow.api.types.Task;
1919
import io.serverlessworkflow.api.types.TaskBase;
20-
import io.serverlessworkflow.api.types.Workflow;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
2221
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2322
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
2423
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624

2725
public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory {
2826

2927
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
30-
WorkflowMutablePosition position,
31-
Task task,
32-
Workflow workflow,
33-
WorkflowApplication application,
34-
ResourceLoader resourceLoader) {
28+
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
3529
if (task.getForTask() != null) {
36-
return new JavaForExecutorBuilder(
37-
position, task.getForTask(), workflow, application, resourceLoader);
30+
return new JavaForExecutorBuilder(position, task.getForTask(), definition);
3831
} else if (task.getSwitchTask() != null) {
39-
return new JavaSwitchExecutorBuilder(
40-
position, task.getSwitchTask(), workflow, application, resourceLoader);
32+
return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition);
4133
} else if (task.getListenTask() != null) {
42-
return new JavaListenExecutorBuilder(
43-
position, task.getListenTask(), workflow, application, resourceLoader);
34+
return new JavaListenExecutorBuilder(position, task.getListenTask(), definition);
4435
} else {
45-
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
36+
return super.getTaskExecutor(position, task, definition);
4637
}
4738
}
4839
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class WorkflowApplication implements AutoCloseable {
6161
private final Collection<EventPublisher> eventPublishers;
6262
private final boolean lifeCycleCEPublishingEnabled;
6363

64-
private WorkflowApplication(Builder builder) {
64+
protected WorkflowApplication(Builder builder) {
6565
this.taskFactory = builder.taskFactory;
6666
this.exprFactory = builder.exprFactory;
6767
this.resourceLoaderFactory = builder.resourceLoaderFactory;
@@ -149,7 +149,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
149149
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
150150
private boolean lifeCycleCEPublishingEnabled = true;
151151

152-
private Builder() {}
152+
protected Builder() {}
153153

154154
public Builder withListener(WorkflowExecutionListener listener) {
155155
listeners.add(listener);
@@ -254,7 +254,11 @@ public Map<WorkflowDefinitionId, WorkflowDefinition> workflowDefinitions() {
254254

255255
public WorkflowDefinition workflowDefinition(Workflow workflow) {
256256
return definitions.computeIfAbsent(
257-
WorkflowDefinitionId.of(workflow), k -> WorkflowDefinition.of(this, workflow));
257+
WorkflowDefinitionId.of(workflow), k -> createDefinition(workflow));
258+
}
259+
260+
protected WorkflowDefinition createDefinition(Workflow workflow) {
261+
return WorkflowDefinition.of(this, workflow);
258262
}
259263

260264
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3636
private Optional<WorkflowFilter> outputFilter = Optional.empty();
3737
private final WorkflowApplication application;
3838
private final TaskExecutor<?> taskExecutor;
39+
private ResourceLoader resourceLoader;
3940

4041
private WorkflowDefinition(
4142
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
4243
this.workflow = workflow;
4344
this.application = application;
45+
this.resourceLoader = resourceLoader;
4446
if (workflow.getInput() != null) {
4547
Input input = workflow.getInput();
4648
this.inputSchemaValidator =
@@ -55,11 +57,7 @@ private WorkflowDefinition(
5557
}
5658
this.taskExecutor =
5759
TaskExecutorHelper.createExecutorList(
58-
application.positionFactory().get(),
59-
workflow.getDo(),
60-
workflow,
61-
application,
62-
resourceLoader);
60+
application.positionFactory().get(), workflow.getDo(), this);
6361
}
6462

6563
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
@@ -72,7 +70,10 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
7270
}
7371

7472
public WorkflowInstance instance(Object input) {
75-
return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input));
73+
WorkflowModel inputModel = application.modelFactory().fromAny(input);
74+
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
75+
return new WorkflowMutableInstance(
76+
this, application().idFactory().get(), inputModel, WorkflowStatus.PENDING);
7677
}
7778

7879
Optional<SchemaValidator> inputSchemaValidator() {
@@ -106,7 +107,9 @@ public WorkflowApplication application() {
106107
}
107108

108109
@Override
109-
public void close() {
110-
// TODO close resourcers hold for uncompleted process instances, if any
110+
public void close() {}
111+
112+
public ResourceLoader resourceLoader() {
113+
return resourceLoader;
111114
}
112115
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4949
private CompletableFuture<WorkflowModel> completableFuture;
5050
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5151

52-
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
53-
this.id = definition.application().idFactory().get();
52+
public WorkflowMutableInstance(
53+
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
54+
this.id = id;
5455
this.input = input;
55-
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
56-
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
56+
this.status = new AtomicReference<>(status);
5757
this.workflowContext = new WorkflowContext(definition, this);
5858
}
5959

6060
@Override
6161
public CompletableFuture<WorkflowModel> start() {
62-
this.startedAt = Instant.now();
63-
this.status.set(WorkflowStatus.RUNNING);
62+
startedAt = Instant.now();
63+
status.set(WorkflowStatus.RUNNING);
6464
publishEvent(
6565
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
6666
this.completableFuture =
@@ -139,12 +139,14 @@ public WorkflowModel output() {
139139

140140
@Override
141141
public <T> T outputAs(Class<T> clazz) {
142-
return output
143-
.as(clazz)
144-
.orElseThrow(
145-
() ->
146-
new IllegalArgumentException(
147-
"Output " + output + " cannot be converted to class " + clazz));
142+
return output != null
143+
? output
144+
.as(clazz)
145+
.orElseThrow(
146+
() ->
147+
new IllegalArgumentException(
148+
"Output " + output + " cannot be converted to class " + clazz))
149+
: null;
148150
}
149151

150152
public void status(WorkflowStatus state) {
@@ -234,6 +236,13 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
234236
return CompletableFuture.completedFuture(t);
235237
}
236238

239+
// internal purposes only, not to be invoked directly by users of the API
240+
public void restore(
241+
WorkflowPosition position, WorkflowModel model, WorkflowModel context, Instant startedAt) {
242+
this.startedAt = startedAt;
243+
workflowContext.context(context);
244+
}
245+
237246
@Override
238247
public boolean cancel() {
239248
try {

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.serverlessworkflow.impl.TaskContext;
2929
import io.serverlessworkflow.impl.WorkflowApplication;
3030
import io.serverlessworkflow.impl.WorkflowContext;
31+
import io.serverlessworkflow.impl.WorkflowDefinition;
3132
import io.serverlessworkflow.impl.WorkflowFilter;
3233
import io.serverlessworkflow.impl.WorkflowModel;
3334
import io.serverlessworkflow.impl.WorkflowMutablePosition;
@@ -81,17 +82,13 @@ public abstract static class AbstractTaskExecutorBuilder<
8182
private V instance;
8283

8384
protected AbstractTaskExecutorBuilder(
84-
WorkflowMutablePosition position,
85-
T task,
86-
Workflow workflow,
87-
WorkflowApplication application,
88-
ResourceLoader resourceLoader) {
89-
this.workflow = workflow;
85+
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
86+
this.workflow = definition.workflow();
9087
this.taskName = position.last().toString();
9188
this.position = position;
9289
this.task = task;
93-
this.application = application;
94-
this.resourceLoader = resourceLoader;
90+
this.application = definition.application();
91+
this.resourceLoader = definition.resourceLoader();
9592
if (task.getInput() != null) {
9693
Input input = task.getInput();
9794
this.inputProcessor = buildWorkflowFilter(application, input.getFrom());
@@ -174,16 +171,18 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
174171

175172
protected final CompletableFuture<TaskContext> executeNext(
176173
CompletableFuture<TaskContext> future, WorkflowContext workflow) {
177-
return future.thenCompose(
178-
t -> {
179-
TransitionInfo transition = t.transition();
180-
if (transition.isEndNode()) {
181-
workflow.instance().status(WorkflowStatus.COMPLETED);
182-
} else if (transition.next() != null) {
183-
return transition.next().apply(workflow, t.parent(), t.output());
184-
}
185-
return CompletableFuture.completedFuture(t);
186-
});
174+
return future.thenCompose(t -> executeNext(workflow, t));
175+
}
176+
177+
private CompletableFuture<TaskContext> executeNext(
178+
WorkflowContext workflow, TaskContext taskContext) {
179+
TransitionInfo transition = taskContext.transition();
180+
if (transition.isEndNode()) {
181+
workflow.instance().status(WorkflowStatus.COMPLETED);
182+
} else if (transition.next() != null) {
183+
return transition.next().apply(workflow, taskContext.parent(), taskContext.output());
184+
}
185+
return CompletableFuture.completedFuture(taskContext);
187186
}
188187

189188
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
package io.serverlessworkflow.impl.executors;
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
19-
import io.serverlessworkflow.api.types.Workflow;
2019
import io.serverlessworkflow.impl.TaskContext;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
2220
import io.serverlessworkflow.impl.WorkflowContext;
21+
import io.serverlessworkflow.impl.WorkflowDefinition;
2322
import io.serverlessworkflow.impl.WorkflowModel;
2423
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624
import java.util.concurrent.CompletableFuture;
2725

2826
public class CallTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T> {
@@ -36,11 +34,9 @@ public static class CallTaskExecutorBuilder<T extends TaskBase>
3634
protected CallTaskExecutorBuilder(
3735
WorkflowMutablePosition position,
3836
T task,
39-
Workflow workflow,
40-
WorkflowApplication application,
41-
ResourceLoader resourceLoader,
37+
WorkflowDefinition definition,
4238
CallableTask<T> callable) {
43-
super(position, task, workflow, application, resourceLoader);
39+
super(position, task, definition);
4440
this.callable = callable;
4541
callable.init(task, workflow, application, resourceLoader);
4642
}

0 commit comments

Comments
 (0)