Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/com/scalar/db/common/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ public enum CoreError implements ScalarDbError {
"A transaction conflict occurred in the Insert operation",
"",
""),
CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS(
Category.CONCURRENCY_ERROR, "0026", "A conflict occurred when committing records", "", ""),

//
// Errors for the internal error category
Expand Down Expand Up @@ -935,6 +937,8 @@ public enum CoreError implements ScalarDbError {
""),
CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED(
Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""),
CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED(
Category.INTERNAL_ERROR, "0058", "Committing records failed", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.LazyInit;
import com.scalar.db.api.Delete;
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.TransactionState;
Expand All @@ -24,6 +26,8 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
Expand All @@ -38,6 +42,7 @@ public class CommitHandler {
private final ParallelExecutor parallelExecutor;
private final MutationsGrouper mutationsGrouper;
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
private final boolean onePhaseCommitEnabled;

@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;

Expand All @@ -48,13 +53,15 @@ public CommitHandler(
TransactionTableMetadataManager tableMetadataManager,
ParallelExecutor parallelExecutor,
MutationsGrouper mutationsGrouper,
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
boolean onePhaseCommitEnabled) {
this.storage = checkNotNull(storage);
this.coordinator = checkNotNull(coordinator);
this.tableMetadataManager = checkNotNull(tableMetadataManager);
this.parallelExecutor = checkNotNull(parallelExecutor);
this.mutationsGrouper = checkNotNull(mutationsGrouper);
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
this.onePhaseCommitEnabled = onePhaseCommitEnabled;
}

/**
Expand Down Expand Up @@ -118,6 +125,16 @@ public void commit(Snapshot snapshot, boolean readOnly)

Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);

if (canOnePhaseCommit(snapshot)) {
try {
onePhaseCommitRecords(snapshot);
return;
Copy link
Contributor

@komamitsu komamitsu Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the commit state, I think SSR will treat the write operations as an aborted transaction's ones and ignore them...

Copy link
Collaborator Author

@brfrn169 brfrn169 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if SSR is enabled, one-phase commit must not be enabled. We should consider adding validation for this on the cluster side later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}
}

if (hasWritesOrDeletesInSnapshot) {
try {
prepareRecords(snapshot);
Expand Down Expand Up @@ -170,6 +187,52 @@ public void commit(Snapshot snapshot, boolean readOnly)
}
}

@VisibleForTesting
boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException {
if (!onePhaseCommitEnabled) {
return false;
}

// If validation is required (in SERIALIZABLE isolation), we cannot one-phase commit the
// transaction
if (snapshot.isValidationRequired()) {
return false;
}

// If the snapshot has no write and deletes, we do not one-phase commit the transaction
if (!snapshot.hasWritesOrDeletes()) {
return false;
}

List<Delete> deletesInDeleteSet = snapshot.getDeletesInDeleteSet();

// If a record corresponding to a delete in the delete set does not exist in the storage, we
// cannot one-phase commit the transaction. This is because the storage does not support
// delete-if-not-exists semantics, so we cannot detect conflicts with other transactions.
for (Delete delete : deletesInDeleteSet) {
Copy link
Contributor

@komamitsu komamitsu Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to know what would happen if I removed this block, so I removed the block and executed ConsensusCommitNullMetadataIntegrationTestBase. But all the tests passed unexpectedly. Is it difficult to write a test case where this block is necessary? (or I'm missing something....)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, suppose a user tries to get record A but it doesn’t exist, and then the user attempts to delete that non-existing record.

In this case, under one-phase commit, we would need to perform a delete-if-not-exists operation for that record to detect conflicts—since another transaction might insert the record. However, because we don’t have delete-if-not-exists semantics in ScalarDB, we can’t do that. As a result, we cannot apply one-phase commit in this case.

Copy link
Collaborator Author

@brfrn169 brfrn169 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, one possible solution is to introduce a logical-delete state. In that case, we would perform a put-if-not-exists operation for the record representing that logical-delete state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks for the explanation! It would be great if we add a test case for the situation later.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but unfortunately I wanted to find a proper integration test case to know actual use case.

Optional<TransactionResult> result = snapshot.getFromReadSet(new Snapshot.Key(delete));

// For deletes, we always perform implicit pre-reads if the result does not exit in the read
// set. So the result should always exist in the read set.
assert result != null;

if (!result.isPresent()) {
return false;
}
}

try {
// If the mutations can be grouped altogether, the mutations can be done in a single mutate
// API call, so we can one-phase commit the transaction
return mutationsGrouper.canBeGroupedAltogether(
Stream.concat(snapshot.getPutsInWriteSet().stream(), deletesInDeleteSet.stream())
.collect(Collectors.toList()));
} catch (ExecutionException e) {
throw new CommitException(
CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId());
}
}

protected void handleCommitConflict(Snapshot snapshot, Exception cause)
throws CommitConflictException, UnknownTransactionStatusException {
try {
Expand Down Expand Up @@ -197,6 +260,30 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
}
}

/**
 * Commits the records of the given snapshot in a single phase by applying all mutations in
 * one atomic mutate API call, skipping the prepare and commit-state steps.
 *
 * @param snapshot the snapshot whose writes and deletes are committed
 * @throws CommitConflictException if a conflicting transaction is detected while mutating
 * @throws CommitException if committing the records fails for a non-conflict reason
 */
@VisibleForTesting
void onePhaseCommitRecords(Snapshot snapshot) throws CommitException {
  String id = snapshot.getId();
  try {
    OnePhaseCommitMutationComposer mutationComposer =
        new OnePhaseCommitMutationComposer(id, tableMetadataManager);
    snapshot.to(mutationComposer);

    // One-phase commit always fits into a single mutate API call, so neither mutation
    // grouping nor the parallel executor is needed here.
    storage.mutate(mutationComposer.get());
  } catch (NoMutationException e) {
    throw new CommitConflictException(
        CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, id);
  } catch (RetriableExecutionException e) {
    throw new CommitConflictException(
        CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS.buildMessage(),
        e,
        id);
  } catch (ExecutionException e) {
    throw new CommitException(
        CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, id);
  }
}

public void prepareRecords(Snapshot snapshot) throws PreparationException {
try {
PrepareMutationComposer composer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public CommitHandlerWithGroupCommit(
ParallelExecutor parallelExecutor,
MutationsGrouper mutationsGrouper,
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
boolean onePhaseCommitEnabled,
CoordinatorGroupCommitter groupCommitter) {
super(
storage,
coordinator,
tableMetadataManager,
parallelExecutor,
mutationsGrouper,
coordinatorWriteOmissionOnReadOnlyEnabled);
coordinatorWriteOmissionOnReadOnlyEnabled,
onePhaseCommitEnabled);
checkNotNull(groupCommitter);
// The methods of this emitter will be called via GroupCommitter.ready().
groupCommitter.setEmitter(new Emitter(coordinator));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ConsensusCommitConfig {

public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
PREFIX + "coordinator.write_omission_on_read_only.enabled";
public static final String ONE_PHASE_COMMIT_ENABLED = PREFIX + "one_phase_commit.enabled";
public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";
public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
Expand Down Expand Up @@ -75,8 +76,9 @@ public class ConsensusCommitConfig {
private final boolean asyncRollbackEnabled;

private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
private final boolean onePhaseCommitEnabled;
private final boolean parallelImplicitPreReadEnabled;
private final boolean isIncludeMetadataEnabled;
private final boolean includeMetadataEnabled;

private final boolean coordinatorGroupCommitEnabled;
private final int coordinatorGroupCommitSlotCapacity;
Expand Down Expand Up @@ -145,10 +147,12 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
coordinatorWriteOmissionOnReadOnlyEnabled =
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);

isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);
onePhaseCommitEnabled = getBoolean(properties, ONE_PHASE_COMMIT_ENABLED, false);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);

includeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);

coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
coordinatorGroupCommitSlotCapacity =
getInt(
Expand Down Expand Up @@ -219,12 +223,16 @@ public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
return coordinatorWriteOmissionOnReadOnlyEnabled;
}

/** Returns whether the one-phase commit optimization is enabled (defaults to {@code false}). */
public boolean isOnePhaseCommitEnabled() {
return onePhaseCommitEnabled;
}

/** Returns whether implicit pre-reads are executed in parallel (defaults to {@code true}). */
public boolean isParallelImplicitPreReadEnabled() {
return parallelImplicitPreReadEnabled;
}

/** Returns whether transaction metadata columns are included in read results. */
public boolean isIncludeMetadataEnabled() {
  // Reads the renamed field (previously "isIncludeMetadataEnabled"); the scraped diff
  // retained both the old and new return statements, which is invalid Java — only the
  // post-rename statement is kept.
  return includeMetadataEnabled;
}

public boolean isCoordinatorGroupCommitEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
parallelExecutor,
mutationsGrouper,
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
config.isOnePhaseCommitEnabled(),
groupCommitter);
} else {
return new CommitHandler(
Expand All @@ -151,7 +152,8 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
tableMetadataManager,
parallelExecutor,
mutationsGrouper,
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
config.isOnePhaseCommitEnabled());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.scalar.db.io.Key;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -50,6 +51,41 @@ public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
}

