Skip to content

Commit 2247fa4

Browse files
committed
[Fix #782] Adding MVStore persistence
Signed-off-by: fjtirado <[email protected]>
1 parent 7d4003d commit 2247fa4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1174
-274
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,23 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.ForTask;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.ForTaskFunction;
2322
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
2423
import io.serverlessworkflow.api.types.func.TypedFunction;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
2625
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2726
import io.serverlessworkflow.impl.WorkflowPredicate;
2827
import io.serverlessworkflow.impl.WorkflowValueResolver;
2928
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
3029
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
31-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3230
import java.util.Collection;
3331
import java.util.Optional;
3432

3533
public class JavaForExecutorBuilder extends ForExecutorBuilder {
3634

3735
protected JavaForExecutorBuilder(
38-
WorkflowMutablePosition position,
39-
ForTask task,
40-
Workflow workflow,
41-
WorkflowApplication application,
42-
ResourceLoader resourceLoader) {
43-
super(position, task, workflow, application, resourceLoader);
36+
WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) {
37+
super(position, task, definition);
4438
}
4539

4640
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,18 @@
1919

2020
import io.serverlessworkflow.api.types.ListenTask;
2121
import io.serverlessworkflow.api.types.Until;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.UntilPredicate;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028

3129
public class JavaListenExecutorBuilder extends ListenExecutorBuilder {
3230

3331
protected JavaListenExecutorBuilder(
34-
WorkflowMutablePosition position,
35-
ListenTask task,
36-
Workflow workflow,
37-
WorkflowApplication application,
38-
ResourceLoader resourceLoader) {
39-
super(position, task, workflow, application, resourceLoader);
32+
WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) {
33+
super(position, task, definition);
4034
}
4135

4236
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,19 @@
1919

2020
import io.serverlessworkflow.api.types.SwitchCase;
2121
import io.serverlessworkflow.api.types.SwitchTask;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028
import java.util.Optional;
3129

3230
public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {
3331

3432
protected JavaSwitchExecutorBuilder(
35-
WorkflowMutablePosition position,
36-
SwitchTask task,
37-
Workflow workflow,
38-
WorkflowApplication application,
39-
ResourceLoader resourceLoader) {
40-
super(position, task, workflow, application, resourceLoader);
33+
WorkflowMutablePosition position, SwitchTask task, WorkflowDefinition definition) {
34+
super(position, task, definition);
4135
}
4236

4337
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,23 @@
1717

1818
import io.serverlessworkflow.api.types.Task;
1919
import io.serverlessworkflow.api.types.TaskBase;
20-
import io.serverlessworkflow.api.types.Workflow;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
2221
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2322
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
2423
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624

2725
public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory {
2826

2927
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
30-
WorkflowMutablePosition position,
31-
Task task,
32-
Workflow workflow,
33-
WorkflowApplication application,
34-
ResourceLoader resourceLoader) {
28+
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
3529
if (task.getForTask() != null) {
36-
return new JavaForExecutorBuilder(
37-
position, task.getForTask(), workflow, application, resourceLoader);
30+
return new JavaForExecutorBuilder(position, task.getForTask(), definition);
3831
} else if (task.getSwitchTask() != null) {
39-
return new JavaSwitchExecutorBuilder(
40-
position, task.getSwitchTask(), workflow, application, resourceLoader);
32+
return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition);
4133
} else if (task.getListenTask() != null) {
42-
return new JavaListenExecutorBuilder(
43-
position, task.getListenTask(), workflow, application, resourceLoader);
34+
return new JavaListenExecutorBuilder(position, task.getListenTask(), definition);
4435
} else {
45-
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
36+
return super.getTaskExecutor(position, task, definition);
4637
}
4738
}
4839
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class WorkflowApplication implements AutoCloseable {
6161
private final Collection<EventPublisher> eventPublishers;
6262
private final boolean lifeCycleCEPublishingEnabled;
6363

64-
private WorkflowApplication(Builder builder) {
64+
protected WorkflowApplication(Builder builder) {
6565
this.taskFactory = builder.taskFactory;
6666
this.exprFactory = builder.exprFactory;
6767
this.resourceLoaderFactory = builder.resourceLoaderFactory;
@@ -149,7 +149,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
149149
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
150150
private boolean lifeCycleCEPublishingEnabled = true;
151151

152-
private Builder() {}
152+
protected Builder() {}
153153

154154
public Builder withListener(WorkflowExecutionListener listener) {
155155
listeners.add(listener);
@@ -254,7 +254,11 @@ public Map<WorkflowDefinitionId, WorkflowDefinition> workflowDefinitions() {
254254

255255
public WorkflowDefinition workflowDefinition(Workflow workflow) {
256256
return definitions.computeIfAbsent(
257-
WorkflowDefinitionId.of(workflow), k -> WorkflowDefinition.of(this, workflow));
257+
WorkflowDefinitionId.of(workflow), k -> createDefinition(workflow));
258+
}
259+
260+
protected WorkflowDefinition createDefinition(Workflow workflow) {
261+
return WorkflowDefinition.of(this, workflow);
258262
}
259263

260264
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3636
private Optional<WorkflowFilter> outputFilter = Optional.empty();
3737
private final WorkflowApplication application;
3838
private final TaskExecutor<?> taskExecutor;
39+
private ResourceLoader resourceLoader;
3940

4041
private WorkflowDefinition(
4142
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
4243
this.workflow = workflow;
4344
this.application = application;
45+
this.resourceLoader = resourceLoader;
4446
if (workflow.getInput() != null) {
4547
Input input = workflow.getInput();
4648
this.inputSchemaValidator =
@@ -55,11 +57,7 @@ private WorkflowDefinition(
5557
}
5658
this.taskExecutor =
5759
TaskExecutorHelper.createExecutorList(
58-
application.positionFactory().get(),
59-
workflow.getDo(),
60-
workflow,
61-
application,
62-
resourceLoader);
60+
application.positionFactory().get(), workflow.getDo(), this);
6361
}
6462

6563
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
@@ -72,7 +70,10 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
7270
}
7371

7472
public WorkflowInstance instance(Object input) {
75-
return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input));
73+
WorkflowModel inputModel = application.modelFactory().fromAny(input);
74+
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
75+
return new WorkflowMutableInstance(
76+
this, application().idFactory().get(), inputModel, WorkflowStatus.PENDING);
7677
}
7778

7879
Optional<SchemaValidator> inputSchemaValidator() {
@@ -106,7 +107,9 @@ public WorkflowApplication application() {
106107
}
107108

108109
@Override
109-
public void close() {
110-
// TODO close resourcers hold for uncompleted process instances, if any
110+
public void close() {}
111+
112+
public ResourceLoader resourceLoader() {
113+
return resourceLoader;
111114
}
112115
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,18 @@ public class WorkflowMutableInstance implements WorkflowInstance {
5151
private TaskContext suspendedTask;
5252
private CompletableFuture<TaskContext> cancelled;
5353

54-
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
55-
this.id = definition.application().idFactory().get();
54+
public WorkflowMutableInstance(
55+
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
56+
this.id = id;
5657
this.input = input;
57-
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
58-
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
58+
this.status = new AtomicReference<>(status);
5959
this.workflowContext = new WorkflowContext(definition, this);
6060
}
6161

6262
@Override
6363
public CompletableFuture<WorkflowModel> start() {
64-
this.startedAt = Instant.now();
65-
this.status.set(WorkflowStatus.RUNNING);
64+
startedAt = Instant.now();
65+
status.set(WorkflowStatus.RUNNING);
6666
publishEvent(
6767
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
6868
this.completableFuture =
@@ -107,7 +107,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
107107
.map(f -> f.apply(workflowContext, null, node))
108108
.orElse(node);
109109
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
110-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
110+
status.set(WorkflowStatus.COMPLETED);
111111
publishEvent(
112112
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
113113
return output;
@@ -145,12 +145,14 @@ public WorkflowModel output() {
145145

146146
@Override
147147
public <T> T outputAs(Class<T> clazz) {
148-
return output
149-
.as(clazz)
150-
.orElseThrow(
151-
() ->
152-
new IllegalArgumentException(
153-
"Output " + output + " cannot be converted to class " + clazz));
148+
return output != null
149+
? output
150+
.as(clazz)
151+
.orElseThrow(
152+
() ->
153+
new IllegalArgumentException(
154+
"Output " + output + " cannot be converted to class " + clazz))
155+
: null;
154156
}
155157

156158
public void status(WorkflowStatus state) {
@@ -215,15 +217,9 @@ public boolean resume() {
215217
}
216218
}
217219

218-
public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
220+
public CompletableFuture<TaskContext> cancelCheck(TaskContext t) {
219221
try {
220222
statusLock.lock();
221-
if (suspended != null) {
222-
suspendedTask = t;
223-
publishEvent(
224-
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
225-
return suspended;
226-
}
227223
if (cancelled != null) {
228224
cancelled = new CompletableFuture<TaskContext>();
229225
workflowContext.instance().status(WorkflowStatus.CANCELLED);
@@ -237,6 +233,29 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
237233
return CompletableFuture.completedFuture(t);
238234
}
239235

236+
public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
237+
try {
238+
statusLock.lock();
239+
if (suspended != null) {
240+
this.suspendedTask = t;
241+
publishEvent(
242+
workflowContext,
243+
l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, suspendedTask)));
244+
return suspended;
245+
}
246+
} finally {
247+
statusLock.unlock();
248+
}
249+
return CompletableFuture.completedFuture(t);
250+
}
251+
252+
// internal purposes only, not to be invoked directly by users of the API
253+
public void restore(
254+
WorkflowPosition position, WorkflowModel model, WorkflowModel context, Instant startedAt) {
255+
this.startedAt = startedAt;
256+
workflowContext.context(context);
257+
}
258+
240259
@Override
241260
public boolean cancel() {
242261
try {

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.serverlessworkflow.impl.TaskContext;
2929
import io.serverlessworkflow.impl.WorkflowApplication;
3030
import io.serverlessworkflow.impl.WorkflowContext;
31+
import io.serverlessworkflow.impl.WorkflowDefinition;
3132
import io.serverlessworkflow.impl.WorkflowFilter;
3233
import io.serverlessworkflow.impl.WorkflowModel;
3334
import io.serverlessworkflow.impl.WorkflowMutablePosition;
@@ -81,17 +82,13 @@ public abstract static class AbstractTaskExecutorBuilder<
8182
private V instance;
8283

8384
protected AbstractTaskExecutorBuilder(
84-
WorkflowMutablePosition position,
85-
T task,
86-
Workflow workflow,
87-
WorkflowApplication application,
88-
ResourceLoader resourceLoader) {
89-
this.workflow = workflow;
85+
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
86+
this.workflow = definition.workflow();
9087
this.taskName = position.last().toString();
9188
this.position = position;
9289
this.task = task;
93-
this.application = application;
94-
this.resourceLoader = resourceLoader;
90+
this.application = definition.application();
91+
this.resourceLoader = definition.resourceLoader();
9592
if (task.getInput() != null) {
9693
Input input = task.getInput();
9794
this.inputProcessor = buildWorkflowFilter(application, input.getFrom());
@@ -174,16 +171,18 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
174171

175172
protected final CompletableFuture<TaskContext> executeNext(
176173
CompletableFuture<TaskContext> future, WorkflowContext workflow) {
177-
return future.thenCompose(
178-
t -> {
179-
TransitionInfo transition = t.transition();
180-
if (transition.isEndNode()) {
181-
workflow.instance().status(WorkflowStatus.COMPLETED);
182-
} else if (transition.next() != null) {
183-
return transition.next().apply(workflow, t.parent(), t.output());
184-
}
185-
return CompletableFuture.completedFuture(t);
186-
});
174+
return future.thenCompose(t -> executeNext(workflow, t));
175+
}
176+
177+
private CompletableFuture<TaskContext> executeNext(
178+
WorkflowContext workflow, TaskContext taskContext) {
179+
TransitionInfo transition = taskContext.transition();
180+
if (transition.isEndNode()) {
181+
workflow.instance().status(WorkflowStatus.COMPLETED);
182+
} else if (transition.next() != null) {
183+
return transition.next().apply(workflow, taskContext.parent(), taskContext.output());
184+
}
185+
return CompletableFuture.completedFuture(taskContext);
187186
}
188187

189188
@Override
@@ -197,6 +196,7 @@ public CompletableFuture<TaskContext> apply(
197196
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
198197
return executeNext(
199198
completable
199+
.thenCompose(t -> workflowContext.instance().suspendedCheck(t))
200200
.thenApply(
201201
t -> {
202202
publishEvent(
@@ -208,7 +208,7 @@ public CompletableFuture<TaskContext> apply(
208208
return t;
209209
})
210210
.thenCompose(t -> execute(workflowContext, t))
211-
.thenCompose(t -> workflowContext.instance().completedChecks(t))
211+
.thenCompose(t -> workflowContext.instance().cancelCheck(t))
212212
.whenComplete(
213213
(t, e) -> {
214214
if (e != null) {

0 commit comments

Comments
 (0)