Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,8 @@ public enum CoreError implements ScalarDbError {
"Getting the storage information failed. Namespace: %s",
"",
""),
CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED(
Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata;

import com.google.common.annotations.VisibleForTesting;
import com.scalar.db.api.ConditionBuilder;
Expand Down Expand Up @@ -105,7 +106,7 @@ private Put composePut(Operation base, @Nullable TransactionResult result)
// Set before image columns to null
if (result != null) {
TransactionTableMetadata transactionTableMetadata =
tableMetadataManager.getTransactionTableMetadata(base);
getTransactionTableMetadata(tableMetadataManager, base);
LinkedHashSet<String> beforeImageColumnNames =
transactionTableMetadata.getBeforeImageColumnNames();
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();
Expand Down Expand Up @@ -137,8 +138,9 @@ private Key getPartitionKey(Operation base, @Nullable TransactionResult result)
assert base instanceof Selection;
if (result != null) {
// for rollforward in lazy recovery
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
return ScalarDbUtils.getPartitionKey(result, metadata.getTableMetadata());
TransactionTableMetadata transactionTableMetadata =
getTransactionTableMetadata(tableMetadataManager, base);
return ScalarDbUtils.getPartitionKey(result, transactionTableMetadata.getTableMetadata());
} else {
throw new AssertionError(
"This path should not be reached since the EXTRA_WRITE strategy is deleted");
Expand All @@ -155,8 +157,9 @@ private Optional<Key> getClusteringKey(Operation base, @Nullable TransactionResu
assert base instanceof Selection;
if (result != null) {
// for rollforward in lazy recovery
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
return ScalarDbUtils.getClusteringKey(result, metadata.getTableMetadata());
TransactionTableMetadata transactionTableMetadata =
getTransactionTableMetadata(tableMetadataManager, base);
return ScalarDbUtils.getClusteringKey(result, transactionTableMetadata.getTableMetadata());
} else {
throw new AssertionError(
"This path should not be reached since the EXTRA_WRITE strategy is deleted");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import com.scalar.db.exception.transaction.UnsatisfiedConditionException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
Expand All @@ -51,24 +49,19 @@ public class ConsensusCommit extends AbstractDistributedTransaction {
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommit.class);
private final CrudHandler crud;
private final CommitHandler commit;
private final RecoveryHandler recovery;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@Nullable private final CoordinatorGroupCommitter groupCommitter;
private Runnable beforeRecoveryHook;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public ConsensusCommit(
CrudHandler crud,
CommitHandler commit,
RecoveryHandler recovery,
ConsensusCommitMutationOperationChecker mutationOperationChecker,
@Nullable CoordinatorGroupCommitter groupCommitter) {
this.crud = checkNotNull(crud);
this.commit = checkNotNull(commit);
this.recovery = checkNotNull(recovery);
this.mutationOperationChecker = mutationOperationChecker;
this.groupCommitter = groupCommitter;
this.beforeRecoveryHook = () -> {};
}

@Override
Expand All @@ -78,63 +71,18 @@ public String getId() {

@Override
public Optional<Result> get(Get get) throws CrudException {
get = copyAndSetTargetToIfNot(get);
try {
return crud.get(get);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
return crud.get(copyAndSetTargetToIfNot(get));
}

@Override
public List<Result> scan(Scan scan) throws CrudException {
scan = copyAndSetTargetToIfNot(scan);
try {
return crud.scan(scan);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
return crud.scan(copyAndSetTargetToIfNot(scan));
}

@Override
public Scanner getScanner(Scan scan) throws CrudException {
scan = copyAndSetTargetToIfNot(scan);
Scanner scanner = crud.getScanner(scan);

return new Scanner() {
@Override
public Optional<Result> one() throws CrudException {
try {
return scanner.one();
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

@Override
public List<Result> all() throws CrudException {
try {
return scanner.all();
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

@Override
public void close() throws CrudException {
scanner.close();
}

@Nonnull
@Override
public Iterator<Result> iterator() {
return scanner.iterator();
}
};
return crud.getScanner(scan);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -143,12 +91,7 @@ public Iterator<Result> iterator() {
public void put(Put put) throws CrudException {
put = copyAndSetTargetToIfNot(put);
checkMutation(put);
try {
crud.put(put);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
crud.put(put);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -165,12 +108,7 @@ public void put(List<Put> puts) throws CrudException {
public void delete(Delete delete) throws CrudException {
delete = copyAndSetTargetToIfNot(delete);
checkMutation(delete);
try {
crud.delete(delete);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
crud.delete(delete);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -196,12 +134,7 @@ public void upsert(Upsert upsert) throws CrudException {
upsert = copyAndSetTargetToIfNot(upsert);
Put put = ConsensusCommitUtils.createPutForUpsert(upsert);
checkMutation(put);
try {
crud.put(put);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
crud.put(put);
}

@Override
Expand All @@ -222,9 +155,6 @@ public void update(Update update) throws CrudException {

// If the condition is not specified, it means that the record does not exist. In this case,
// we do nothing
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

Expand Down Expand Up @@ -257,9 +187,6 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
try {
crud.readIfImplicitPreReadEnabled();
} catch (CrudConflictException e) {
if (e instanceof UncommittedRecordException) {
lazyRecovery((UncommittedRecordException) e);
}
throw new CommitConflictException(
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(),
e,
Expand All @@ -269,6 +196,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId());
}

try {
crud.waitForRecoveryCompletionIfNecessary();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for any in-progress lazy recoveries to complete before committing the transaction, if necessary.

} catch (CrudException e) {
throw new CommitException(e.getMessage(), e, getId());
}

commit.commit(crud.getSnapshot(), crud.isReadOnly());
}

Expand All @@ -295,22 +228,6 @@ CommitHandler getCommitHandler() {
return commit;
}

@VisibleForTesting
RecoveryHandler getRecoveryHandler() {
return recovery;
}

@VisibleForTesting
void setBeforeRecoveryHook(Runnable beforeRecoveryHook) {
this.beforeRecoveryHook = beforeRecoveryHook;
}

private void lazyRecovery(UncommittedRecordException e) {
logger.debug("Recover uncommitted records: {}", e.getResults());
beforeRecoveryHook.run();
e.getResults().forEach(r -> recovery.recover(e.getSelection(), r));
}

private void checkMutation(Mutation mutation) throws CrudException {
try {
mutationOperationChecker.check(mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ public class ConsensusCommitConfig {

public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
PREFIX + "coordinator.write_omission_on_read_only.enabled";

public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;

public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";

public static final String COORDINATOR_GROUP_COMMIT_PREFIX = PREFIX + "coordinator.group_commit.";
Expand All @@ -59,6 +55,8 @@ public class ConsensusCommitConfig {
public static final String COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED =
COORDINATOR_GROUP_COMMIT_PREFIX + "metrics_monitor_log_enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;

public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY = 20;
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS = 40;
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS = 1200;
Expand All @@ -77,9 +75,7 @@ public class ConsensusCommitConfig {
private final boolean asyncRollbackEnabled;

private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;

private final boolean parallelImplicitPreReadEnabled;

private final boolean isIncludeMetadataEnabled;

private final boolean coordinatorGroupCommitEnabled;
Expand Down Expand Up @@ -149,10 +145,10 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
coordinatorWriteOmissionOnReadOnlyEnabled =
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);

isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);

coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
coordinatorGroupCommitSlotCapacity =
getInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage
private final TransactionTableMetadataManager tableMetadataManager;
private final Coordinator coordinator;
private final ParallelExecutor parallelExecutor;
private final RecoveryHandler recovery;
private final RecoveryExecutor recoveryExecutor;
protected final CommitHandler commit;
private final boolean isIncludeMetadataEnabled;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
Expand All @@ -71,7 +71,8 @@ public ConsensusCommitManager(
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand All @@ -90,7 +91,8 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand All @@ -106,7 +108,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
DatabaseConfig databaseConfig,
Coordinator coordinator,
ParallelExecutor parallelExecutor,
RecoveryHandler recovery,
RecoveryExecutor recoveryExecutor,
CommitHandler commit,
@Nullable CoordinatorGroupCommitter groupCommitter) {
super(databaseConfig);
Expand All @@ -118,7 +120,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
this.coordinator = coordinator;
this.parallelExecutor = parallelExecutor;
this.recovery = recovery;
this.recoveryExecutor = recoveryExecutor;
this.commit = commit;
this.groupCommitter = groupCommitter;
this.isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand Down Expand Up @@ -246,13 +248,14 @@ DistributedTransaction begin(
new CrudHandler(
storage,
snapshot,
recoveryExecutor,
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
readOnly,
oneOperation);
DistributedTransaction transaction =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
new ConsensusCommit(crud, commit, mutationOperationChecker, groupCommitter);
if (readOnly) {
transaction = new ReadOnlyDistributedTransaction(transaction);
}
Expand Down Expand Up @@ -511,6 +514,7 @@ public void close() {
storage.close();
admin.close();
parallelExecutor.close();
recoveryExecutor.close();
if (isGroupCommitEnabled()) {
assert groupCommitter != null;
groupCommitter.close();
Expand Down
Loading