Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public void commit() throws CommitException, UnknownTransactionStatusException {

try {
crud.waitForRecoveryCompletionIfNecessary();
} catch (CrudConflictException e) {
throw new CommitConflictException(e.getMessage(), e, getId());
} catch (CrudException e) {
throw new CommitException(e.getMessage(), e, getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommitManager.class);
private final DistributedStorage storage;
private final DistributedStorageAdmin admin;
private final ConsensusCommitConfig config;
private final TransactionTableMetadataManager tableMetadataManager;
private final Coordinator coordinator;
private final ParallelExecutor parallelExecutor;
private final RecoveryExecutor recoveryExecutor;
protected final CommitHandler commit;
private final Isolation isolation;
private final boolean isIncludeMetadataEnabled;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@Nullable private final CoordinatorGroupCommitter groupCommitter;
Expand All @@ -65,7 +65,7 @@ public ConsensusCommitManager(
super(databaseConfig);
this.storage = storage;
this.admin = admin;
config = new ConsensusCommitConfig(databaseConfig);
ConsensusCommitConfig config = new ConsensusCommitConfig(databaseConfig);
coordinator = new Coordinator(storage, config);
parallelExecutor = new ParallelExecutor(config);
tableMetadataManager =
Expand All @@ -74,7 +74,8 @@ public ConsensusCommitManager(
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
commit = createCommitHandler(config);
isolation = config.getIsolation();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}
Expand All @@ -85,7 +86,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
storage = storageFactory.getStorage();
admin = storageFactory.getStorageAdmin();

config = new ConsensusCommitConfig(databaseConfig);
ConsensusCommitConfig config = new ConsensusCommitConfig(databaseConfig);
coordinator = new Coordinator(storage, config);
parallelExecutor = new ParallelExecutor(config);
tableMetadataManager =
Expand All @@ -94,7 +95,8 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
commit = createCommitHandler(config);
isolation = config.getIsolation();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}
Expand All @@ -104,17 +106,17 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
ConsensusCommitManager(
DistributedStorage storage,
DistributedStorageAdmin admin,
ConsensusCommitConfig config,
DatabaseConfig databaseConfig,
Coordinator coordinator,
ParallelExecutor parallelExecutor,
RecoveryExecutor recoveryExecutor,
CommitHandler commit,
Isolation isolation,
boolean isIncludeMetadataEnabled,
@Nullable CoordinatorGroupCommitter groupCommitter) {
super(databaseConfig);
this.storage = storage;
this.admin = admin;
this.config = config;
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
Expand All @@ -123,13 +125,14 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
this.recoveryExecutor = recoveryExecutor;
this.commit = commit;
this.groupCommitter = groupCommitter;
this.isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
this.isolation = isolation;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationOperationChecker =
new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}

// `groupCommitter` must be set before calling this method.
private CommitHandler createCommitHandler() {
private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
if (isGroupCommitEnabled()) {
return new CommitHandlerWithGroupCommit(
storage,
Expand All @@ -156,7 +159,7 @@ public DistributedTransaction begin() {

@Override
public DistributedTransaction begin(String txId) {
return begin(txId, config.getIsolation(), false, false);
return begin(txId, isolation, false, false);
}

@Override
Expand All @@ -167,14 +170,15 @@ public DistributedTransaction beginReadOnly() {

@Override
public DistributedTransaction beginReadOnly(String txId) {
return begin(txId, config.getIsolation(), true, false);
return begin(txId, isolation, true, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@Deprecated
@Override
public DistributedTransaction start(com.scalar.db.api.Isolation isolation) {
return begin(Isolation.valueOf(isolation.name()));
String txId = UUID.randomUUID().toString();
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -189,14 +193,16 @@ public DistributedTransaction start(String txId, com.scalar.db.api.Isolation iso
@Override
public DistributedTransaction start(
com.scalar.db.api.Isolation isolation, com.scalar.db.api.SerializableStrategy strategy) {
return begin(Isolation.valueOf(isolation.name()));
String txId = UUID.randomUUID().toString();
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
@Deprecated
@Override
public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strategy) {
return begin(Isolation.SERIALIZABLE);
String txId = UUID.randomUUID().toString();
return begin(txId, Isolation.SERIALIZABLE, false, false);
}

/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
Expand All @@ -217,18 +223,6 @@ public DistributedTransaction start(
return begin(txId, Isolation.valueOf(isolation.name()), false, false);
}

@VisibleForTesting
DistributedTransaction begin(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, false, false);
}

@VisibleForTesting
DistributedTransaction beginReadOnly(Isolation isolation) {
String txId = UUID.randomUUID().toString();
return begin(txId, isolation, true, false);
}

@VisibleForTesting
DistributedTransaction begin(
String txId, Isolation isolation, boolean readOnly, boolean oneOperation) {
Expand All @@ -238,7 +232,7 @@ DistributedTransaction begin(
assert groupCommitter != null;
txId = groupCommitter.reserve(txId);
}
if (!config.getIsolation().equals(isolation)) {
if (!this.isolation.equals(isolation)) {
logger.warn(
"Setting different isolation level from the one in DatabaseConfig might cause unexpected "
+ "anomalies");
Expand Down Expand Up @@ -266,7 +260,7 @@ DistributedTransaction begin(

private DistributedTransaction beginOneOperation(boolean readOnly) {
String txId = UUID.randomUUID().toString();
return begin(txId, config.getIsolation(), readOnly, true);
return begin(txId, isolation, readOnly, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.scalar.db.common.AbstractTransactionCrudOperableScanner;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -196,8 +197,26 @@ Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws Cru

private Optional<TransactionResult> executeRecovery(
Snapshot.Key key, Selection selection, TransactionResult result) throws CrudException {
RecoveryExecutor.RecoveryType recoveryType;
if (snapshot.getIsolation() == Isolation.READ_COMMITTED) {
// In READ_COMMITTED isolation

if (readOnly) {
// In read-only mode, we don't recover the record, but return the committed result
recoveryType = RecoveryExecutor.RecoveryType.RETURN_COMMITTED_RESULT_AND_NOT_RECOVER;
} else {
// In read-write mode, we recover the record and return the committed result
recoveryType = RecoveryExecutor.RecoveryType.RETURN_COMMITTED_RESULT_AND_RECOVER;
}
} else {
// In SNAPSHOT or SERIALIZABLE isolation, we always recover the record and return the latest
// result
recoveryType = RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER;
}
Comment on lines +201 to +215
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RecoveryType is chosen based in the isolation level and whether the transaction is read-only or not.


RecoveryExecutor.Result recoveryResult =
recoveryExecutor.execute(key, selection, result, snapshot.getId());
recoveryExecutor.execute(key, selection, result, snapshot.getId(), recoveryType);

recoveryResults.add(recoveryResult);
return recoveryResult.recoveredResult;
}
Expand Down Expand Up @@ -337,16 +356,16 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResu
}

private boolean isSnapshotReadRequired() {
// In one-operation mode, we don't need snapshot read
return !oneOperation;
// In one-operation mode, we don't need snapshot reads
return !oneOperation && snapshot.isSnapshotReadRequired();
}

private boolean isValidationOrSnapshotReadRequired() {
return snapshot.isValidationRequired() || isSnapshotReadRequired();
}

private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result) {
// If neither validation nor snapshot read is required, we don't need to put the result into
// If neither validation nor snapshot reads are required, we don't need to put the result into
// the get set
if (isValidationOrSnapshotReadRequired()) {
snapshot.putIntoGetSet(get, result);
Expand All @@ -355,7 +374,7 @@ private void putIntoGetSetInSnapshot(Get get, Optional<TransactionResult> result

private void putIntoScanSetInSnapshot(
Scan scan, LinkedHashMap<Snapshot.Key, TransactionResult> results) {
// If neither validation nor snapshot read is required, we don't need to put the results into
// If neither validation nor snapshot reads are required, we don't need to put the results into
// the scan set
if (isValidationOrSnapshotReadRequired()) {
snapshot.putIntoScanSet(scan, results);
Expand All @@ -371,12 +390,16 @@ private void putIntoScannerSetInSnapshot(
}

private void verifyNoOverlap(Scan scan, Map<Snapshot.Key, TransactionResult> results) {
// In either read-only mode or one-operation mode, we don't need to verify the overlap
if (!readOnly && !oneOperation) {
if (isOverlapVerificationRequired()) {
snapshot.verifyNoOverlap(scan, results);
}
}

private boolean isOverlapVerificationRequired() {
// In either read-only mode or one-operation mode, we don't need to verify overlap
return !readOnly && !oneOperation;
}

public void put(Put put) throws CrudException {
Snapshot.Key key = new Snapshot.Key(put);

Expand Down Expand Up @@ -476,6 +499,7 @@ private Get createGet(Snapshot.Key key) {
* complete, the validation could fail due to records with PREPARED or DELETED status.
* </ul>
*
* @throws CrudConflictException if any recovery task fails due to a conflict
* @throws CrudException if any recovery task fails
*/
public void waitForRecoveryCompletionIfNecessary() throws CrudException {
Expand All @@ -487,6 +511,11 @@ public void waitForRecoveryCompletionIfNecessary() throws CrudException {
recoveryResult.recoveryFuture.get();
}
} catch (java.util.concurrent.ExecutionException e) {
if (e.getCause() instanceof CrudConflictException) {
throw new CrudConflictException(
e.getCause().getMessage(), e.getCause(), snapshot.getId());
}

throw new CrudException(
CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(
e.getCause().getMessage()),
Expand All @@ -507,6 +536,11 @@ void waitForRecoveryCompletion() throws CrudException {
try {
recoveryResult.recoveryFuture.get();
} catch (java.util.concurrent.ExecutionException e) {
if (e.getCause() instanceof CrudConflictException) {
throw new CrudConflictException(
e.getCause().getMessage(), e.getCause(), snapshot.getId());
}

throw new CrudException(
CoreError.CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED.buildMessage(
e.getCause().getMessage()),
Expand Down Expand Up @@ -737,10 +771,10 @@ public ConsensusCommitStorageScanner(Scan scan, List<String> originalProjections
scanner = scanFromStorage(scan);
}

if (isValidationOrSnapshotReadRequired()) {
if (isValidationOrSnapshotReadRequired() || isOverlapVerificationRequired()) {
results = new LinkedHashMap<>();
} else {
// If neither validation nor snapshot read is required, we don't need to put the results
// If neither validation nor snapshot reads are required, we don't need to put the results
// into the scan set
results = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.transaction.consensuscommit;

public enum Isolation {
READ_COMMITTED,
SNAPSHOT,
SERIALIZABLE,
}
Loading