Skip to content

Commit 785b9ed

Browse files
committed
[Fix #782] Adding transaction support
1 parent 3ab34c0 commit 785b9ed

File tree

9 files changed

+237
-94
lines changed

9 files changed

+237
-94
lines changed

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Map.Entry;
2828
import java.util.Optional;
29+
import java.util.function.Function;
2930
import java.util.stream.Collectors;
3031

3132
public abstract class BigMapInstanceReader<V, T, S> implements PersistenceInstanceReader {
@@ -36,43 +37,75 @@ protected BigMapInstanceReader(BigMapInstanceStore<String, V, T, S> store) {
3637
this.store = store;
3738
}
3839

40+
private <Result> Result doTransaction(
41+
Function<BigMapInstanceTransaction<String, V, T, S>, Result> operations) {
42+
BigMapInstanceTransaction<String, V, T, S> transaction = store.begin();
43+
try {
44+
Result result = operations.apply(transaction);
45+
transaction.commit();
46+
return result;
47+
} catch (Exception ex) {
48+
transaction.rollback();
49+
throw ex;
50+
}
51+
}
52+
3953
@Override
4054
public Map<String, WorkflowInstance> readAll(WorkflowDefinition definition) {
41-
Map<String, V> instances = store.instanceData(definition);
42-
Map<String, S> status = store.status(definition);
43-
return instances.entrySet().stream()
44-
.map(
45-
e ->
46-
restore(
47-
definition,
48-
e.getKey(),
49-
e.getValue(),
50-
store.tasks(e.getKey()),
51-
status.get(e.getKey())))
52-
.collect(Collectors.toMap(WorkflowInstance::id, i -> i));
55+
return doTransaction(
56+
t -> {
57+
Map<String, V> instances = t.instanceData(definition);
58+
Map<String, S> status = t.status(definition);
59+
return instances.entrySet().stream()
60+
.map(
61+
e ->
62+
restore(
63+
definition,
64+
e.getKey(),
65+
e.getValue(),
66+
t.tasks(e.getKey()),
67+
status.get(e.getKey())))
68+
.collect(Collectors.toMap(WorkflowInstance::id, i -> i));
69+
});
5370
}
5471

5572
@Override
5673
public Map<String, WorkflowInstance> read(
5774
WorkflowDefinition definition, Collection<String> instanceIds) {
58-
return instanceIds.stream()
59-
.map(id -> read(definition, id))
60-
.flatMap(Optional::stream)
61-
.collect(Collectors.toMap(WorkflowInstance::id, id -> id));
75+
return doTransaction(
76+
t -> {
77+
Map<String, V> instances = t.instanceData(definition);
78+
Map<String, S> status = t.status(definition);
79+
return instanceIds.stream()
80+
.map(id -> read(instances, status, t.tasks(id), definition, id))
81+
.flatMap(Optional::stream)
82+
.collect(Collectors.toMap(WorkflowInstance::id, id -> id));
83+
});
6284
}
6385

6486
@Override
6587
public Optional<WorkflowInstance> read(WorkflowDefinition definition, String instanceId) {
66-
Map<String, V> instances = store.instanceData(definition);
88+
return doTransaction(
89+
t ->
90+
read(
91+
t.instanceData(definition),
92+
t.status(definition),
93+
t.tasks(instanceId),
94+
definition,
95+
instanceId));
96+
}
97+
98+
private Optional<WorkflowInstance> read(
99+
Map<String, V> instances,
100+
Map<String, S> status,
101+
Map<String, T> tasks,
102+
WorkflowDefinition definition,
103+
String instanceId) {
67104
return instances.containsKey(instanceId)
68105
? Optional.empty()
69106
: Optional.of(
70107
restore(
71-
definition,
72-
instanceId,
73-
instances.get(instanceId),
74-
store.tasks(instanceId),
75-
store.status(definition).get(instanceId)));
108+
definition, instanceId, instances.get(instanceId), tasks, status.get(instanceId)));
76109
}
77110

78111
public void close() {}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,6 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence.bigmap;
1717

