Skip to content

Commit fd2e415

Browse files
committed
Further optimizations for Consensus Commit
1 parent 30057d8 commit fd2e415

File tree

11 files changed

+1234
-174
lines changed

11 files changed

+1234
-174
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public DistributedTransaction begin() {
144144

145145
@Override
146146
public DistributedTransaction begin(String txId) {
147-
return begin(txId, config.getIsolation(), false);
147+
return begin(txId, config.getIsolation(), false, false);
148148
}
149149

150150
@Override
@@ -155,7 +155,7 @@ public DistributedTransaction beginReadOnly() {
155155

156156
@Override
157157
public DistributedTransaction beginReadOnly(String txId) {
158-
return begin(txId, config.getIsolation(), true);
158+
return begin(txId, config.getIsolation(), true, false);
159159
}
160160

161161
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -169,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
169169
@Deprecated
170170
@Override
171171
public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) {
172-
return begin(txId, Isolation.valueOf(isolation.name()), false);
172+
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
173173
}
174174

175175
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -192,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat
192192
@Override
193193
public DistributedTransaction start(
194194
String txId, com.scalar.db.api.SerializableStrategy strategy) {
195-
return begin(txId, Isolation.SERIALIZABLE, false);
195+
return begin(txId, Isolation.SERIALIZABLE, false, false);
196196
}
197197

198198
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -202,23 +202,24 @@ public DistributedTransaction start(
202202
String txId,
203203
com.scalar.db.api.Isolation isolation,
204204
com.scalar.db.api.SerializableStrategy strategy) {
205-
return begin(txId, Isolation.valueOf(isolation.name()), false);
205+
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
206206
}
207207

208208
@VisibleForTesting
209209
DistributedTransaction begin(Isolation isolation) {
210210
String txId = UUID.randomUUID().toString();
211-
return begin(txId, isolation, false);
211+
return begin(txId, isolation, false, false);
212212
}
213213

214214
@VisibleForTesting
215215
DistributedTransaction beginReadOnly(Isolation isolation) {
216216
String txId = UUID.randomUUID().toString();
217-
return begin(txId, isolation, true);
217+
return begin(txId, isolation, true, false);
218218
}
219219

220220
@VisibleForTesting
221-
DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) {
221+
DistributedTransaction begin(
222+
String txId, Isolation isolation, boolean readOnly, boolean singleOperation) {
222223
checkArgument(!Strings.isNullOrEmpty(txId));
223224
checkNotNull(isolation);
224225
if (isGroupCommitEnabled()) {
@@ -238,7 +239,8 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
238239
tableMetadataManager,
239240
isIncludeMetadataEnabled,
240241
parallelExecutor,
241-
readOnly);
242+
readOnly,
243+
singleOperation);
242244
DistributedTransaction transaction =
243245
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
244246
if (readOnly) {
@@ -249,6 +251,11 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly)
249251
return transaction;
250252
}
251253

254+
private DistributedTransaction beginSingleOperation(boolean readOnly) {
255+
String txId = UUID.randomUUID().toString();
256+
return begin(txId, config.getIsolation(), readOnly, true);
257+
}
258+
252259
@Override
253260
public Optional<Result> get(Get get) throws CrudException, UnknownTransactionStatusException {
254261
return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true);
@@ -261,7 +268,7 @@ public List<Result> scan(Scan scan) throws CrudException, UnknownTransactionStat
261268