/**
 * Checks whether all the given mutations belong to a single mutation group and fit within
 * that group's maximum atomic mutation count, i.e., whether they could all be executed in
 * one atomic mutate call.
 *
 * @param mutations the mutations to check
 * @return {@code true} if every mutation falls into the same group and the total count does
 *     not exceed the group's atomicity limit
 * @throws ExecutionException if the storage information cannot be retrieved
 */
public boolean canBeGroupedAltogether(Collection<Mutation> mutations) throws ExecutionException {
  // Zero or one mutation trivially fits into a single group.
  if (mutations.size() <= 1) {
    return true;
  }

  MutationGroup baseGroup = null;
  int limit = 0;
  int total = 0;

  for (Mutation mutation : mutations) {
    assert mutation.forNamespace().isPresent();
    StorageInfo info = storageInfoProvider.getStorageInfo(mutation.forNamespace().get());
    MutationGroup group = new MutationGroup(mutation, info);

    if (baseGroup == null) {
      // The first mutation defines the group and the atomicity limit all others must match.
      baseGroup = group;
      limit = group.storageInfo.getMaxAtomicMutationsCount();
    } else if (!baseGroup.equals(group)) {
      // A mutation outside the first group cannot share a single atomic call.
      return false;
    }

    total++;
    if (total > limit) {
      // Too many mutations for one atomic mutate call on this storage.
      return false;
    }
  }

  // Every mutation is in the same group and within the count limit.
  return true;
}

private static class MutationGroup {
public final StorageInfo storageInfo;
@Nullable public final String namespace;
Expand Down
Loading