Skip to content

Commit 3b260bc

Browse files
authored
Add an extension point to collect uncommitted read-set and write-set (#2271)
1 parent dd4f9a6 commit 3b260bc

File tree

10 files changed

+426
-11
lines changed

10 files changed

+426
-11
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,12 @@ public enum CoreError implements ScalarDbError {
904904
Category.INTERNAL_ERROR, "0044", "The Upsert operation failed. Details: %s", "", ""),
905905
JDBC_TRANSACTION_UPDATE_OPERATION_FAILED(
906906
Category.INTERNAL_ERROR, "0045", "The Update operation failed. Details: %s", "", ""),
907+
HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED(
908+
Category.INTERNAL_ERROR,
909+
"0046",
910+
"Handling the before-preparation snapshot hook failed. Details: %s",
911+
"",
912+
""),
907913

908914
//
909915
// Errors for the unknown transaction status error category
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.scalar.db.transaction.consensuscommit;
2+
3+
import java.util.concurrent.Future;
4+
5+
public interface BeforePreparationSnapshotHook {
6+
Future<Void> handle(
7+
TransactionTableMetadataManager transactionTableMetadataManager,
8+
Snapshot.ReadWriteSets readWriteSets);
9+
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.google.common.base.Preconditions.checkNotNull;
44

55
import com.google.common.collect.ImmutableList;
6+
import com.google.errorprone.annotations.concurrent.LazyInit;
67
import com.scalar.db.api.DistributedStorage;
78
import com.scalar.db.api.TransactionState;
89
import com.scalar.db.common.error.CoreError;
@@ -22,6 +23,8 @@
2223
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.Optional;
26+
import java.util.concurrent.Future;
27+
import javax.annotation.Nullable;
2528
import javax.annotation.concurrent.ThreadSafe;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
@@ -34,6 +37,8 @@ public class CommitHandler {
3437
private final TransactionTableMetadataManager tableMetadataManager;
3538
private final ParallelExecutor parallelExecutor;
3639

40+
@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;
41+
3742
@SuppressFBWarnings("EI_EXPOSE_REP2")
3843
public CommitHandler(
3944
DistributedStorage storage,
@@ -50,7 +55,52 @@ protected void onPrepareFailure(Snapshot snapshot) {}
5055

5156
protected void onValidateFailure(Snapshot snapshot) {}
5257

58+
private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snapshot)
59+
throws UnknownTransactionStatusException, CommitException {
60+
if (beforePreparationSnapshotHook == null) {
61+
return Optional.empty();
62+
}
63+
64+
try {
65+
return Optional.of(
66+
beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets()));
67+
} catch (Exception e) {
68+
abortState(snapshot.getId());
69+
rollbackRecords(snapshot);
70+
// TODO: This method is actually a part of preparation phase. But the callback method name
71+
// `onPrepareFailure()` should be renamed to more reasonable one.
72+
onPrepareFailure(snapshot);
73+
throw new CommitException(
74+
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
75+
e,
76+
snapshot.getId());
77+
}
78+
}
79+
80+
private void waitBeforePreparationSnapshotHookFuture(
81+
Snapshot snapshot, @Nullable Future<Void> snapshotHookFuture)
82+
throws UnknownTransactionStatusException, CommitException {
83+
if (snapshotHookFuture == null) {
84+
return;
85+
}
86+
87+
try {
88+
snapshotHookFuture.get();
89+
} catch (Exception e) {
90+
abortState(snapshot.getId());
91+
rollbackRecords(snapshot);
92+
// TODO: This method is actually a part of validation phase. But the callback method name
93+
// `onValidateFailure()` should be renamed to more reasonable one.
94+
onValidateFailure(snapshot);
95+
throw new CommitException(
96+
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
97+
e,
98+
snapshot.getId());
99+
}
100+
}
101+
53102
public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException {
103+
Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);
54104
try {
55105
prepare(snapshot);
56106
} catch (PreparationException e) {
@@ -79,6 +129,8 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction
79129
throw e;
80130
}
81131

132+
waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null));
133+
82134
commitState(snapshot);
83135
commitRecords(snapshot);
84136
}
@@ -234,4 +286,16 @@ public void rollbackRecords(Snapshot snapshot) {
234286
// ignore since records are recovered lazily
235287
}
236288
}
289+
290+
/**
291+
* Sets the {@link BeforePreparationSnapshotHook}. This method must be called immediately after
292+
* the constructor is invoked.
293+
*
294+
* @param beforePreparationSnapshotHook The snapshot hook to set.
295+
* @throws NullPointerException If the argument is null.
296+
*/
297+
public void setBeforePreparationSnapshotHook(
298+
BeforePreparationSnapshotHook beforePreparationSnapshotHook) {
299+
this.beforePreparationSnapshotHook = checkNotNull(beforePreparationSnapshotHook);
300+
}
237301
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class ConsensusCommitManager extends ActiveTransactionManagedDistributedT
5050
private final Coordinator coordinator;
5151
private final ParallelExecutor parallelExecutor;
5252
private final RecoveryHandler recovery;
53-
private final CommitHandler commit;
53+
protected final CommitHandler commit;
5454
private final boolean isIncludeMetadataEnabled;
5555
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
5656
@Nullable private final CoordinatorGroupCommitter groupCommitter;
@@ -75,7 +75,7 @@ public ConsensusCommitManager(
7575
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
7676
}
7777