262269
@Override
263270
public Scanner getScanner(Scan scan) throws CrudException {
264-
DistributedTransaction transaction = beginReadOnly();
271+
DistributedTransaction transaction = beginSingleOperation(true);
265272

266273
TransactionCrudOperable.Scanner scanner;
267274
try {
@@ -431,12 +438,7 @@ private <R> R executeTransaction(
431438
ThrowableFunction<DistributedTransaction, R, TransactionException> throwableFunction,
432439
boolean readOnly)
433440
throws CrudException, UnknownTransactionStatusException {
434-
DistributedTransaction transaction;
435-
if (readOnly) {
436-
transaction = beginReadOnly();
437-
} else {
438-
transaction = begin();
439-
}
441+
DistributedTransaction transaction = beginSingleOperation(readOnly);
440442

441443
try {
442444
R result = throwableFunction.apply(transaction);

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 87 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public class CrudHandler {
5151
// Whether the transaction is in read-only mode or not.
5252
private final boolean readOnly;
5353

54+
// Whether the transaction is in single-operation mode or not. Single-operation mode refers to
55+
// executing a CRUD operation directly through `DistributedTransactionManager` without explicitly
56+
// beginning a transaction.
57+
private final boolean singleOperation;
58+
5459
private final List<ConsensusCommitScanner> scanners = new ArrayList<>();
5560

5661
@SuppressFBWarnings("EI_EXPOSE_REP2")
@@ -60,14 +65,16 @@ public CrudHandler(
6065
TransactionTableMetadataManager tableMetadataManager,
6166
boolean isIncludeMetadataEnabled,
6267
ParallelExecutor parallelExecutor,
63-
boolean readOnly) {
68+
boolean readOnly,
69+
boolean singleOperation) {
6470
this.storage = checkNotNull(storage);
6571
this.snapshot = checkNotNull(snapshot);
6672
this.tableMetadataManager = tableMetadataManager;
6773
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
6874
this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId());
6975
this.parallelExecutor = parallelExecutor;
7076
this.readOnly = readOnly;
77+
this.singleOperation = singleOperation;
7178
}
7279

7380
@VisibleForTesting
@@ -78,14 +85,16 @@ public CrudHandler(
7885
boolean isIncludeMetadataEnabled,
7986
MutationConditionsValidator mutationConditionsValidator,
8087
ParallelExecutor parallelExecutor,
81-
boolean readOnly) {
88+
boolean readOnly,
89+
boolean singleOperation) {
8290
this.storage = checkNotNull(storage);
8391
this.snapshot = checkNotNull(snapshot);
8492
this.tableMetadataManager = tableMetadataManager;
8593
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
8694
this.mutationConditionsValidator = mutationConditionsValidator;
8795
this.parallelExecutor = parallelExecutor;
8896
this.readOnly = readOnly;
97+
this.singleOperation = singleOperation;
8998
}
9099

91100
public Optional<Result> get(Get originalGet) throws CrudException {
@@ -102,11 +111,27 @@ public Optional<Result> get(Get originalGet) throws CrudException {
102111
key = new Snapshot.Key(get);
103112
}
104113

105-
readUnread(key, get);
106-
107-
return snapshot
108-
.getResult(key, get)
109-
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
114+
if (isSnapshotReadRequired()) {
115+
readUnread(key, get);
116+
return snapshot
117+
.getResult(key, get)
118+
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
119+
} else {
120+
Optional<TransactionResult> result = read(key, get);
121+
if (get.getConjunctions().isEmpty()) {
122+
return snapshot
123+
.mergeResult(key, result)
124+
.map(
125+
r ->
126+
new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
127+
} else {
128+
return snapshot
129+
.mergeResult(key, result, get.getConjunctions())
130+
.map(
131+
r ->
132+
new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
133+
}
134+
}
110135
}
111136

112137
// Only for a Get with index, the argument `key` is null
@@ -120,7 +145,7 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
120145
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
121146
// concurrently in the implicit pre-read
122147
@VisibleForTesting
123-
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
148+
Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws CrudException {
124149
Optional<TransactionResult> result = getFromStorage(get);
125150
if (!result.isPresent() || result.get().isCommitted()) {
126151
if (result.isPresent() || get.getConjunctions().isEmpty()) {
@@ -142,8 +167,8 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
142167
}
143168
}
144169
}
145-
snapshot.putIntoGetSet(get, result);
146-
return;
170+
putIntoGetSetInSnapshot(get, result);
171+
return result;
147172
}
148173
throw new UncommittedRecordException(
149174
get,
@@ -204,7 +229,7 @@ private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(Scan scan)
204229
}
205230
}
206231

207-
snapshot.putIntoScanSet(scan, results);
232+
putIntoScanSetInSnapshot(scan, results);
208233

209234
return results;
210235
}
@@ -263,9 +288,43 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResu
263288
}
264289
}
265290

291+
private boolean isSnapshotReadRequired() {
292+
// In single-operation mode, we don't need snapshot read
293+
return !singleOperation;
294+
}
295+
296+
private boolean isValidationOrSnapshotReadRequired() {
297+
return snapshot.isValidationRequired() || isSnapshotReadRequired();
298+
}
299+
300+
private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result) {
301+
// If neither validation nor snapshot read is required, we don't need to put the result into
302+
// the get set
303+
if (isValidationOrSnapshotReadRequired()) {
304+
snapshot.putIntoGetSet(get, result);
305+
}
306+
}
307+
308+
private void putIntoScanSetInSnapshot(
309+
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
310+
// If neither validation nor snapshot read is required, we don't need to put the results into
311+
// the scan set
312+
if (isValidationOrSnapshotReadRequired()) {
313+
snapshot.putIntoScanSet(scan, results);
314+
}
315+
}
316+
317+
private void putIntoScannerSetInSnapshot(
318+
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
319+
// if validation is not required, we don't need to put the results into the scanner set
320+
if (snapshot.isValidationRequired()) {
321+
snapshot.putIntoScannerSet(scan, results);
322+
}
323+
}
324+
266325
private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
267-
// In read-only mode, we don't need to verify the overlap
268-
if (!readOnly) {
326+
// In either read-only mode or single-operation mode, we don't need to verify the overlap
327+
if (!readOnly && !singleOperation) {
269328
snapshot.verifyNoOverlap(scan, results);
270329
}
271330
}
@@ -432,7 +491,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera
432491
private final List<String> originalProjections;
433492
private final Scanner scanner;
434493