18-
import io.serverlessworkflow.impl.WorkflowDefinitionData;
19-
import java.util.Map;
20-
2118
public interface BigMapInstanceStore<K, V, T, S> extends AutoCloseable {
22-
23-
Map<K, V> instanceData(WorkflowDefinitionData definition);
24-
25-
Map<K, S> status(WorkflowDefinitionData workflowContext);
26-
27-
Map<String, T> tasks(K instanceId);
28-
29-
void cleanupTasks(K instanceId);
19+
BigMapInstanceTransaction<K, V, T, S> begin();
3020
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence.bigmap;
17+
18+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
19+
import java.util.Map;
20+
21+
public interface BigMapInstanceTransaction<K, V, T, S> {
22+
23+
Map<K, V> instanceData(WorkflowDefinitionData definition);
24+
25+
Map<K, S> status(WorkflowDefinitionData workflowContext);
26+
27+
Map<String, T> tasks(K instanceId);
28+
29+
void cleanupTasks(K instanceId);
30+
31+
void commit();
32+
33+
void rollback();
34+
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.serverlessworkflow.impl.WorkflowInstanceData;
2222
import io.serverlessworkflow.impl.WorkflowStatus;
2323
import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter;
24+
import java.util.function.Consumer;
2425

2526
public abstract class BigMapInstanceWriter<K, V, T, S> implements PersistenceInstanceWriter {
2627

@@ -30,11 +31,23 @@ protected BigMapInstanceWriter(BigMapInstanceStore<K, V, T, S> store) {
3031
this.store = store;
3132
}
3233

34+
private void doTransaction(Consumer<BigMapInstanceTransaction<K, V, T, S>> operations) {
35+
BigMapInstanceTransaction<K, V, T, S> transaction = store.begin();
36+
try {
37+
operations.accept(transaction);
38+
transaction.commit();
39+
} catch (Exception ex) {
40+
transaction.rollback();
41+
throw ex;
42+
}
43+
}
44+
3345
@Override
3446
public void started(WorkflowContextData workflowContext) {
35-
store
36-
.instanceData(workflowContext.definition())
37-
.put(key(workflowContext), marshallInstance(workflowContext.instanceData()));
47+
doTransaction(
48+
t ->
49+
t.instanceData(workflowContext.definition())
50+
.put(key(workflowContext), marshallInstance(workflowContext.instanceData())));
3851
}
3952

4053
@Override
@@ -57,33 +70,35 @@ public void taskStarted(WorkflowContextData workflowContext, TaskContextData tas
5770

5871
@Override
5972
public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) {
60-
K key = key(workflowContext);
61-
store
62-
.tasks(key)
63-
.put(
64-
taskContext.position().jsonPointer(),
65-
marshallTaskCompleted(workflowContext, (TaskContext) taskContext));
73+
doTransaction(
74+
t ->
75+
t.tasks(key(workflowContext))
76+
.put(
77+
taskContext.position().jsonPointer(),
78+
marshallTaskCompleted(workflowContext, (TaskContext) taskContext)));
6679
}
6780

6881
@Override
6982
public void suspended(WorkflowContextData workflowContext) {
70-
store
71-
.status(workflowContext.definition())
72-
.put(key(workflowContext), marshallStatus(WorkflowStatus.SUSPENDED));
83+
doTransaction(
84+
t ->
85+
t.status(workflowContext.definition())
86+
.put(key(workflowContext), marshallStatus(WorkflowStatus.SUSPENDED)));
7387
}
7488

7589
@Override
7690
public void resumed(WorkflowContextData workflowContext) {
77-
store
78-
.status(workflowContext.definition())
79-
.put(key(workflowContext), marshallStatus(WorkflowStatus.RUNNING));
91+
doTransaction(t -> t.status(workflowContext.definition()).remove(key(workflowContext)));
8092
}
8193

8294
protected void removeProcessInstance(WorkflowContextData workflowContext) {
83-
K key = key(workflowContext);
84-
store.instanceData(workflowContext.definition()).remove(key);
85-
store.status(workflowContext.definition()).remove(key);
86-
store.cleanupTasks(key);
95+
doTransaction(
96+
t -> {
97+
K key = key(workflowContext);
98+
t.instanceData(workflowContext.definition()).remove(key);
99+
t.status(workflowContext.definition()).remove(key);
100+
t.cleanupTasks(key);
101+
});
87102
}
88103

89104
protected abstract K key(WorkflowContextData workflowContext);

impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,61 +15,26 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence.mvstore;
1717

18-
import io.serverlessworkflow.api.types.Document;
19-
import io.serverlessworkflow.api.types.Workflow;
20-
import io.serverlessworkflow.impl.WorkflowDefinitionData;
2118
import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceStore;
22-
import java.util.Map;
19+
import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction;
2320
import org.h2.mvstore.MVStore;
21+
import org.h2.mvstore.tx.TransactionStore;
2422

2523
public class MVStorePersistenceStore
2624
implements BigMapInstanceStore<String, byte[], byte[], byte[]> {
27-
private final MVStore mvStore;
28-
29-
protected static final String ID_SEPARATOR = "-";
25+
private final TransactionStore mvStore;
3026

3127
public MVStorePersistenceStore(String dbName) {
32-
this.mvStore = MVStore.open(dbName);
33-
}
34-
35-
protected static String identifier(Workflow workflow, String sep) {
36-
Document document = workflow.getDocument();
37-
return document.getNamespace() + sep + document.getName() + sep + document.getVersion();
28+
this.mvStore = new TransactionStore(MVStore.open(dbName));
3829
}
3930

4031
@Override
4132
public void close() {
42-
if (!mvStore.isClosed()) {
43-
mvStore.close();
44-
}
45-
}
46-
47-
@Override
48-
public Map<String, byte[]> instanceData(WorkflowDefinitionData workflowContext) {
49-
return openMap(workflowContext, "instances");
50-
}
51-
52-
@Override
53-
public Map<String, byte[]> tasks(String instanceId) {
54-
return mvStore.openMap(mapTaskName(instanceId));
55-
}
56-
57-
@Override
58-
public Map<String, byte[]> status(WorkflowDefinitionData workflowContext) {
59-
return openMap(workflowContext, "status");
60-
}
61-
62-
private Map<String, byte[]> openMap(WorkflowDefinitionData workflowDefinition, String suffix) {
63-
return mvStore.openMap(
64-
identifier(workflowDefinition.workflow(), ID_SEPARATOR) + ID_SEPARATOR + suffix);
65-
}
66-
67-
private String mapTaskName(String instanceId) {
68-
return instanceId + ID_SEPARATOR + "tasks";
33+
mvStore.close();
6934
}
7035

7136
@Override
72-
public void cleanupTasks(String instanceId) {
73-
mvStore.removeMap(mapTaskName(instanceId));
37+
public BigMapInstanceTransaction<String, byte[], byte[], byte[]> begin() {
38+
return new MVStoreTransaction(mvStore.begin());
7439
}
7540
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence.mvstore;
17+
18+
import io.serverlessworkflow.api.types.Document;
19+
import io.serverlessworkflow.api.types.Workflow;
20+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
21+
import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction;
22+
import java.util.Map;
23+
import org.h2.mvstore.tx.Transaction;
24+
import org.h2.mvstore.tx.TransactionMap;
25+
26+
public class MVStoreTransaction
27+
implements BigMapInstanceTransaction<String, byte[], byte[], byte[]> {
28+
29+
protected static final String ID_SEPARATOR = "-";
30+
31+
private final Transaction transaction;
32+
33+
public MVStoreTransaction(Transaction transaction) {
34+
this.transaction = transaction;
35+
}
36+
37+
protected static String identifier(Workflow workflow, String sep) {
38+
Document document = workflow.getDocument();
39+
return document.getNamespace() + sep + document.getName() + sep + document.getVersion();
40+
}
41+
42+
@Override
43+
public Map<String, byte[]> instanceData(WorkflowDefinitionData workflowContext) {
44+
return openMap(workflowContext, "instances");
45+
}
46+
47+
@Override
48+
public Map<String, byte[]> tasks(String instanceId) {
49+
return taskMap(instanceId);
50+
}
51+
52+
@Override
53+
public Map<String, byte[]> status(WorkflowDefinitionData workflowContext) {
54+
return openMap(workflowContext, "status");
55+
}
56+
57+
@Override
58+
public void cleanupTasks(String instanceId) {
59+
transaction.removeMap(taskMap(instanceId));
60+
}
61+
62+
private TransactionMap<String, byte[]> taskMap(String instanceId) {
63+
return transaction.openMap(mapTaskName(instanceId));
64+
}
65+
66+
private Map<String, byte[]> openMap(WorkflowDefinitionData workflowDefinition, String suffix) {
67+
return transaction.openMap(
68+
identifier(workflowDefinition.workflow(), ID_SEPARATOR) + ID_SEPARATOR + suffix);
69+
}
70+
71+
private String mapTaskName(String instanceId) {
72+
return instanceId + ID_SEPARATOR + "tasks";
73+
}
74+
75+
@Override
76+
public void commit() {
77+
transaction.commit();
78+
}
79+
80+
@Override
81+
public void rollback() {
82+
transaction.rollback();
83+
}
84+
}

impl/test/db-samples/running.db

0 Bytes
Binary file not shown.

impl/test/db-samples/suspended.db

0 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)