78-
ConsensusCommitManager(DatabaseConfig databaseConfig) {
78+
protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
7979
super(databaseConfig);
8080
StorageFactory storageFactory = StorageFactory.create(databaseConfig.getProperties());
8181
storage = storageFactory.getStorage();

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020
import java.util.Set;
2121
import java.util.stream.Collectors;
22+
import javax.annotation.Nullable;
2223

2324
public final class ConsensusCommitUtils {
2425

@@ -299,4 +300,18 @@ static String convertUnsatisfiedConditionExceptionMessageForUpdate(
299300
}
300301
return message;
301302
}
303+
304+
/**
305+
* Returns the next `tx_version` based on the current value.
306+
*
307+
* @param currentTxVersion The current `tx_version`, if it exists, or null otherwise.
308+
* @return The next `tx_version`.
309+
*/
310+
public static int getNextTxVersion(@Nullable Integer currentTxVersion) {
311+
if (currentTxVersion == null) {
312+
return 1;
313+
} else {
314+
return currentTxVersion + 1;
315+
}
316+
}
302317
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/PrepareMutationComposer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.scalar.db.transaction.consensuscommit.Attribute.ID;
44
import static com.scalar.db.transaction.consensuscommit.Attribute.VERSION;
5+
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getNextTxVersion;
56

67
import com.google.common.annotations.VisibleForTesting;
78
import com.scalar.db.api.ConditionBuilder;
@@ -66,7 +67,7 @@ private void add(Put base, @Nullable TransactionResult result) throws ExecutionE
6667
if (!base.isInsertModeEnabled() && result != null) { // overwrite existing record
6768
createBeforeColumns(base, result).forEach(putBuilder::value);
6869
int version = result.getVersion();
69-
putBuilder.intValue(Attribute.VERSION, version + 1);
70+
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(version));
7071

7172
// check if the record is not interrupted by other conflicting transactions
7273
if (result.isDeemedAsCommitted()) {
@@ -82,7 +83,7 @@ private void add(Put base, @Nullable TransactionResult result) throws ExecutionE
8283
.build());
8384
}
8485
} else { // initial record or insert mode enabled
85-
putBuilder.intValue(Attribute.VERSION, 1);
86+
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(null));
8687

8788
// check if the record is not created by other conflicting transactions
8889
putBuilder.condition(ConditionBuilder.putIfNotExists());
@@ -107,7 +108,7 @@ private void add(Delete base, @Nullable TransactionResult result) throws Executi
107108
if (result != null) {
108109
createBeforeColumns(base, result).forEach(putBuilder::value);
109110
int version = result.getVersion();
110-
putBuilder.intValue(Attribute.VERSION, version + 1);
111+
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(version));
111112

112113
// check if the record is not interrupted by other conflicting transactions
113114
if (result.isDeemedAsCommitted()) {
@@ -122,7 +123,7 @@ private void add(Delete base, @Nullable TransactionResult result) throws Executi
122123
.build());
123124
}
124125
} else {
125-
putBuilder.intValue(Attribute.VERSION, 1);
126+
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(null));
126127

127128
// check if the record is not created by other conflicting transactions
128129
putBuilder.condition(ConditionBuilder.putIfNotExists());

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.scalar.db.util.ScalarDbUtils;
2727
import java.io.IOException;
2828
import java.util.ArrayList;
29+
import java.util.Collection;
2930
import java.util.Comparator;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -161,6 +162,10 @@ public List<Delete> getDeletesInDeleteSet() {
161162
return new ArrayList<>(deleteSet.values());
162163
}
163164

165+
public ReadWriteSets getReadWriteSets() {
166+
return new ReadWriteSets(id, readSet, writeSet.entrySet(), deleteSet.entrySet());
167+
}
168+
164169
public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
165170
throws CrudException {
166171
if (deleteSet.containsKey(key)) {
@@ -653,4 +658,38 @@ public String toString() {
653658
.toString();
654659
}
655660
}
661+
662+
public static class ReadWriteSets {
663+
public final String transactionId;
664+
public final Map<Key, Optional<TransactionResult>> readSetMap;
665+
public final List<Entry<Key, Put>> writeSet;
666+
public final List<Entry<Key, Delete>> deleteSet;
667+
668+
public ReadWriteSets(
669+
String transactionId,
670+
Map<Key, Optional<TransactionResult>> readSetMap,
671+
Collection<Entry<Key, Put>> writeSet,
672+
Collection<Entry<Key, Delete>> deleteSet) {
673+
this.transactionId = transactionId;
674+
this.readSetMap = new HashMap<>(readSetMap);
675+
this.writeSet = new ArrayList<>(writeSet);
676+
this.deleteSet = new ArrayList<>(deleteSet);
677+
}
678+
679+
@Override
680+
public boolean equals(Object o) {
681+
if (this == o) return true;
682+
if (!(o instanceof ReadWriteSets)) return false;
683+
ReadWriteSets that = (ReadWriteSets) o;
684+
return Objects.equals(transactionId, that.transactionId)
685+
&& Objects.equals(readSetMap, that.readSetMap)
686+
&& Objects.equals(writeSet, that.writeSet)
687+
&& Objects.equals(deleteSet, that.deleteSet);
688+
}
689+
690+
@Override
691+
public int hashCode() {
692+
return Objects.hash(transactionId, readSetMap, writeSet, deleteSet);
693+
}
694+
}
656695
}

0 commit comments

Comments
 (0)