Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,8 @@ public enum CoreError implements ScalarDbError {
"Getting the storage information failed. Namespace: %s",
"",
""),
CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED(
Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata;

import com.google.common.annotations.VisibleForTesting;
import com.scalar.db.api.ConditionBuilder;
Expand Down Expand Up @@ -105,7 +106,7 @@ private Put composePut(Operation base, @Nullable TransactionResult result)
// Set before image columns to null
if (result != null) {
TransactionTableMetadata transactionTableMetadata =
tableMetadataManager.getTransactionTableMetadata(base);
getTransactionTableMetadata(tableMetadataManager, base);
LinkedHashSet<String> beforeImageColumnNames =
transactionTableMetadata.getBeforeImageColumnNames();
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();
Expand Down Expand Up @@ -137,8 +138,9 @@ private Key getPartitionKey(Operation base, @Nullable TransactionResult result)
assert base instanceof Selection;
if (result != null) {
// for rollforward in lazy recovery
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
return ScalarDbUtils.getPartitionKey(result, metadata.getTableMetadata());
TransactionTableMetadata transactionTableMetadata =
getTransactionTableMetadata(tableMetadataManager, base);
return ScalarDbUtils.getPartitionKey(result, transactionTableMetadata.getTableMetadata());
} else {
throw new AssertionError(
"This path should not be reached since the EXTRA_WRITE strategy is deleted");
Expand All @@ -155,8 +157,9 @@ private Optional<Key> getClusteringKey(Operation base, @Nullable TransactionResu
assert base instanceof Selection;
if (result != null) {
// for rollforward in lazy recovery
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
return ScalarDbUtils.getClusteringKey(result, metadata.getTableMetadata());
TransactionTableMetadata transactionTableMetadata =
getTransactionTableMetadata(tableMetadataManager, base);
return ScalarDbUtils.getClusteringKey(result, transactionTableMetadata.getTableMetadata());
} else {
throw new AssertionError(
"This path should not be reached since the EXTRA_WRITE strategy is deleted");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import com.scalar.db.exception.transaction.UnsatisfiedConditionException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
Expand All @@ -51,24 +49,19 @@ public class ConsensusCommit extends AbstractDistributedTransaction {
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommit.class);
private final CrudHandler crud;
private final CommitHandler commit;
private final RecoveryHandler recovery;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@Nullable private final CoordinatorGroupCommitter groupCommitter;
private Runnable beforeRecoveryHook;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public ConsensusCommit(
CrudHandler crud,
CommitHandler commit,
RecoveryHandler recovery,
ConsensusCommitMutationOperationChecker mutationOperationChecker,
@Nullable CoordinatorGroupCommitter groupCommitter) {
this.crud = checkNotNull(crud);
this.commit = checkNotNull(commit);
this.recovery = checkNotNull(recovery);
this.mutationOperationChecker = mutationOperationChecker;
this.groupCommitter = groupCommitter;
this.beforeRecoveryHook = () -> {};
}

@Override
Expand All @@ -78,63 +71,18 @@ public String getId() {

@Override
public Optional<Result> get(Get get) throws CrudException {
get = copyAndSetTargetToIfNot(get);
try {
return crud.get(get);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
return crud.get(copyAndSetTargetToIfNot(get));
}

@Override
public List<Result> scan(Scan scan) throws CrudException {
scan = copyAndSetTargetToIfNot(scan);
try {
return crud.scan(scan);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
return crud.scan(copyAndSetTargetToIfNot(scan));
}

@Override
public Scanner getScanner(Scan scan) throws CrudException {
scan = copyAndSetTargetToIfNot(scan);
Scanner scanner = crud.getScanner(scan);

return new Scanner() {
@Override
public Optional<Result> one() throws CrudException {
try {
return scanner.one();
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

@Override
public List<Result> all() throws CrudException {
try {
return scanner.all();
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

@Override
public void close() throws CrudException {
scanner.close();
}

@Nonnull
@Override
public Iterator<Result> iterator() {
return scanner.iterator();
}
};
return crud.getScanner(scan);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -143,12 +91,7 @@ public Iterator<Result> iterator() {
public void put(Put put) throws CrudException {
put = copyAndSetTargetToIfNot(put);
checkMutation(put);
try {
crud.put(put);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
crud.put(put);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -165,12 +108,7 @@ public void put(List<Put> puts) throws CrudException {
public void delete(Delete delete) throws CrudException {
delete = copyAndSetTargetToIfNot(delete);
checkMutation(delete);
try {
crud.delete(delete);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
crud.delete(delete);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -196,12 +134,7 @@ public void upsert(Upsert upsert) throws CrudException {
upsert = copyAndSetTargetToIfNot(upsert);
Put put = ConsensusCommitUtils.createPutForUpsert(upsert);
checkMutation(put);
try {
crud.put(put);
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
crud.put(put);
}

@Override
Expand All @@ -222,9 +155,6 @@ public void update(Update update) throws CrudException {

// If the condition is not specified, it means that the record does not exist. In this case,
// we do nothing
} catch (UncommittedRecordException e) {
lazyRecovery(e);
throw e;
}
}

Expand Down Expand Up @@ -257,9 +187,6 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
try {
crud.readIfImplicitPreReadEnabled();
} catch (CrudConflictException e) {
if (e instanceof UncommittedRecordException) {
lazyRecovery((UncommittedRecordException) e);
}
throw new CommitConflictException(
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(),
e,
Expand All @@ -269,6 +196,12 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId());
}

try {
crud.waitForRecoveryCompletionIfNecessary();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for any in-progress lazy recoveries to complete before committing the transaction, if necessary.

} catch (CrudException e) {
throw new CommitException(e.getMessage(), e, getId());
}

commit.commit(crud.getSnapshot(), crud.isReadOnly());
}

Expand All @@ -295,22 +228,6 @@ CommitHandler getCommitHandler() {
return commit;
}

@VisibleForTesting
RecoveryHandler getRecoveryHandler() {
return recovery;
}

@VisibleForTesting
void setBeforeRecoveryHook(Runnable beforeRecoveryHook) {
this.beforeRecoveryHook = beforeRecoveryHook;
}

private void lazyRecovery(UncommittedRecordException e) {
logger.debug("Recover uncommitted records: {}", e.getResults());
beforeRecoveryHook.run();
e.getResults().forEach(r -> recovery.recover(e.getSelection(), r));
}

private void checkMutation(Mutation mutation) throws CrudException {
try {
mutationOperationChecker.check(mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ public class ConsensusCommitConfig {

public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
PREFIX + "coordinator.write_omission_on_read_only.enabled";

public static final String RECOVERY_EXECUTOR_COUNT = PREFIX + "recovery_executor_count";
public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;

public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";

public static final String COORDINATOR_GROUP_COMMIT_PREFIX = PREFIX + "coordinator.group_commit.";
Expand All @@ -59,6 +56,9 @@ public class ConsensusCommitConfig {
public static final String COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED =
COORDINATOR_GROUP_COMMIT_PREFIX + "metrics_monitor_log_enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;
public static final int DEFAULT_RECOVERY_EXECUTOR_COUNT = 128;

public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY = 20;
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS = 40;
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS = 1200;
Expand All @@ -77,9 +77,8 @@ public class ConsensusCommitConfig {
private final boolean asyncRollbackEnabled;

private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;

private final int recoveryExecutorCount;
private final boolean parallelImplicitPreReadEnabled;

private final boolean isIncludeMetadataEnabled;

private final boolean coordinatorGroupCommitEnabled;
Expand Down Expand Up @@ -149,10 +148,13 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
coordinatorWriteOmissionOnReadOnlyEnabled =
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);
recoveryExecutorCount =
getInt(properties, RECOVERY_EXECUTOR_COUNT, DEFAULT_RECOVERY_EXECUTOR_COUNT);

isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);

coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
coordinatorGroupCommitSlotCapacity =
getInt(
Expand Down Expand Up @@ -223,6 +225,10 @@ public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
return coordinatorWriteOmissionOnReadOnlyEnabled;
}

public int getRecoveryExecutorCount() {
return recoveryExecutorCount;
}

public boolean isParallelImplicitPreReadEnabled() {
return parallelImplicitPreReadEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage
private final TransactionTableMetadataManager tableMetadataManager;
private final Coordinator coordinator;
private final ParallelExecutor parallelExecutor;
private final RecoveryHandler recovery;
private final RecoveryExecutor recoveryExecutor;
protected final CommitHandler commit;
private final boolean isIncludeMetadataEnabled;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
Expand All @@ -71,7 +71,10 @@ public ConsensusCommitManager(
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor =
new RecoveryExecutor(
coordinator, recovery, tableMetadataManager, config.getRecoveryExecutorCount());
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand All @@ -90,7 +93,10 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor =
new RecoveryExecutor(
coordinator, recovery, tableMetadataManager, config.getRecoveryExecutorCount());
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand All @@ -106,7 +112,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
DatabaseConfig databaseConfig,
Coordinator coordinator,
ParallelExecutor parallelExecutor,
RecoveryHandler recovery,
RecoveryExecutor recoveryExecutor,
CommitHandler commit,
@Nullable CoordinatorGroupCommitter groupCommitter) {
super(databaseConfig);
Expand All @@ -118,7 +124,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
this.coordinator = coordinator;
this.parallelExecutor = parallelExecutor;
this.recovery = recovery;
this.recoveryExecutor = recoveryExecutor;
this.commit = commit;
this.groupCommitter = groupCommitter;
this.isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand Down Expand Up @@ -246,13 +252,14 @@ DistributedTransaction begin(
new CrudHandler(
storage,
snapshot,
recoveryExecutor,
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
readOnly,
oneOperation);
DistributedTransaction transaction =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter);
new ConsensusCommit(crud, commit, mutationOperationChecker, groupCommitter);
if (readOnly) {
transaction = new ReadOnlyDistributedTransaction(transaction);
}
Expand Down Expand Up @@ -511,6 +518,7 @@ public void close() {
storage.close();
admin.close();
parallelExecutor.close();
recoveryExecutor.close();
if (isGroupCommitEnabled()) {
assert groupCommitter != null;
groupCommitter.close();
Expand Down
Loading