Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CommitHandler {
protected final Coordinator coordinator;
private final TransactionTableMetadataManager tableMetadataManager;
private final ParallelExecutor parallelExecutor;
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;

@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;

Expand All @@ -44,11 +45,13 @@ public CommitHandler(
DistributedStorage storage,
Coordinator coordinator,
TransactionTableMetadataManager tableMetadataManager,
ParallelExecutor parallelExecutor) {
ParallelExecutor parallelExecutor,
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
this.storage = checkNotNull(storage);
this.coordinator = checkNotNull(coordinator);
this.tableMetadataManager = checkNotNull(tableMetadataManager);
this.parallelExecutor = checkNotNull(parallelExecutor);
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
}

/**
Expand Down Expand Up @@ -106,42 +109,62 @@ private void waitBeforePreparationSnapshotHookFuture(
}
}

public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException {
public void commit(Snapshot snapshot, boolean readOnly)
throws CommitException, UnknownTransactionStatusException {
boolean hasWritesOrDeletesInSnapshot = !readOnly && snapshot.hasWritesOrDeletes();

Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);
try {
prepare(snapshot);
} catch (PreparationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof PreparationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));

if (hasWritesOrDeletesInSnapshot) {
try {
prepare(snapshot);
} catch (PreparationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof PreparationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}

try {
validate(snapshot);
} catch (ValidationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof ValidationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
if (snapshot.hasReads()) {
try {
validate(snapshot);
} catch (ValidationException e) {
safelyCallOnFailureBeforeCommit(snapshot);

// If the transaction has no writes and deletes, we don't need to abort-state and
// rollback-records since there are no changes to be made.
if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) {
abortState(snapshot.getId());
}
if (hasWritesOrDeletesInSnapshot) {
rollbackRecords(snapshot);
}

if (e instanceof ValidationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}

waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null));

commitState(snapshot);
commitRecords(snapshot);
if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) {
commitState(snapshot);
}
if (hasWritesOrDeletesInSnapshot) {
commitRecords(snapshot);
}
}

protected void handleCommitConflict(Snapshot snapshot, Exception cause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TransactionState;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CommitException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.transaction.consensuscommit.Coordinator.State;
import com.scalar.db.util.groupcommit.Emittable;
Expand All @@ -28,15 +29,31 @@ public CommitHandlerWithGroupCommit(
Coordinator coordinator,
TransactionTableMetadataManager tableMetadataManager,
ParallelExecutor parallelExecutor,
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
CoordinatorGroupCommitter groupCommitter) {
super(storage, coordinator, tableMetadataManager, parallelExecutor);
super(
storage,
coordinator,
tableMetadataManager,
parallelExecutor,
coordinatorWriteOmissionOnReadOnlyEnabled);

checkNotNull(groupCommitter);
// The methods of this emitter will be called via GroupCommitter.ready().
groupCommitter.setEmitter(new Emitter(coordinator));
this.groupCommitter = groupCommitter;
}

@Override
public void commit(Snapshot snapshot, boolean readOnly)
throws CommitException, UnknownTransactionStatusException {
if (!readOnly && !snapshot.hasWritesOrDeletes() && coordinatorWriteOmissionOnReadOnlyEnabled) {
cancelGroupCommitIfNeeded(snapshot.getId());
}

super.commit(snapshot, readOnly);
}

@Override
protected void onFailureBeforeCommit(Snapshot snapshot) {
cancelGroupCommitIfNeeded(snapshot.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId());
}

commit.commit(crud.getSnapshot());
commit.commit(crud.getSnapshot(), crud.isReadOnly());
}

@Override
Expand All @@ -280,7 +280,7 @@ public void rollback() {
logger.warn("Failed to close the scanner", e);
}

if (groupCommitter != null) {
if (groupCommitter != null && !crud.isReadOnly()) {
groupCommitter.remove(crud.getSnapshot().getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class ConsensusCommitConfig {
public static final String ASYNC_COMMIT_ENABLED = PREFIX + "async_commit.enabled";
public static final String ASYNC_ROLLBACK_ENABLED = PREFIX + "async_rollback.enabled";

public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
PREFIX + "coordinator.write_omission_on_read_only.enabled";

public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";

Expand Down Expand Up @@ -73,10 +76,12 @@ public class ConsensusCommitConfig {
private final boolean asyncCommitEnabled;
private final boolean asyncRollbackEnabled;

private final boolean isIncludeMetadataEnabled;
private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;

private final boolean parallelImplicitPreReadEnabled;

private final boolean isIncludeMetadataEnabled;

private final boolean coordinatorGroupCommitEnabled;
private final int coordinatorGroupCommitSlotCapacity;
private final int coordinatorGroupCommitGroupSizeFixTimeoutMillis;
Expand All @@ -92,7 +97,9 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
DatabaseConfig.TRANSACTION_MANAGER + " should be '" + TRANSACTION_MANAGER_NAME + "'");
}

if (databaseConfig.getProperties().containsKey("scalar.db.isolation_level")) {
Properties properties = databaseConfig.getProperties();

if (properties.containsKey("scalar.db.isolation_level")) {
logger.warn(
"The property \"scalar.db.isolation_level\" is deprecated and will be removed in 5.0.0. "
+ "Please use \""
Expand All @@ -102,91 +109,78 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
isolation =
Isolation.valueOf(
getString(
databaseConfig.getProperties(),
properties,
ISOLATION_LEVEL,
getString(
databaseConfig.getProperties(),
properties,
"scalar.db.isolation_level", // for backward compatibility
Isolation.SNAPSHOT.toString()))
.toUpperCase(Locale.ROOT));
if (isolation.equals(Isolation.SERIALIZABLE)) {
validateCrossPartitionScanConfig(databaseConfig);

if (databaseConfig
.getProperties()
.containsKey("scalar.db.consensus_commit.serializable_strategy")) {
if (properties.containsKey("scalar.db.consensus_commit.serializable_strategy")) {
logger.warn(
"The property \"scalar.db.consensus_commit.serializable_strategy\" is deprecated and will "
+ "be removed in 5.0.0. The EXTRA_READ strategy is always used for the SERIALIZABLE "
+ "isolation level.");
}
}

coordinatorNamespace = getString(databaseConfig.getProperties(), COORDINATOR_NAMESPACE, null);
coordinatorNamespace = getString(properties, COORDINATOR_NAMESPACE, null);

parallelExecutorCount =
getInt(
databaseConfig.getProperties(),
PARALLEL_EXECUTOR_COUNT,
DEFAULT_PARALLEL_EXECUTOR_COUNT);
parallelPreparationEnabled =
getBoolean(databaseConfig.getProperties(), PARALLEL_PREPARATION_ENABLED, true);
parallelCommitEnabled =
getBoolean(databaseConfig.getProperties(), PARALLEL_COMMIT_ENABLED, true);
getInt(properties, PARALLEL_EXECUTOR_COUNT, DEFAULT_PARALLEL_EXECUTOR_COUNT);
parallelPreparationEnabled = getBoolean(properties, PARALLEL_PREPARATION_ENABLED, true);
parallelCommitEnabled = getBoolean(properties, PARALLEL_COMMIT_ENABLED, true);

// Use the value of parallel commit for parallel validation and parallel rollback as default
// value
parallelValidationEnabled =
getBoolean(
databaseConfig.getProperties(), PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled);
getBoolean(properties, PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled);
parallelRollbackEnabled =
getBoolean(
databaseConfig.getProperties(), PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled);
getBoolean(properties, PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled);

asyncCommitEnabled = getBoolean(databaseConfig.getProperties(), ASYNC_COMMIT_ENABLED, false);
asyncCommitEnabled = getBoolean(properties, ASYNC_COMMIT_ENABLED, false);

// Use the value of async commit for async rollback as default value
asyncRollbackEnabled =
getBoolean(databaseConfig.getProperties(), ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled);
asyncRollbackEnabled = getBoolean(properties, ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled);

coordinatorWriteOmissionOnReadOnlyEnabled =
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);

isIncludeMetadataEnabled =
getBoolean(databaseConfig.getProperties(), INCLUDE_METADATA_ENABLED, false);
parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);

parallelImplicitPreReadEnabled =
getBoolean(databaseConfig.getProperties(), PARALLEL_IMPLICIT_PRE_READ, true);
isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);

coordinatorGroupCommitEnabled =
getBoolean(databaseConfig.getProperties(), COORDINATOR_GROUP_COMMIT_ENABLED, false);
coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
coordinatorGroupCommitSlotCapacity =
getInt(
databaseConfig.getProperties(),
properties,
COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY,
DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY);
coordinatorGroupCommitGroupSizeFixTimeoutMillis =
getInt(
databaseConfig.getProperties(),
properties,
COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS,
DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS);
coordinatorGroupCommitDelayedSlotMoveTimeoutMillis =
getInt(
databaseConfig.getProperties(),
properties,
COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS,
DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS);
coordinatorGroupCommitOldGroupAbortTimeoutMillis =
getInt(
databaseConfig.getProperties(),
properties,
COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS,
DEFAULT_COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS);
coordinatorGroupCommitTimeoutCheckIntervalMillis =
getInt(
databaseConfig.getProperties(),
properties,
COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS,
DEFAULT_COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS);
coordinatorGroupCommitMetricsMonitorLogEnabled =
getBoolean(
databaseConfig.getProperties(),
COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED,
false);
getBoolean(properties, COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED, false);
}

public Isolation getIsolation() {
Expand Down Expand Up @@ -225,14 +219,18 @@ public boolean isAsyncRollbackEnabled() {
return asyncRollbackEnabled;
}

public boolean isIncludeMetadataEnabled() {
return isIncludeMetadataEnabled;
public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
return coordinatorWriteOmissionOnReadOnlyEnabled;
}

public boolean isParallelImplicitPreReadEnabled() {
return parallelImplicitPreReadEnabled;
}

public boolean isIncludeMetadataEnabled() {
return isIncludeMetadataEnabled;
}

public boolean isCoordinatorGroupCommitEnabled() {
return coordinatorGroupCommitEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,19 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
private CommitHandler createCommitHandler() {
if (isGroupCommitEnabled()) {
return new CommitHandlerWithGroupCommit(
storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter);
storage,
coordinator,
tableMetadataManager,
parallelExecutor,
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
groupCommitter);
} else {
return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor);
return new CommitHandler(
storage,
coordinator,
tableMetadataManager,
parallelExecutor,
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
}
}

Expand Down Expand Up @@ -222,7 +232,7 @@ DistributedTransaction begin(
String txId, Isolation isolation, boolean readOnly, boolean oneOperation) {
checkArgument(!Strings.isNullOrEmpty(txId));
checkNotNull(isolation);
if (isGroupCommitEnabled()) {
if (!readOnly && isGroupCommitEnabled()) {
assert groupCommitter != null;
txId = groupCommitter.reserve(txId);
}
Comment on lines +235 to 238
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In read-only mode, skip reserving a slot in the group commit.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ public Snapshot getSnapshot() {
return snapshot;
}

public boolean isReadOnly() {
return readOnly;
}

private interface ConsensusCommitScanner extends TransactionCrudOperable.Scanner {
boolean isClosed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ public boolean containsKeyInGetSet(Get get) {
return getSet.containsKey(get);
}

public boolean hasWritesOrDeletes() {
return !writeSet.isEmpty() || !deleteSet.isEmpty();
}

public boolean hasReads() {
return !getSet.isEmpty() || !scanSet.isEmpty() || !scannerSet.isEmpty();
}

public Optional<TransactionResult> getResult(Key key) throws CrudException {
Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
return mergeResult(key, result);
Expand Down
Loading