Skip to content

Commit daf4d88

Browse files
authored
Add READ_COMMITTED isolation (#2803)
1 parent f3969d9 commit daf4d88

File tree

15 files changed

+4862
-2976
lines changed

15 files changed

+4862
-2976
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
198198

199199
try {
200200
crud.waitForRecoveryCompletionIfNecessary();
201+
} catch (CrudConflictException e) {
202+
throw new CommitConflictException(e.getMessage(), e, getId());
201203
} catch (CrudException e) {
202204
throw new CommitException(e.getMessage(), e, getId());
203205
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage
4848
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommitManager.class);
4949
private final DistributedStorage storage;
5050
private final DistributedStorageAdmin admin;
51-
private final ConsensusCommitConfig config;
5251
private final TransactionTableMetadataManager tableMetadataManager;
5352
private final Coordinator coordinator;
5453
private final ParallelExecutor parallelExecutor;
5554
private final RecoveryExecutor recoveryExecutor;
5655
protected final CommitHandler commit;
56+
private final Isolation isolation;
5757
private final boolean isIncludeMetadataEnabled;
5858
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
5959
@Nullable private final CoordinatorGroupCommitter groupCommitter;
@@ -65,7 +65,7 @@ public ConsensusCommitManager(
6565
super(databaseConfig);
6666
this.storage = storage;
6767
this.admin = admin;
68-
config = new ConsensusCommitConfig(databaseConfig);
68+
ConsensusCommitConfig config = new ConsensusCommitConfig(databaseConfig);
6969
coordinator = new Coordinator(storage, config);
7070
parallelExecutor = new ParallelExecutor(config);
7171
tableMetadataManager =
@@ -74,7 +74,8 @@ public ConsensusCommitManager(
7474
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
7575
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
7676
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
77-
commit = createCommitHandler();
77+
commit = createCommitHandler(config);
78+
isolation = config.getIsolation();
7879
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
7980
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
8081
}
@@ -85,7 +86,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
8586
storage = storageFactory.getStorage();
8687
admin = storageFactory.getStorageAdmin();
8788

88-
config = new ConsensusCommitConfig(databaseConfig);
89+
ConsensusCommitConfig config = new ConsensusCommitConfig(databaseConfig);
8990
coordinator = new Coordinator(storage, config);
9091
parallelExecutor = new ParallelExecutor(config);
9192
tableMetadataManager =
@@ -94,7 +95,8 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
9495
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
9596
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
9697
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
97-
commit = createCommitHandler();
98+
commit = createCommitHandler(config);
99+
isolation = config.getIsolation();
98100
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
99101
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
100102
}
@@ -104,17 +106,17 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
104106
ConsensusCommitManager(
105107
DistributedStorage storage,
106108
DistributedStorageAdmin admin,
107-
ConsensusCommitConfig config,
108109
DatabaseConfig databaseConfig,
109110
Coordinator coordinator,
110111
ParallelExecutor parallelExecutor,
111112
RecoveryExecutor recoveryExecutor,
112113
CommitHandler commit,
114+
Isolation isolation,
115+
boolean isIncludeMetadataEnabled,
113116
@Nullable CoordinatorGroupCommitter groupCommitter) {
114117
super(databaseConfig);
115118
this.storage = storage;
116119
this.admin = admin;
117-
this.config = config;
118120
tableMetadataManager =
119121
new TransactionTableMetadataManager(
120122
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
@@ -123,13 +125,14 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
123125
this.recoveryExecutor = recoveryExecutor;
124126
this.commit = commit;
125127
this.groupCommitter = groupCommitter;
126-
this.isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
128+
this.isolation = isolation;
129+
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
127130
this.mutationOperationChecker =
128131
new ConsensusCommitMutationOperationChecker(tableMetadataManager);
129132
}
130133

131134
// `groupCommitter` must be set before calling this method.
132-
private CommitHandler createCommitHandler() {
135+
private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
133136
if (isGroupCommitEnabled()) {
134137
return new CommitHandlerWithGroupCommit(
135138
storage,
@@ -156,7 +159,7 @@ public DistributedTransaction begin() {
156159

157160
@Override
158161
public DistributedTransaction begin(String txId) {
159-
return begin(txId, config.getIsolation(), false, false);
162+
return begin(txId, isolation, false, false);
160163
}
161164

162165
@Override
@@ -167,14 +170,15 @@ public DistributedTransaction beginReadOnly() {
167170

168171
@Override
169172
public DistributedTransaction beginReadOnly(String txId) {
170-
return begin(txId, config.getIsolation(), true, false);
173+
return begin(txId, isolation, true, false);
171174
}
172175

173176
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
174177
@Deprecated
175178
@Override
176179
public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
177-
return begin(Isolation.valueOf(isolation.name()));
180+
String txId = UUID.randomUUID().toString();
181+
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
178182
}
179183

180184
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -189,14 +193,16 @@ public DistributedTransaction start(String txId, com.scalar.db.api.Isolation iso
189193
@Override
190194
public DistributedTransaction start(
191195
com.scalar.db.api.Isolation isolation, com.scalar.db.api.SerializableStrategy strategy) {
192-
return begin(Isolation.valueOf(isolation.name()));
196+
String txId = UUID.randomUUID().toString();
197+
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
193198
}
194199

195200
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
196201
@Deprecated
197202
@Override
198203
public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strategy) {
199-
return begin(Isolation.SERIALIZABLE);
204+
String txId = UUID.randomUUID().toString();
205+
return begin(txId, Isolation.SERIALIZABLE, false, false);
200206
}
201207

202208
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@@ -217,18 +223,6 @@ public DistributedTransaction start(
217223
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
218224
}
219225

220-
@VisibleForTesting
221-
DistributedTransaction begin(Isolation isolation) {
222-
String txId = UUID.randomUUID().toString();
223-
return begin(txId, isolation, false, false);
224-
}
225-
226-
@VisibleForTesting
227-
DistributedTransaction beginReadOnly(Isolation isolation) {
228-
String txId = UUID.randomUUID().toString();
229-
return begin(txId, isolation, true, false);
230-
}
231-
232226
@VisibleForTesting
233227
DistributedTransaction begin(
234228
String txId, Isolation isolation, boolean readOnly, boolean oneOperation) {
@@ -238,7 +232,7 @@ DistributedTransaction begin(
238232
assert groupCommitter != null;
239233
txId = groupCommitter.reserve(txId);
240234
}
241-
if (!config.getIsolation().equals(isolation)) {
235+
if (!this.isolation.equals(isolation)) {
242236
logger.warn(
243237
"Setting different isolation level from the one in DatabaseConfig might cause unexpected "
244238
+ "anomalies");
@@ -266,7 +260,7 @@ DistributedTransaction begin(
266260

267261
private DistributedTransaction beginOneOperation(boolean readOnly) {
268262
String txId = UUID.randomUUID().toString();
269-
return begin(txId, config.getIsolation(), readOnly, true);
263+
return begin(txId, isolation, readOnly, true);
270264
}
271265

272266
@Override

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.scalar.db.common.AbstractTransactionCrudOperableScanner;
2626
import com.scalar.db.common.error.CoreError;
2727
import com.scalar.db.exception.storage.ExecutionException;
28+
import com.scalar.db.exception.transaction.CrudConflictException;
2829
import com.scalar.db.exception.transaction.CrudException;
2930
import com.scalar.db.util.ScalarDbUtils;
3031
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -196,8 +197,26 @@ Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws Cru
196197

197198
private Optional<TransactionResult> executeRecovery(
198199
Snapshot.Key key, Selection selection, TransactionResult result) throws CrudException {
200+
RecoveryExecutor.RecoveryType recoveryType;
201+
if (snapshot.getIsolation() == Isolation.READ_COMMITTED) {
202+
// In READ_COMMITTED isolation
203+
204+
if (readOnly) {
205+
// In read-only mode, we don't recover the record, but return the committed result
206+
recoveryType = RecoveryExecutor.RecoveryType.RETURN_COMMITTED_RESULT_AND_NOT_RECOVER;
207+
} else {
208+
// In read-write mode, we recover the record and return the committed result
209+
recoveryType = RecoveryExecutor.RecoveryType.RETURN_COMMITTED_RESULT_AND_RECOVER;
210+
}
211+
} else {
212+
// In SNAPSHOT or SERIALIZABLE isolation, we always recover the record and return the latest
213+
// result
214+
recoveryType = RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER;
215+
}
216+
199217
RecoveryExecutor.Result recoveryResult =
200-
recoveryExecutor.execute(key, selection, result, snapshot.getId());
218+
recoveryExecutor.execute(key, selection, result, snapshot.getId(), recoveryType);
219+
201220
recoveryResults.add(recoveryResult);
202221
return recoveryResult.recoveredResult;
203222
}
@@ -337,16 +356,16 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResu
337356
}
338357

339358
private boolean isSnapshotReadRequired() {
340-
// In one-operation mode, we don't need snapshot read
341-
return !oneOperation;
359+
// In one-operation mode, we don't need snapshot reads
360+
return !oneOperation && snapshot.isSnapshotReadRequired();
342361
}
343362

344363
private boolean isValidationOrSnapshotReadRequired() {
345364
return snapshot.isValidationRequired() || isSnapshotReadRequired();
346365
}
347366

348367
private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result) {
349-
// If neither validation nor snapshot read is required, we don't need to put the result into
368+
// If neither validation nor snapshot reads are required, we don't need to put the result into
350369
// the get set
351370
if (isValidationOrSnapshotReadRequired()) {
352371
snapshot.putIntoGetSet(get, result);
@@ -355,7 +374,7 @@ private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result
355374

356375
private void putIntoScanSetInSnapshot(
357376
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
358-
// If neither validation nor snapshot read is required, we don't need to put the results into
377+
// If neither validation nor snapshot reads are required, we don't need to put the results into
359378
// the scan set
360379
if (isValidationOrSnapshotReadRequired()) {
361380
snapshot.putIntoScanSet(scan, results);
@@ -371,12 +390,16 @@ private void putIntoScannerSetInSnapshot(
371390
}
372391

373392
private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
374-
// In either read-only mode or one-operation mode, we don't need to verify the overlap
375-
if (!readOnly && !oneOperation) {
393+
if (isOverlapVerificationRequired()) {
376394
snapshot.verifyNoOverlap(scan, results);
377395
}
378396
}
379397

398+
private boolean isOverlapVerificationRequired() {
399+
// In either read-only mode or one-operation mode, we don't need to verify overlap
400+
return !readOnly && !oneOperation;
401+
}
402+
380403
public void put(Put put) throws CrudException {
381404
Snapshot.Key key = new Snapshot.Key(put);
382405

@@ -476,6 +499,7 @@ private Get createGet(Snapshot.Key key) {
476499
* complete, the validation could fail due to records with PREPARED or DELETED status.
477500
* </ul>
478501
*
502+
* @throws CrudConflictException if any recovery task fails due to a conflict
479503
* @throws CrudException if any recovery task fails
480504
*/
481505
public void waitForRecoveryCompletionIfNecessary() throws CrudException {
@@ -487,6 +511,11 @@ public void waitForRecoveryCompletionIfNecessary() throws CrudException {
487511
recoveryResult.recoveryFuture.get();
488512
}
489513
} catch (java.util.concurrent.ExecutionException e) {
514+
if (e.getCause() instanceof CrudConflictException) {
515+
throw new CrudConflictException(
516+
e.getCause().getMessage(), e.getCause(), snapshot.getId());
517+
}
518+
490519
throw new CrudException(
491520
CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(
492521
e.getCause().getMessage()),
@@ -507,6 +536,11 @@ void waitForRecoveryCompletion() throws CrudException {
507536
try {
508537
recoveryResult.recoveryFuture.get();
509538
} catch (java.util.concurrent.ExecutionException e) {
539+
if (e.getCause() instanceof CrudConflictException) {
540+
throw new CrudConflictException(
541+
e.getCause().getMessage(), e.getCause(), snapshot.getId());
542+
}
543+
510544
throw new CrudException(
511545
CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(
512546
e.getCause().getMessage()),
@@ -737,10 +771,10 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
737771
scanner = scanFromStorage(scan);
738772
}
739773

740-
if (isValidationOrSnapshotReadRequired()) {
774+
if (isValidationOrSnapshotReadRequired() || isOverlapVerificationRequired()) {
741775
results = new LinkedHashMap<>();
742776
} else {
743-
// If neither validation nor snapshot read is required, we don't need to put the results
777+
// If neither validation nor snapshot reads are required, we don't need to put the results
744778
// into the scan set
745779
results = null;
746780
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.transaction.consensuscommit;
22

33
public enum Isolation {
4+
READ_COMMITTED,
45
SNAPSHOT,
56
SERIALIZABLE,
67
}

0 commit comments

Comments
 (0)