-
Notifications
You must be signed in to change notification settings - Fork 40
Optimize mutations based on the storage’s mutation atomicity unit in Consensus Commit #2807
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize mutations based on the storage’s mutation atomicity unit in Consensus Commit #2807
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR replaces the existing PartitionedMutations approach with a new MutationsGrouper that batches storage mutations according to each storage’s mutation atomicity unit, and updates all call sites accordingly.
- Introduce
MutationsGrouperand wire it intoCommitHandler,CommitHandlerWithGroupCommit,ConsensusCommitManager, andTwoPhaseConsensusCommitManager - Remove the old
PartitionedMutationsclass and its tests - Update integration tests to verify the number of
storage.mutate(...)calls based on the configured atomicity unit
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| integration-test/.../ConsensusCommitSpecificIntegrationTestBase.java | Switch on StorageInfo.getMutationAtomicityUnit() in two existing tests |
| integration-test/.../ConsensusCommitNullMetadataIntegrationTestBase.java | Instantiate and pass MutationsGrouper in createCommitHandler |
| core/.../PartitionedMutationsTest.java | Deleted obsolete tests for the removed PartitionedMutations class |
| core/.../PartitionedMutationsKeyTest.java | Deleted obsolete tests for the removed PartitionedMutations.Key |
| core/.../MutationsGrouperTest.java | Added comprehensive tests for new MutationsGrouper |
| core/.../CommitHandlerWithGroupCommitTest.java | Updated test helper to include MutationsGrouper |
| core/.../CommitHandlerTest.java | Inject and mock StorageInfoProvider and MutationsGrouper |
| core/.../TwoPhaseConsensusCommitManager.java | Pass MutationsGrouper into 2PC manager constructor |
| core/.../PartitionedMutations.java | Removed retired class |
| core/.../MutationsGrouper.java | New class that groups and batches mutations based on atomicity unit |
| core/.../ConsensusCommitManager.java | Wire in MutationsGrouper in commit handler factory |
| core/.../CommitHandlerWithGroupCommit.java | Add MutationsGrouper field and constructor parameter |
| core/.../CommitHandler.java | Replace PartitionedMutations with MutationsGrouper.groupMutations |
Comments suppressed due to low confidence (2)
integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java:7189
- [nitpick] This test method name is extremely long and reduces readability. Consider shortening it to clearly state the scenario and expected behavior—e.g.,
putAndCommit_varyingAtomicity_shouldBatchMutationsCorrectly().
putAndCommit_SinglePartitionMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit(
core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java:70
- The variable
storageInfoProvideris not declared in this test class. Add@Mock protected StorageInfoProvider storageInfoProvider;and initialize mocks (e.g., viaMockitoAnnotations.openMocks(this);) before using it.
new MutationsGrouper(storageInfoProvider),
| @BeforeEach | ||
| void setUp() throws Exception { | ||
| MockitoAnnotations.openMocks(this).close(); |
Copilot
AI
Jun 20, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling .close() immediately after openMocks tears down the mocks before the test runs. Instead, store the returned AutoCloseable in a field and close it in an @AfterEach method, or omit .close() here.
| @BeforeEach | |
| void setUp() throws Exception { | |
| MockitoAnnotations.openMocks(this).close(); | |
| private AutoCloseable autoCloseableMocks; | |
| @BeforeEach | |
| void setUp() throws Exception { | |
| autoCloseableMocks = MockitoAnnotations.openMocks(this); |
|
|
||
| @BeforeEach | ||
| public void setUp() throws Exception { | ||
| MockitoAnnotations.openMocks(this).close(); | ||
|
|
||
| grouper = new MutationsGrouper(storageInfoProvider); | ||
| } | ||
|
|
Copilot
AI
Jun 20, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to other tests, closing the Mockito session immediately will deactivate mocks. Keep the AutoCloseable returned by openMocks in a field and close in an @AfterEach.
| @BeforeEach | |
| public void setUp() throws Exception { | |
| MockitoAnnotations.openMocks(this).close(); | |
| grouper = new MutationsGrouper(storageInfoProvider); | |
| } | |
| private AutoCloseable mocks; | |
| @BeforeEach | |
| public void setUp() throws Exception { | |
| mocks = MockitoAnnotations.openMocks(this); | |
| grouper = new MutationsGrouper(storageInfoProvider); | |
| } | |
| @AfterEach | |
| public void tearDown() throws Exception { | |
| if (mocks != null) { | |
| mocks.close(); | |
| } | |
| } |
core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java
Show resolved
Hide resolved
| import javax.annotation.concurrent.ThreadSafe; | ||
|
|
||
| @ThreadSafe | ||
| public class MutationsGrouper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added MutationsGrouper that groups the mutations by based on the storage’s mutation atomicity unit.
| new PrepareMutationComposer(snapshot.getId(), tableMetadataManager); | ||
| snapshot.to(composer); | ||
| PartitionedMutations mutations = new PartitionedMutations(composer.get()); | ||
| List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split the mutations for prepare-records.
| new CommitMutationComposer(snapshot.getId(), tableMetadataManager); | ||
| snapshot.to(composer); | ||
| PartitionedMutations mutations = new PartitionedMutations(composer.get()); | ||
| List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split the mutations for commit-records.
| new RollbackMutationComposer(snapshot.getId(), storage, tableMetadataManager); | ||
| snapshot.to(composer); | ||
| PartitionedMutations mutations = new PartitionedMutations(composer.get()); | ||
| List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split the mutations for rollback-records.
Torch3333
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you!
feeblefakie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thank you!
| public List<List<Mutation>> groupMutations(Collection<Mutation> mutations) | ||
| throws ExecutionException { | ||
| // MutationGroup mutations by their storage info and atomicity unit | ||
| List<MutationGroup> groups = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] This can be removed by using groupToBatches.keySet() instead?
| assert mutation.forNamespace().isPresent(); | ||
| StorageInfo storageInfo = storageInfoProvider.getStorageInfo(mutation.forNamespace().get()); | ||
|
|
||
| MutationGroup matchedGroup = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] If MutationGroup.equals() has a similar logic to isSameGroup(), maybe these lines can be simplified like this?
MutationGroup group = new MutationGroup(mutation, storageInfo);
List<List<Mutation>> batches =
groupToBatches.computeIfAbsent(group, g -> new ArrayList<>());
int maxCount = group.storageInfo.getMaxAtomicMutationsCount();There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Fixed in b44e397. Thanks!
komamitsu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! 👍
…Consensus Commit (#2807)
Description
This PR adds an optimization for handling mutations in Consensus Commit based on the storage’s mutation atomicity unit.
Currently, Consensus Commit always splits the necessary mutations by partition and executes them using the
DistributedStorage.mutate()method. However, we can instead split the mutations based on the storage’s mutation atomicity unit and execute them accordingly. That’s the basic idea behind this optimization.Related issues and/or PRs
N/A
Changes made
Added some inline comments. Please take a look for the details.
Checklist
Additional notes (optional)
N/A
Release notes
Added an optimization to Consensus Commit that splits mutations based on the storage’s mutation atomicity unit, improving efficiency and performance in certain storage implementations.