-
Notifications
You must be signed in to change notification settings - Fork 40
Support one-phase commit optimization in Consensus Commit #2811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a one-phase commit optimization path in Consensus Commit, allowing eligible transactions to commit with a single storage mutate call.
- Added
onePhaseCommitEnabledflag and configuration property. - Extended
CommitHandlerand related managers to select one-phase vs. two-phase logic. - Created
OnePhaseCommitMutationComposerand updated grouping logic and tests.
Reviewed Changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java | Updated tests to invoke insert with one-phase flag and verify storage/coordinator calls |
| integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java | Added onePhaseCommitEnabled parameter to manager creation |
| core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java | Added extensive canBeGroupedTogether unit tests |
| core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java | Added tests for the new one-phase commit configuration |
| core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java | Disabled unsupported one-phase+group commit test cases |
| core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java | Extended tests for canOnePhaseCommit, onePhaseCommitRecords, and commit branches |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java | Propagated onePhaseCommitEnabled flag into constructor |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java | Updated to signature to remove obsolete exception and reflect one-phase composer |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposer.java | Introduced new composer for building one-phase commit mutations |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java | Added canBeGroupedTogether helper method |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java | Wired onePhaseCommitEnabled into handler factory |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java | Added ONE_PHASE_COMMIT_ENABLED property and accessor |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java | Extended constructor to accept the one-phase flag |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java | Integrated one-phase commit path and new APIs |
| core/src/main/java/com/scalar/db/common/CoreError.java | Added new error codes for committing conflicts and failures |
Comments suppressed due to low confidence (5)
core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java:54
- Add a unit test where the number of mutations exactly equals the storage's max atomic mutations count to confirm canBeGroupedTogether returns true at the boundary.
public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException {
core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java:61
- [nitpick] Relying on Java assertions for null-safety can lead to silent bypass in production; consider throwing an explicit IllegalArgumentException or NullPointerException if forNamespace() is absent.
assert firstMutation.forNamespace().isPresent();
core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java:191
- [nitpick] The commit logic is becoming large and mixes one-phase and two-phase paths; consider extracting the canOnePhaseCommit and onePhaseCommitRecords functionality into a separate helper or service class to simplify CommitHandler.
boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException {
core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java:128
- [nitpick] Using an early return for the one-phase commit path reduces nesting but also bypasses shared cleanup logic; consider reorganizing the method or extracting both paths to avoid unintended side effects.
if (canOnePhaseCommit(snapshot)) {
core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java:227
- [nitpick] Collecting mutations into a List via Stream.concat and collect can allocate unnecessary intermediate lists; consider passing a Stream or using an iterator-based approach to avoid extra allocations in hot paths.
return mutationsGrouper.canBeGroupedTogether(
| @VisibleForTesting | ||
| boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException { | ||
| if (!onePhaseCommitEnabled) { | ||
| return false; | ||
| } | ||
|
|
||
| // If validation is required (in SERIALIZABLE isolation), we cannot one-phase commit the | ||
| // transaction | ||
| if (snapshot.isValidationRequired()) { | ||
| return false; | ||
| } | ||
|
|
||
| // If the snapshot has no write and deletes, we do not one-phase commit the transaction | ||
| if (!snapshot.hasWritesOrDeletes()) { | ||
| return false; | ||
| } | ||
|
|
||
| List<Delete> deletesInDeleteSet = snapshot.getDeletesInDeleteSet(); | ||
|
|
||
| // If a record corresponding to a delete in the delete set does not exist in the storage, we | ||
| // cannot one-phase commit the transaction. This is because the storage does not support | ||
| // delete-if-not-exists semantics, so we cannot detect conflicts with other transactions. | ||
| for (Delete delete : deletesInDeleteSet) { | ||
| Optional<TransactionResult> result = snapshot.getFromReadSet(new Snapshot.Key(delete)); | ||
|
|
||
| // For deletes, we always perform implicit pre-reads if the result does not exit in the read | ||
| // set. So the result should always exist in the read set. | ||
| assert result != null; | ||
|
|
||
| if (!result.isPresent()) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| try { | ||
| // If the mutations can be grouped together, the mutations can be done in a single mutate API | ||
| // call, so we can one-phase commit the transaction | ||
| return mutationsGrouper.canBeGroupedTogether( | ||
| Stream.concat(snapshot.getPutsInWriteSet().stream(), deletesInDeleteSet.stream()) | ||
| .collect(Collectors.toList())); | ||
| } catch (ExecutionException e) { | ||
| throw new CommitException( | ||
| CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId()); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the conditions for the one-phase commit optimization. Please see the comments for the details.
| return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method determines whether all the mutations can be grouped into a single group.
| import javax.annotation.concurrent.NotThreadSafe; | ||
|
|
||
| @NotThreadSafe | ||
| public class OnePhaseCommitMutationComposer extends AbstractMutationComposer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnePhaseCommitMutationComposer creates mutations for one-phase commit.
| } | ||
|
|
||
| public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException { | ||
| if (mutations.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (mutations.isEmpty()) { | |
| if (mutations.size() <= 1) { |
This small optimization works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Fixed in 6a0b9c4. Thanks!
| if (canOnePhaseCommit(snapshot)) { | ||
| try { | ||
| onePhaseCommitRecords(snapshot); | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without the commit state, I think SSR will treat the write operations as an aborted transaction's ones and ignore them...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if SSR is enabled, one-phase commit must not be enabled. We should consider adding validation for this on the cluster side later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
feeblefakie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left minor naming suggestions, but LGTM! Thank you!
| return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public boolean canBeGroupedTogether(Collection<Mutation> mutations) throws ExecutionException { | |
| public boolean canBeGroupedAltogether(Collection<Mutation> mutations) throws ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 24c314f. Thanks!
| } | ||
|
|
||
| try { | ||
| // If the mutations can be grouped together, the mutations can be done in a single mutate API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // If the mutations can be grouped together, the mutations can be done in a single mutate API | |
| // If the mutations can be grouped altogether, the mutations can be done in a single mutate API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 24c314f. Thanks!
| // If a record corresponding to a delete in the delete set does not exist in the storage, we | ||
| // cannot one-phase commit the transaction. This is because the storage does not support | ||
| // delete-if-not-exists semantics, so we cannot detect conflicts with other transactions. | ||
| for (Delete delete : deletesInDeleteSet) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to know what would happen if I removed this block, so I removed the block and executed ConsensusCommitNullMetadataIntegrationTestBase. But all the tests passed unexpectedly. Is it difficult to write a test case where this block is necessary? (or I'm missing something....)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example, suppose a user tries to get record A but it doesn’t exist, and then the user attempts to delete that non-existing record.
In this case, under one-phase commit, we would need to perform a delete-if-not-exists operation for that record to detect conflicts—since another transaction might insert the record. However, because we don’t have delete-if-not-exists semantics in ScalarDB, we can’t do that. As a result, we cannot apply one-phase commit in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, one possible solution is to introduce a logical-delete state. In that case, we would perform a put-if-not-exists operation for the record representing that logical-delete state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Thanks for the explanation! It would be great if we add a test case for the situation later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have a test for it:
https://github.com/scalar-labs/scalardb/pull/2811/files#diff-23bc469b473150d216e1205dae9897c9ff85492d4846055cb5dccf6a60fefacfR1008-R1026
In this case, we can't perform one-phase commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, but unfortunately I wanted to find a proper integration test case to know actual use case.
Torch3333
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you!
komamitsu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! 👍
Description
This PR adds support for one-phase commit optimization in Consensus Commit. Users can enable it by setting the property
scalar.db.consensus_commit.one_phase_commit.enabledtotrue. The default isfalse.The basic idea is that if the storage can execute all mutations in a transaction atomically, we don’t need two-phase commit mechanisms such as prepare-records, commit-state, and then commit-records. Instead, we can simply perform commit-records in a single phase.
Related issues and/or PRs
N/A
Changes made
Added inline comments. Please take a look for the details.
Checklist
Additional notes (optional)
N/A
Release notes
Added support for one-phase commit optimization in Consensus Commit, significantly improving performance by skipping prepare-records and commit-state when all mutations can be committed atomically. Enable this optimization with the
scalar.db.consensus_commit.one_phase_commit.enabledproperty.