-
Notifications
You must be signed in to change notification settings - Fork 40
Support one-phase commit optimization in Consensus Commit #2811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,7 +2,9 @@ | |||||
|
|
||||||
| import static com.google.common.base.Preconditions.checkNotNull; | ||||||
|
|
||||||
| import com.google.common.annotations.VisibleForTesting; | ||||||
| import com.google.errorprone.annotations.concurrent.LazyInit; | ||||||
| import com.scalar.db.api.Delete; | ||||||
| import com.scalar.db.api.DistributedStorage; | ||||||
| import com.scalar.db.api.Mutation; | ||||||
| import com.scalar.db.api.TransactionState; | ||||||
|
|
@@ -24,6 +26,8 @@ | |||||
| import java.util.List; | ||||||
| import java.util.Optional; | ||||||
| import java.util.concurrent.Future; | ||||||
| import java.util.stream.Collectors; | ||||||
| import java.util.stream.Stream; | ||||||
| import javax.annotation.Nullable; | ||||||
| import javax.annotation.concurrent.ThreadSafe; | ||||||
| import org.slf4j.Logger; | ||||||
|
|
@@ -38,6 +42,7 @@ public class CommitHandler { | |||||
| private final ParallelExecutor parallelExecutor; | ||||||
| private final MutationsGrouper mutationsGrouper; | ||||||
| protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled; | ||||||
| private final boolean onePhaseCommitEnabled; | ||||||
|
|
||||||
| @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook; | ||||||
|
|
||||||
|
|
@@ -48,13 +53,15 @@ public CommitHandler( | |||||
| TransactionTableMetadataManager tableMetadataManager, | ||||||
| ParallelExecutor parallelExecutor, | ||||||
| MutationsGrouper mutationsGrouper, | ||||||
| boolean coordinatorWriteOmissionOnReadOnlyEnabled) { | ||||||
| boolean coordinatorWriteOmissionOnReadOnlyEnabled, | ||||||
| boolean onePhaseCommitEnabled) { | ||||||
| this.storage = checkNotNull(storage); | ||||||
| this.coordinator = checkNotNull(coordinator); | ||||||
| this.tableMetadataManager = checkNotNull(tableMetadataManager); | ||||||
| this.parallelExecutor = checkNotNull(parallelExecutor); | ||||||
| this.mutationsGrouper = checkNotNull(mutationsGrouper); | ||||||
| this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled; | ||||||
| this.onePhaseCommitEnabled = onePhaseCommitEnabled; | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
|
|
@@ -118,6 +125,16 @@ public void commit(Snapshot snapshot, boolean readOnly) | |||||
|
|
||||||
| Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); | ||||||
|
|
||||||
| if (canOnePhaseCommit(snapshot)) { | ||||||
| try { | ||||||
| onePhaseCommitRecords(snapshot); | ||||||
| return; | ||||||
| } catch (Exception e) { | ||||||
| safelyCallOnFailureBeforeCommit(snapshot); | ||||||
| throw e; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if (hasWritesOrDeletesInSnapshot) { | ||||||
| try { | ||||||
| prepareRecords(snapshot); | ||||||
|
|
@@ -170,6 +187,52 @@ public void commit(Snapshot snapshot, boolean readOnly) | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| @VisibleForTesting | ||||||
| boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException { | ||||||
| if (!onePhaseCommitEnabled) { | ||||||
| return false; | ||||||
| } | ||||||
|
|
||||||
| // If validation is required (in SERIALIZABLE isolation), we cannot one-phase commit the | ||||||
| // transaction | ||||||
| if (snapshot.isValidationRequired()) { | ||||||
| return false; | ||||||
| } | ||||||
|
|
||||||
| // If the snapshot has no writes or deletes, we do not one-phase commit the transaction | ||||||
| if (!snapshot.hasWritesOrDeletes()) { | ||||||
| return false; | ||||||
| } | ||||||
|
|
||||||
| List<Delete> deletesInDeleteSet = snapshot.getDeletesInDeleteSet(); | ||||||
|
|
||||||
| // If a record corresponding to a delete in the delete set does not exist in the storage, we | ||||||
| // cannot one-phase commit the transaction. This is because the storage does not support | ||||||
| // delete-if-not-exists semantics, so we cannot detect conflicts with other transactions. | ||||||
| for (Delete delete : deletesInDeleteSet) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to know what would happen if I removed this block, so I removed the block and executed
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, suppose a user tries to get record A but it doesn’t exist, and then the user attempts to delete that non-existing record. In this case, under one-phase commit, we would need to perform a delete-if-not-exists operation for that record to detect conflicts—since another transaction might insert the record. However, because we don’t have delete-if-not-exists semantics in ScalarDB, we can’t do that. As a result, we cannot apply one-phase commit in this case.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By the way, one possible solution is to introduce a logical-delete state. In that case, we would perform a put-if-not-exists operation for the record representing that logical-delete state.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Thanks for the explanation! It would be great if we add a test case for the situation later.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we have a test for it: In this case, we can't perform one-phase commit.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks, but unfortunately I was hoping to find a proper integration test case so that I could understand the actual use case. |
||||||
| Optional<TransactionResult> result = snapshot.getFromReadSet(new Snapshot.Key(delete)); | ||||||
|
|
||||||
| // For deletes, we always perform implicit pre-reads if the result does not exist in the read | ||||||
| // set. So the result should always exist in the read set. | ||||||
| assert result != null; | ||||||
|
|
||||||
| if (!result.isPresent()) { | ||||||
| return false; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| try { | ||||||
| // If the mutations can be grouped together, the mutations can be done in a single mutate API | ||||||
|
||||||
| // If the mutations can be grouped together, the mutations can be done in a single mutate API | |
| // If the mutations can be grouped altogether, the mutations can be done in a single mutate API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 24c314f. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the conditions for the one-phase commit optimization. Please see the comments for the details.
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |||||
| import com.scalar.db.io.Key; | ||||||
| import java.util.ArrayList; | ||||||
| import java.util.Collection; | ||||||
| import java.util.Iterator; | ||||||
| import java.util.LinkedHashMap; | ||||||
| import java.util.List; | ||||||
| import java.util.Map; | ||||||
|
|
@@ -50,6 +51,41 @@ public List<List<Mutation>> groupMutations(Collection<Mutation> mutations) | |||||
| return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList()); | ||||||
| } | ||||||
|
|
||||||
| public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException { | ||||||
|
||||||
| public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException { | |
| public boolean canBeGroupedAltogether(Collection<Mutation> mutations) throws ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 24c314f. Thanks!
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without the commit state, I think SSR will treat the write operations as an aborted transaction's ones and ignore them...
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if SSR is enabled, one-phase commit must not be enabled. We should consider adding validation for this on the cluster side later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!