Skip to content

Commit cc0cb1a

Browse files
committed
[Fix #782] Adding MVStore persistence
Signed-off-by: fjtirado <[email protected]>
1 parent 37bd454 commit cc0cb1a

23 files changed

+1051
-19
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class WorkflowApplication implements AutoCloseable {
6161
private final Collection<EventPublisher> eventPublishers;
6262
private final boolean lifeCycleCEPublishingEnabled;
6363

64-
private WorkflowApplication(Builder builder) {
64+
protected WorkflowApplication(Builder builder) {
6565
this.taskFactory = builder.taskFactory;
6666
this.exprFactory = builder.exprFactory;
6767
this.resourceLoaderFactory = builder.resourceLoaderFactory;
@@ -149,7 +149,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
149149
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
150150
private boolean lifeCycleCEPublishingEnabled = true;
151151

152-
private Builder() {}
152+
protected Builder() {}
153153

154154
public Builder withListener(WorkflowExecutionListener listener) {
155155
listeners.add(listener);
@@ -254,7 +254,11 @@ public Map<WorkflowDefinitionId, WorkflowDefinition> workflowDefinitions() {
254254

255255
public WorkflowDefinition workflowDefinition(Workflow workflow) {
256256
return definitions.computeIfAbsent(
257-
WorkflowDefinitionId.of(workflow), k -> WorkflowDefinition.of(this, workflow));
257+
WorkflowDefinitionId.of(workflow), k -> createDefinition(workflow));
258+
}
259+
260+
protected WorkflowDefinition createDefinition(Workflow workflow) {
261+
return WorkflowDefinition.of(this, workflow);
258262
}
259263

260264
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
7070
}
7171

7272
public WorkflowInstance instance(Object input) {
73-
return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input));
73+
WorkflowModel inputModel = application.modelFactory().fromAny(input);
74+
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
75+
return new WorkflowMutableInstance(
76+
this, application().idFactory().get(), inputModel, WorkflowStatus.PENDING);
7477
}
7578

7679
Optional<SchemaValidator> inputSchemaValidator() {
@@ -108,7 +111,5 @@ public ResourceLoader resourceLoader() {
108111
}
109112

110113
@Override
111-
public void close() {
112-
// TODO close resourcers hold for uncompleted process instances, if any
113-
}
114+
public void close() {}
114115
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4949
private CompletableFuture<WorkflowModel> completableFuture;
5050
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5151

52-
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
53-
this.id = definition.application().idFactory().get();
52+
public WorkflowMutableInstance(
53+
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
54+
this.id = id;
5455
this.input = input;
55-
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
56-
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
56+
this.status = new AtomicReference<>(status);
5757
this.workflowContext = new WorkflowContext(definition, this);
5858
}
5959

6060
@Override
6161
public CompletableFuture<WorkflowModel> start() {
62-
this.startedAt = Instant.now();
63-
this.status.set(WorkflowStatus.RUNNING);
62+
startedAt = Instant.now();
63+
status.set(WorkflowStatus.RUNNING);
6464
publishEvent(
6565
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
6666
this.completableFuture =
@@ -139,12 +139,14 @@ public WorkflowModel output() {
139139

140140
@Override
141141
public <T> T outputAs(Class<T> clazz) {
142-
return output
143-
.as(clazz)
144-
.orElseThrow(
145-
() ->
146-
new IllegalArgumentException(
147-
"Output " + output + " cannot be converted to class " + clazz));
142+
return output != null
143+
? output
144+
.as(clazz)
145+
.orElseThrow(
146+
() ->
147+
new IllegalArgumentException(
148+
"Output " + output + " cannot be converted to class " + clazz))
149+
: null;
148150
}
149151

150152
public void status(WorkflowStatus state) {
@@ -234,6 +236,13 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
234236
return CompletableFuture.completedFuture(t);
235237
}
236238

239+
// internal purposes only, not to be invoked directly by users of the API
240+
public void restore(
241+
WorkflowPosition position, WorkflowModel model, WorkflowModel context, Instant startedAt) {
242+
this.startedAt = startedAt;
243+
workflowContext.context(context);
244+
}
245+
237246
@Override
238247
public boolean cancel() {
239248
try {

impl/persistence/api/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-persistence</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-persistence-api</artifactId>
9+
<name>Serverless Workflow :: Impl :: Pesistence:: API</name>
10+
<dependencies>
11+
<dependency>
12+
<groupId>io.serverlessworkflow</groupId>
13+
<artifactId>serverlessworkflow-impl-core</artifactId>
14+
</dependency>
15+
</dependencies>
16+
</project>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowDefinition;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
20+
import java.util.Optional;
21+
22+
public interface WorkflowIdPersistentReader extends WorkflowMinimumPersistenceReader {
23+
24+
/**
25+
* Allow recovering by process instance id
26+
*
27+
* @param workflowInstanceId
28+
* @return
29+
*/
30+
Optional<WorkflowInstance> findById(WorkflowDefinition definition, String workflowInstanceId);
31+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowDefinition;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
20+
import java.util.stream.Stream;
21+
22+
public interface WorkflowMinimumPersistenceReader extends AutoCloseable {
23+
24+
/**
25+
* Allow streaming over all stored workflow instances for a certain definition
26+
*
27+
* @return
28+
*/
29+
Stream<WorkflowInstance> all(WorkflowDefinition definition);
30+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.api.types.Workflow;
19+
import io.serverlessworkflow.impl.WorkflowApplication;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
21+
import io.serverlessworkflow.impl.WorkflowInstance;
22+
23+
public class WorkflowPersistenceApplication<T extends WorkflowMinimumPersistenceReader>
24+
extends WorkflowApplication {
25+
26+
private final T reader;
27+
private final WorkflowPersistenceWriter writer;
28+
private final boolean resumeAfterReboot;
29+
30+
protected WorkflowPersistenceApplication(Builder<T> builder) {
31+
super(builder);
32+
this.reader = builder.reader;
33+
this.writer = builder.writer;
34+
this.resumeAfterReboot = builder.resumeAfterReboot;
35+
}
36+
37+
public T persitenceReader() {
38+
return reader;
39+
}
40+
41+
public void close() {
42+
super.close();
43+
try {
44+
reader.close();
45+
} catch (Exception e) {
46+
}
47+
try {
48+
writer.close();
49+
} catch (Exception e) {
50+
}
51+
}
52+
53+
public static <T extends WorkflowMinimumPersistenceReader> Builder<T> builder(
54+
WorkflowPersistenceWriter writer, T reader) {
55+
return new Builder<>(writer, reader);
56+
}
57+
58+
public static class Builder<T extends WorkflowMinimumPersistenceReader>
59+
extends io.serverlessworkflow.impl.WorkflowApplication.Builder {
60+
61+
private final WorkflowPersistenceWriter writer;
62+
private final T reader;
63+
private boolean resumeAfterReboot = true;
64+
65+
protected Builder(WorkflowPersistenceWriter writer, T reader) {
66+
this.writer = writer;
67+
this.reader = reader;
68+
super.withListener(new WorkflowPersistenceListener(writer));
69+
}
70+
71+
public Builder<T> resumeAfterReboot(boolean resumeAfterReboot) {
72+
this.resumeAfterReboot = resumeAfterReboot;
73+
return this;
74+
}
75+
76+
public WorkflowPersistenceApplication<T> build() {
77+
return new WorkflowPersistenceApplication<>(this);
78+
}
79+
}
80+
81+
protected WorkflowDefinition createDefinition(Workflow workflow) {
82+
WorkflowDefinition definition = super.createDefinition(workflow);
83+
if (resumeAfterReboot) {
84+
reader.all(definition).forEach(WorkflowInstance::resume);
85+
}
86+
return definition;
87+
}
88+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
19+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
20+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
22+
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
23+
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
24+
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
25+
26+
public class WorkflowPersistenceListener implements WorkflowExecutionListener {
27+
28+
private final WorkflowPersistenceWriter persistenceStore;
29+
30+
public WorkflowPersistenceListener(WorkflowPersistenceWriter persistenceStore) {
31+
this.persistenceStore = persistenceStore;
32+
}
33+
34+
@Override
35+
public void onWorkflowStarted(WorkflowStartedEvent ev) {
36+
persistenceStore.started(ev.workflowContext());
37+
}
38+
39+
@Override
40+
public void onWorkflowFailed(WorkflowFailedEvent ev) {
41+
persistenceStore.failed(ev.workflowContext(), ev.cause());
42+
}
43+
44+
@Override
45+
public void onWorkflowCancelled(WorkflowCancelledEvent ev) {
46+
persistenceStore.aborted(ev.workflowContext());
47+
}
48+
49+
@Override
50+
public void onWorkflowSuspended(WorkflowSuspendedEvent ev) {
51+
persistenceStore.suspended(ev.workflowContext());
52+
}
53+
54+
@Override
55+
public void onWorkflowResumed(WorkflowResumedEvent ev) {
56+
persistenceStore.resumed(ev.workflowContext());
57+
}
58+
59+
@Override
60+
public void onTaskStarted(TaskStartedEvent ev) {
61+
persistenceStore.updated(ev.workflowContext(), ev.taskContext());
62+
}
63+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.TaskContextData;
19+
import io.serverlessworkflow.impl.WorkflowContextData;
20+
21+
public interface WorkflowPersistenceWriter extends AutoCloseable {
22+
23+
void started(WorkflowContextData workflowContext);
24+
25+
void completed(WorkflowContextData workflowContext);
26+
27+
void failed(WorkflowContextData workflowContext, Throwable ex);
28+
29+
void aborted(WorkflowContextData workflowContext);
30+
31+
void suspended(WorkflowContextData workflowContext);
32+
33+
void resumed(WorkflowContextData workflowContext);
34+
35+
void updated(WorkflowContextData workflowContext, TaskContextData taskContext);
36+
}

0 commit comments

Comments
 (0)