435-
private final LinkedHashMap<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
494+
private final LinkedHashMap<Snapshot.Key, TransactionResult> results;
436495
private final AtomicBoolean fullyScanned = new AtomicBoolean();
437496
private final AtomicBoolean closed = new AtomicBoolean();
438497

@@ -441,6 +500,14 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
441500
this.scan = scan;
442501
this.originalProjections = originalProjections;
443502
scanner = scanFromStorage(scan);
503+
504+
if (isValidationOrSnapshotReadRequired()) {
505+
results = new LinkedHashMap<>();
506+
} else {
507+
// If neither validation nor snapshot read is required, we don't need to put the results
508+
// into the scan set
509+
results = null;
510+
}
444511
}
445512

446513
@Override
@@ -456,7 +523,10 @@ public Optional<Result> one() throws CrudException {
456523
Snapshot.Key key = new Snapshot.Key(scan, r.get());
457524
TransactionResult result = new TransactionResult(r.get());
458525
processScanResult(key, scan, result);
459-
results.put(key, result);
526+
527+
if (results != null) {
528+
results.put(key, result);
529+
}
460530

461531
TableMetadata metadata = getTableMetadata(scan);
462532
return Optional.of(
@@ -499,10 +569,10 @@ public void close() {
499569
if (fullyScanned.get()) {
500570
// If the scanner is fully scanned, we can treat it as a normal scan, and put the results
501571
// into the scan set
502-
snapshot.putIntoScanSet(scan, results);
572+
putIntoScanSetInSnapshot(scan, results);
503573
} else {
504574
// If the scanner is not fully scanned, put the results into the scanner set
505-
snapshot.putIntoScannerSet(scan, results);
575+
putIntoScannerSetInSnapshot(scan, results);
506576
}
507577

508578
verifyNoOverlap(scan, results);

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public Optional<LinkedHashMap<Snapshot.Key, TransactionResult>> getResults(Scan
235235
return Optional.of(scanSet.get(scan));
236236
}
237237

238-
private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
238+
public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
239239
throws CrudException {
240240
if (deleteSet.containsKey(key)) {
241241
return Optional.empty();
@@ -249,7 +249,7 @@ private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionRes
249249
}
250250
}
251251

252-
private Optional<TransactionResult> mergeResult(
252+
public Optional<TransactionResult> mergeResult(
253253
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
254254
throws CrudException {
255255
return mergeResult(key, result)

0 commit comments

Comments
 (0)