Skip to content

Commit 9676b88

Browse files
committed
Add READ COMMITTED isolation
1 parent ff72c0f commit 9676b88

File tree

12 files changed

+4620
-2808
lines changed

12 files changed

+4620
-2808
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
198198

199199
try {
200200
crud.waitForRecoveryCompletionIfNecessary();
201+
} catch (CrudConflictException e) {
202+
throw new CommitConflictException(e.getMessage(), e, getId());
201203
} catch (CrudException e) {
202204
throw new CommitException(e.getMessage(), e, getId());
203205
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.scalar.db.common.AbstractTransactionCrudOperableScanner;
2626
import com.scalar.db.common.error.CoreError;
2727
import com.scalar.db.exception.storage.ExecutionException;
28+
import com.scalar.db.exception.transaction.CrudConflictException;
2829
import com.scalar.db.exception.transaction.CrudException;
2930
import com.scalar.db.util.ScalarDbUtils;
3031
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -205,8 +206,26 @@ Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws Cru
205206

206207
private Optional<TransactionResult> executeRecovery(
207208
Snapshot.Key key, Selection selection, TransactionResult result) throws CrudException {
209+
RecoveryExecutor.RecoveryType recoveryType;
210+
if (snapshot.getIsolation() == Isolation.READ_COMMITTED) {
211+
// In READ_COMMITTED isolation
212+
213+
if (readOnly) {
214+
// In read-only mode, we don't recover the record, but return the committed result
215+
recoveryType = RecoveryExecutor.RecoveryType.RETURN_COMMITTED_RESULT_AND_NOT_RECOVER;
216+
} else {
217+
// In read-write mode, we recover the record and return the committed result
218+
recoveryType = RecoveryExecutor.RecoveryType.RETURN_COMMITTED_RESULT_AND_RECOVER;
219+
}
220+
} else {
221+
// In SNAPSHOT or SERIALIZABLE isolation, we always recover the record and return the latest
222+
// result
223+
recoveryType = RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER;
224+
}
225+
208226
RecoveryExecutor.Result recoveryResult =
209-
recoveryExecutor.execute(key, selection, result, snapshot.getId());
227+
recoveryExecutor.execute(key, selection, result, snapshot.getId(), recoveryType);
228+
210229
recoveryResults.add(recoveryResult);
211230
return recoveryResult.recoveredResult;
212231
}
@@ -339,14 +358,16 @@ public void closeScanners() throws CrudException {
339358

340359
private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResult> result) {
341360
// In read-only mode, we don't need to put the result into the read set
342-
if (!readOnly && !snapshot.containsKeyInReadSet(key)) {
343-
snapshot.putIntoReadSet(key, result);
361+
if (!readOnly) {
362+
if (snapshot.shouldOverwriteReadSet() || !snapshot.containsKeyInReadSet(key)) {
363+
snapshot.putIntoReadSet(key, result);
364+
}
344365
}
345366
}
346367

347368
private boolean isSnapshotReadRequired() {
348369
// In single-operation mode, we don't need snapshot read
349-
return !singleOperation;
370+
return !singleOperation && snapshot.isSnapshotReadRequired();
350371
}
351372

352373
private boolean isValidationOrSnapshotReadRequired() {
@@ -379,12 +400,16 @@ private void putIntoScannerSetInSnapshot(
379400
}
380401

381402
private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
382-
// In either read-only mode or single-operation mode, we don't need to verify the overlap
383-
if (!readOnly && !singleOperation) {
403+
if (isOverlapVerificationRequired()) {
384404
snapshot.verifyNoOverlap(scan, results);
385405
}
386406
}
387407

408+
private boolean isOverlapVerificationRequired() {
409+
// In either read-only mode or single-operation mode, we don't need to verify overlap
410+
return !readOnly && !singleOperation;
411+
}
412+
388413
public void put(Put put) throws CrudException {
389414
Snapshot.Key key = new Snapshot.Key(put);
390415

@@ -462,11 +487,27 @@ public void waitForRecoveryCompletionIfNecessary() throws CrudException {
462487
for (RecoveryExecutor.Result recoveryResult : recoveryResults) {
463488
try {
464489
if (snapshot.containsKeyInWriteSet(recoveryResult.key)
465-
|| snapshot.containsKeyInDeleteSet(recoveryResult.key)
466-
|| snapshot.isValidationRequired()) {
490+
|| snapshot.containsKeyInDeleteSet(recoveryResult.key)) {
491+
Optional<TransactionResult> result = recoveryResult.recoveryFuture.get();
492+
493+
if (snapshot.shouldOverwriteReadSet() && !readOnly) {
494+
// Overwrite the read set with the result of the recovery only if the result in the read
495+
// set has not been updated since the updated result should be the latter one.
496+
Optional<TransactionResult> resultFromReadSet =
497+
snapshot.getFromReadSet(recoveryResult.key);
498+
if (resultFromReadSet.equals(recoveryResult.recoveredResult)) {
499+
snapshot.putIntoReadSet(recoveryResult.key, result);
500+
}
501+
}
502+
} else if (snapshot.isValidationRequired()) {
467503
recoveryResult.recoveryFuture.get();
468504
}
469505
} catch (java.util.concurrent.ExecutionException e) {
506+
if (e.getCause() instanceof CrudConflictException) {
507+
throw new CrudConflictException(
508+
e.getCause().getMessage(), e.getCause(), snapshot.getId());
509+
}
510+
470511
throw new CrudException(
471512
CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(
472513
e.getCause().getMessage()),
@@ -487,6 +528,11 @@ void waitForRecoveryCompletion() throws CrudException {
487528
try {
488529
recoveryResult.recoveryFuture.get();
489530
} catch (java.util.concurrent.ExecutionException e) {
531+
if (e.getCause() instanceof CrudConflictException) {
532+
throw new CrudConflictException(
533+
e.getCause().getMessage(), e.getCause(), snapshot.getId());
534+
}
535+
490536
throw new CrudException(
491537
CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(
492538
e.getCause().getMessage()),
@@ -717,7 +763,7 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
717763
scanner = scanFromStorage(scan);
718764
}
719765

720-
if (isValidationOrSnapshotReadRequired()) {
766+
if (isValidationOrSnapshotReadRequired() || isOverlapVerificationRequired()) {
721767
results = new LinkedHashMap<>();
722768
} else {
723769
// If neither validation nor snapshot read is required, we don't need to put the results
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.transaction.consensuscommit;
22

33
public enum Isolation {
4+
READ_COMMITTED,
45
SNAPSHOT,
56
SERIALIZABLE,
67
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.extractAfterImageColumnsFromBeforeImage;
44

55
import com.google.common.annotations.VisibleForTesting;
6+
import com.google.common.util.concurrent.Futures;
67
import com.google.common.util.concurrent.ThreadFactoryBuilder;
78
import com.google.common.util.concurrent.Uninterruptibles;
89
import com.scalar.db.api.Operation;
@@ -64,22 +65,61 @@ public RecoveryExecutor(
6465
}
6566

6667
public Result execute(
67-
Snapshot.Key key, Selection selection, TransactionResult result, String transactionId)
68+
Snapshot.Key key,
69+
Selection selection,
70+
TransactionResult result,
71+
String transactionId,
72+
RecoveryType recoveryType)
6873
throws CrudException {
6974
assert !result.isCommitted();
7075

71-
Optional<Coordinator.State> state = getCoordinatorState(result.getId());
72-
73-
Optional<TransactionResult> recoveredResult =
74-
createRecoveredResult(state, selection, result, transactionId);
75-
76-
// Recover the record
77-
Future<Void> future =
78-
executorService.submit(
79-
() -> {
80-
recovery.recover(selection, result, state);
81-
return null;
82-
});
76+
Optional<TransactionResult> recoveredResult;
77+
Future<Optional<TransactionResult>> future;
78+
79+
switch (recoveryType) {
80+
case RETURN_LATEST_RESULT_AND_RECOVER:
81+
Optional<Coordinator.State> state = getCoordinatorState(result.getId());
82+
83+
// Return the latest result
84+
recoveredResult = createRecoveredResult(state, selection, result, transactionId);
85+
86+
// Recover the record
87+
future =
88+
executorService.submit(
89+
() -> {
90+
recovery.recover(selection, result, state);
91+
return recoveredResult;
92+
});
93+
94+
break;
95+
case RETURN_COMMITTED_RESULT_AND_RECOVER:
96+
// Return the committed result
97+
recoveredResult = createRolledBackRecord(selection, result, transactionId);
98+
99+
// Recover the record
100+
future =
101+
executorService.submit(
102+
() -> {
103+
Optional<Coordinator.State> s = getCoordinatorState(result.getId());
104+
Optional<TransactionResult> r =
105+
createRecoveredResult(s, selection, result, transactionId);
106+
107+
recovery.recover(selection, result, s);
108+
return r;
109+
});
110+
111+
break;
112+
case RETURN_COMMITTED_RESULT_AND_NOT_RECOVER:
113+
// Return the committed result
114+
recoveredResult = createRolledBackRecord(selection, result, transactionId);
115+
116+
// No need to recover the record
117+
future = Futures.immediateFuture(recoveredResult);
118+
119+
break;
120+
default:
121+
throw new AssertionError("Unknown recovery type: " + recoveryType);
122+
}
83123

84124
return new Result(key, recoveredResult, future);
85125
}
@@ -263,15 +303,21 @@ public static class Result {
263303
public final Optional<TransactionResult> recoveredResult;
264304

265305
// The future that completes when the recovery is done
266-
public final Future<Void> recoveryFuture;
306+
public final Future<Optional<TransactionResult>> recoveryFuture;
267307

268308
public Result(
269309
Snapshot.Key key,
270310
Optional<TransactionResult> recoveredResult,
271-
Future<Void> recoveryFuture) {
311+
Future<Optional<TransactionResult>> recoveryFuture) {
272312
this.key = key;
273313
this.recoveredResult = recoveredResult;
274314
this.recoveryFuture = recoveryFuture;
275315
}
276316
}
317+
318+
public enum RecoveryType {
319+
RETURN_LATEST_RESULT_AND_RECOVER,
320+
RETURN_COMMITTED_RESULT_AND_RECOVER,
321+
RETURN_COMMITTED_RESULT_AND_NOT_RECOVER
322+
}
277323
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,8 @@ public String getId() {
128128
return id;
129129
}
130130

131-
@VisibleForTesting
132131
@Nonnull
133-
Isolation getIsolation() {
132+
public Isolation getIsolation() {
134133
return isolation;
135134
}
136135

@@ -214,6 +213,10 @@ public boolean containsKeyInReadSet(Key key) {
214213
return readSet.containsKey(key);
215214
}
216215

216+
public Optional<TransactionResult> getFromReadSet(Key key) {
217+
return readSet.get(key);
218+
}
219+
217220
public boolean containsKeyInGetSet(Get get) {
218221
return getSet.containsKey(get);
219222
}
@@ -756,6 +759,14 @@ private boolean isSerializable() {
756759
return isolation == Isolation.SERIALIZABLE;
757760
}
758761

762+
public boolean isSnapshotReadRequired() {
763+
return isolation != Isolation.READ_COMMITTED;
764+
}
765+
766+
public boolean shouldOverwriteReadSet() {
767+
return isolation == Isolation.READ_COMMITTED;
768+
}
769+
759770
public boolean isValidationRequired() {
760771
return isSerializable();
761772
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ public void prepare() throws PreparationException {
195195

196196
try {
197197
crud.waitForRecoveryCompletionIfNecessary();
198+
} catch (CrudConflictException e) {
199+
throw new PreparationConflictException(e.getMessage(), e, getId());
198200
} catch (CrudException e) {
199201
throw new PreparationException(e.getMessage(), e, getId());
200202
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void constructor_PropertiesWithDeprecatedIsolationLevelGiven_ShouldLoadPr
6262
public void constructor_UnsupportedIsolationGiven_ShouldThrowIllegalArgumentException() {
6363
// Arrange
6464
Properties props = new Properties();
65-
props.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "READ_COMMITTED");
65+
props.setProperty(ConsensusCommitConfig.ISOLATION_LEVEL, "READ_UNCOMMITTED");
6666

6767
// Act Assert
6868
assertThatThrownBy(() -> new ConsensusCommitConfig(new DatabaseConfig(props)))

core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,18 @@ public void commit_ScannerNotClosed_ShouldThrowIllegalStateException() {
571571
assertThatThrownBy(() -> consensus.commit()).isInstanceOf(IllegalStateException.class);
572572
}
573573

574+
@Test
575+
public void
576+
commit_CrudConflictExceptionThrownByCrudHandlerWaitForRecoveryCompletionIfNecessary_ShouldThrowCommitConflictException()
577+
throws CrudException {
578+
// Arrange
579+
when(crud.getSnapshot()).thenReturn(snapshot);
580+
doThrow(CrudConflictException.class).when(crud).waitForRecoveryCompletionIfNecessary();
581+
582+
// Act Assert
583+
assertThatThrownBy(() -> consensus.commit()).isInstanceOf(CommitConflictException.class);
584+
}
585+
574586
@Test
575587
public void
576588
commit_CrudExceptionThrownByCrudHandlerWaitForRecoveryCompletionIfNecessary_ShouldThrowCommitException()

0 commit comments

Comments
 (0)