Skip to content

Commit aa4905f

Browse files
committed
Add further optimizations for Consensus Commit (#2764)
1 parent 1747d5e commit aa4905f

File tree

11 files changed

+1238
-182
lines changed

11 files changed

+1238
-182
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public DistributedTransaction begin() {
144144

145145
@Override
146146
public DistributedTransaction begin(String txId) {
147-
return begin(txId, config.getIsolation(), false);
147+
return begin(txId, config.getIsolation(), false, false);
148148
}
149149

150150
@Override
@@ -155,7 +155,7 @@ public DistributedTransaction beginReadOnly() {
155155

156156
@Override
157157
public DistributedTransaction beginReadOnly(String txId) {
158-
return begin(txId, config.getIsolation(), true);
158+
return begin(txId, config.getIsolation(), true, false);
159159
}
160160

161161
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -169,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
169169
@Deprecated
170170
@Override
171171
public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) {
172-
return begin(txId, Isolation.valueOf(isolation.name()), false);
172+
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
173173
}
174174

175175
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -192,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat
192192
@Override
193193
public DistributedTransaction start(
194194
String txId, com.scalar.db.api.SerializableStrategy strategy) {
195-
return begin(txId, Isolation.SERIALIZABLE, false);
195+
return begin(txId, Isolation.SERIALIZABLE, false, false);
196196
}
197197

198198
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -202,23 +202,24 @@ public DistributedTransaction start(
202202
String txId,
203203
com.scalar.db.api.Isolation isolation,
204204
com.scalar.db.api.SerializableStrategy strategy) {
205-
return begin(txId, Isolation.valueOf(isolation.name()), false);
205+
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
206206
}
207207

208208
@VisibleForTesting
209209
DistributedTransaction begin(Isolation isolation) {
210210
String txId = UUID.randomUUID().toString();
211-
return begin(txId, isolation, false);
211+
return begin(txId, isolation, false, false);
212212
}
213213

214214
@VisibleForTesting
215215
DistributedTransaction beginReadOnly(Isolation isolation) {
216216
String txId = UUID.randomUUID().toString();
217-
return begin(txId, isolation, true);
217+
return begin(txId, isolation, true, false);
218218
}
219219

220220
@VisibleForTesting
221-
DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) {
221+
DistributedTransaction begin(
222+
String txId, Isolation isolation, boolean readOnly, boolean oneOperation) {
222223
checkArgument(!Strings.isNullOrEmpty(txId));
223224
checkNotNull(isolation);
224225
if (isGroupCommitEnabled()) {
@@ -238,7 +239,8 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
238239
tableMetadataManager,
239240
isIncludeMetadataEnabled,
240241
parallelExecutor,
241-
readOnly);
242+
readOnly,
243+
oneOperation);
242244
DistributedTransaction transaction =
243245
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
244246
if (readOnly) {
@@ -249,6 +251,11 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
249251
return transaction;
250252
}
251253

254+
private DistributedTransaction beginOneOperation(boolean readOnly) {
255+
String txId = UUID.randomUUID().toString();
256+
return begin(txId, config.getIsolation(), readOnly, true);
257+
}
258+
252259
@Override
253260
public Optional<Result> get(Get get) throws CrudException, UnknownTransactionStatusException {
254261
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true);
@@ -261,7 +268,7 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat
261268

262269
@Override
263270
public Scanner getScanner(Scan scan) throws CrudException {
264-
DistributedTransaction transaction = beginReadOnly();
271+
DistributedTransaction transaction = beginOneOperation(true);
265272

266273
TransactionCrudOperable.Scanner scanner;
267274
try {
@@ -431,12 +438,7 @@ private <R> R executeTransaction(
431438
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction,
432439
boolean readOnly)
433440
throws CrudException, UnknownTransactionStatusException {
434-
DistributedTransaction transaction;
435-
if (readOnly) {
436-
transaction = beginReadOnly();
437-
} else {
438-
transaction = begin();
439-
}
441+
DistributedTransaction transaction = beginOneOperation(readOnly);
440442

441443
try {
442444
R result = throwableFunction.apply(transaction);

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public class CrudHandler {
5151
// Whether the transaction is in read-only mode or not.
5252
private final boolean readOnly;
5353

54+
// Whether the transaction is in one-operation mode or not. One-operation mode refers to executing
55+
// a CRUD operation directly through `DistributedTransactionManager` without explicitly beginning
56+
// a transaction.
57+
private final boolean oneOperation;
58+
5459
private final List<ConsensusCommitScanner> scanners = new ArrayList<>();
5560

5661
@SuppressFBWarnings("EI_EXPOSE_REP2")
@@ -60,14 +65,16 @@ public CrudHandler(
6065
TransactionTableMetadataManager tableMetadataManager,
6166
boolean isIncludeMetadataEnabled,
6267
ParallelExecutor parallelExecutor,
63-
boolean readOnly) {
68+
boolean readOnly,
69+
boolean oneOperation) {
6470
this.storage = checkNotNull(storage);
6571
this.snapshot = checkNotNull(snapshot);
6672
this.tableMetadataManager = tableMetadataManager;
6773
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
6874
this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId());
6975
this.parallelExecutor = parallelExecutor;
7076
this.readOnly = readOnly;
77+
this.oneOperation = oneOperation;
7178
}
7279

7380
@VisibleForTesting
@@ -78,14 +85,16 @@ public CrudHandler(
7885
boolean isIncludeMetadataEnabled,
7986
MutationConditionsValidator mutationConditionsValidator,
8087
ParallelExecutor parallelExecutor,
81-
boolean readOnly) {
88+
boolean readOnly,
89+
boolean oneOperation) {
8290
this.storage = checkNotNull(storage);
8391
this.snapshot = checkNotNull(snapshot);
8492
this.tableMetadataManager = tableMetadataManager;
8593
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
8694
this.mutationConditionsValidator = mutationConditionsValidator;
8795
this.parallelExecutor = parallelExecutor;
8896
this.readOnly = readOnly;
97+
this.oneOperation = oneOperation;
8998
}
9099

91100
public Optional<Result> get(Get originalGet) throws CrudException {
@@ -102,11 +111,17 @@ public Optional<Result> get(Get originalGet) throws CrudException {
102111
key = new Snapshot.Key(get);
103112
}
104113

105-
readUnread(key, get);
106-
107-
return snapshot
108-
.getResult(key, get)
109-
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
114+
if (isSnapshotReadRequired()) {
115+
readUnread(key, get);
116+
return snapshot
117+
.getResult(key, get)
118+
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
119+
} else {
120+
Optional<TransactionResult> result = read(key, get);
121+
return snapshot
122+
.mergeResult(key, result, get.getConjunctions())
123+
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
124+
}
110125
}
111126

112127
// Only for a Get with index, the argument `key` is null
@@ -120,7 +135,7 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
120135
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
121136
// concurrently in the implicit pre-read
122137
@VisibleForTesting
123-
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
138+
Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws CrudException {
124139
Optional<TransactionResult> result = getFromStorage(get);
125140
if (!result.isPresent() || result.get().isCommitted()) {
126141
if (result.isPresent() || get.getConjunctions().isEmpty()) {
@@ -142,8 +157,8 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
142157
}
143158
}
144159
}
145-
snapshot.putIntoGetSet(get, result);
146-
return;
160+
putIntoGetSetInSnapshot(get, result);
161+
return result;
147162
}
148163
throw new UncommittedRecordException(
149164
get,
@@ -204,7 +219,7 @@ private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(Scan scan)
204219
}
205220
}
206221

207-
snapshot.putIntoScanSet(scan, results);
222+
putIntoScanSetInSnapshot(scan, results);
208223

209224
return results;
210225
}
@@ -263,9 +278,43 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResu
263278
}
264279
}
265280

281+
private boolean isSnapshotReadRequired() {
282+
// In one-operation mode, we don't need snapshot read
283+
return !oneOperation;
284+
}
285+
286+
private boolean isValidationOrSnapshotReadRequired() {
287+
return snapshot.isValidationRequired() || isSnapshotReadRequired();
288+
}
289+
290+
private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result) {
291+
// If neither validation nor snapshot read is required, we don't need to put the result into
292+
// the get set
293+
if (isValidationOrSnapshotReadRequired()) {
294+
snapshot.putIntoGetSet(get, result);
295+
}
296+
}
297+
298+
private void putIntoScanSetInSnapshot(
299+
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
300+
// If neither validation nor snapshot read is required, we don't need to put the results into
301+
// the scan set
302+
if (isValidationOrSnapshotReadRequired()) {
303+
snapshot.putIntoScanSet(scan, results);
304+
}
305+
}
306+
307+
private void putIntoScannerSetInSnapshot(
308+
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
309+
// if validation is not required, we don't need to put the results into the scanner set
310+
if (snapshot.isValidationRequired()) {
311+
snapshot.putIntoScannerSet(scan, results);
312+
}
313+
}
314+
266315
private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
267-
// In read-only mode, we don't need to verify the overlap
268-
if (!readOnly) {
316+
// In either read-only mode or one-operation mode, we don't need to verify the overlap
317+
if (!readOnly && !oneOperation) {
269318
snapshot.verifyNoOverlap(scan, results);
270319
}
271320
}
@@ -432,7 +481,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera
432481
private final List<String> originalProjections;
433482
private final Scanner scanner;
434483

435-
private final LinkedHashMap<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
484+
@Nullable private final LinkedHashMap<Snapshot.Key, TransactionResult> results;
436485
private final AtomicBoolean fullyScanned = new AtomicBoolean();
437486
private final AtomicBoolean closed = new AtomicBoolean();
438487

@@ -441,6 +490,14 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
441490
this.scan = scan;
442491
this.originalProjections = originalProjections;
443492
scanner = scanFromStorage(scan);
493+
494+
if (isValidationOrSnapshotReadRequired()) {
495+
results = new LinkedHashMap<>();
496+
} else {
497+
// If neither validation nor snapshot read is required, we don't need to put the results
498+
// into the scan set
499+
results = null;
500+
}
444501
}
445502

446503
@Override
@@ -456,7 +513,10 @@ public Optional<Result> one() throws CrudException {
456513
Snapshot.Key key = new Snapshot.Key(scan, r.get());
457514
TransactionResult result = new TransactionResult(r.get());
458515
processScanResult(key, scan, result);
459-
results.put(key, result);
516+
517+
if (results != null) {
518+
results.put(key, result);
519+
}
460520

461521
TableMetadata metadata = getTableMetadata(scan);
462522
return Optional.of(
@@ -499,10 +559,10 @@ public void close() {
499559
if (fullyScanned.get()) {
500560
// If the scanner is fully scanned, we can treat it as a normal scan, and put the results
501561
// into the scan set
502-
snapshot.putIntoScanSet(scan, results);
562+
putIntoScanSetInSnapshot(scan, results);
503563
} else {
504564
// If the scanner is not fully scanned, put the results into the scanner set
505-
snapshot.putIntoScannerSet(scan, results);
565+
putIntoScannerSetInSnapshot(scan, results);
506566
}
507567

508568
verifyNoOverlap(scan, results);

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -249,18 +249,22 @@ private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionRes
249249
}
250250
}
251251

252-
private Optional<TransactionResult> mergeResult(
252+
public Optional<TransactionResult> mergeResult(
253253
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
254254
throws CrudException {
255-
return mergeResult(key, result)
256-
.filter(
257-
r ->
258-
// We need to apply conditions if it is a merged result because the transaction’s
259-
// write makes the record no longer match the conditions. Of course, we can just
260-
// return the result without checking the condition if there is no condition.
261-
!r.isMergedResult()
262-
|| conjunctions.isEmpty()
263-
|| ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions));
255+
Optional<TransactionResult> ret = mergeResult(key, result);
256+
257+
if (conjunctions.isEmpty()) {
258+
// We can just return the result without checking the condition if there is no condition.
259+
return ret;
260+
}
261+
262+
return ret.filter(
263+
r ->
264+
// We need to apply conditions if it is a merged result because the transaction’s write
265+
// makes the record no longer match the conditions.
266+
!r.isMergedResult()
267+
|| ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions));
264268
}
265269

266270
private TableMetadata getTableMetadata(Key key) throws CrudException {

0 commit comments

Comments
 (0)