Improve read process in Consensus Commit #2798
Conversation
Pull Request Overview
This PR refactors the Consensus Commit read path to use a background recovery executor, centralizes lazy-recovery logic in the CrudHandler, and ensures that any pending recoveries complete before prepare or commit.
- Introduce RecoveryExecutor to offload record recovery to a thread pool.
- Update CrudHandler to queue and later await recovery via waitForRecoveryCompletionIfNecessary.
- Remove inline recovery calls from ConsensusCommit and TwoPhaseConsensusCommit, and update tests accordingly.
Reviewed Changes
Copilot reviewed 23 out of 28 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java | New component executing lazy recoveries concurrently. |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java | Integrated RecoveryExecutor, added executeRecovery and wait logic. |
| core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java | Added utility methods for extracting after-image columns and metadata lookup. |
// Arrange
transaction = new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker);
// Arrange1
Copilot AI (Jun 19, 2025)
[nitpick] The comment 'Arrange1' looks like a typo; consider changing it back to 'Arrange' for consistency with the rest of the test suite.
- // Arrange1
+ // Arrange
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*;
Copilot AI (Jun 19, 2025)
[nitpick] Avoid wildcard static imports to keep the namespace clear. Import only the utilities you use (e.g., extractAfterImageColumnsFromBeforeImage and getTransactionTableMetadata).
- import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*;
+ import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.extractAfterImageColumnsFromBeforeImage;
+ import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata;
  }
}

static void extractAfterImageColumnsFromBeforeImage(
Copilot AI (Jun 19, 2025)
[nitpick] The method name extractAfterImageColumnsFromBeforeImage is very verbose; you might rename it to something shorter like copyAfterImageColumns to improve readability.
- static void extractAfterImageColumnsFromBeforeImage(
+ static void copyAfterImageColumns(
}

try {
  crud.waitForRecoveryCompletionIfNecessary();
Wait for lazy recoveries to complete before committing the transaction, if necessary.
key = new Snapshot.Key(get, result.get());
}

result = executeRecovery(key, get, result.get());
If reading an uncommitted record, the improved read process starts.
CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(),
    snapshot.getId());
// Lazy recovery
ret = executeRecovery(key, scan, result);
Ditto. If reading an uncommitted record, the improved read process starts.
if (snapshot.containsKeyInWriteSet(recoveryResult.key)
    || snapshot.containsKeyInDeleteSet(recoveryResult.key)
    || snapshot.isValidationRequired()) {
  recoveryResult.recoveryFuture.get();
If the recovered records are in writeSet or deleteSet, or if serializable validation is required, wait for the lazy recoveries to complete.
Let me clarify one thing.
Is this wait required for correctness?
Or, the recovered record will be used for sure in such cases, so it waits here to avoid unnecessary aborts?
I think for records in writeSet or deleteSet, it's required for correctness. And in case where serializable validation is required, it's to avoid unnecessary aborts.
For records in the writeSet or deleteSet, if we don’t wait for lazy recovery to complete, we might end up preparing records whose status is still PREPARED or DELETED, which is not the intended behavior.
Similarly, when serializable validation is required, if we don’t wait for lazy recovery, the validation could fail if it reads records with PREPARED or DELETED status. That’s why waiting is necessary in these cases.
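The wait condition described in this thread can be sketched as a tiny predicate. This is a hypothetical simplification for illustration (the names WaitPolicy and mustWait are not the real ScalarDB API; the actual check lives in CrudHandler against the Snapshot):

```java
// Simplified sketch of when a lazy-recovery future must be awaited before
// prepare/commit. All names here are illustrative only.
public class WaitPolicy {
  static boolean mustWait(
      boolean keyInWriteSet, boolean keyInDeleteSet, boolean validationRequired) {
    // Waiting avoids preparing records that are still PREPARED/DELETED
    // (writeSet/deleteSet case) and avoids unnecessary aborts during
    // serializable validation.
    return keyInWriteSet || keyInDeleteSet || validationRequired;
  }

  public static void main(String[] args) {
    System.out.println(mustWait(true, false, false));   // key is in the writeSet
    System.out.println(mustWait(false, false, false));  // recovery can stay in background
  }
}
```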
I think for records in writeSet or deleteSet, it's required for correctness.
@brfrn169 Can you give me a simple example for me?
I think for records in writeSet or deleteSet, it's required for correctness.
@komamitsu Sorry, it was incorrect. As discussed, it's also required to avoid unnecessary aborts.
Based on the offline discussion, I understand that it is not for correctness, but a nice-to-have for unnecessary aborts.
Thanks!
@brfrn169 Can you add a short comment about the background reason?
Can you add a short comment about the background reason?
@komamitsu Yes.
For records in the writeSet or deleteSet, if we don’t wait for lazy recovery to complete, we might attempt to perform prepare-records on records whose status is still PREPARED or DELETED.
If we perform prepare-records on records that should be rolled forward, the prepare will actually succeed. However, this will create a PREPARED-state before image, which is unexpected. I don’t think it affects correctness, but it’s something we should avoid.
On the other hand, if we perform prepare-records on records that should be rolled back, the prepare will always fail, causing the transaction to abort.
Thanks! Looks good! Let's put it as a Java comment 🙇
Added Javadoc in 1305546. Thanks!
public Result execute(
    Snapshot.Key key, Selection selection, TransactionResult result, String transactionId)
    throws CrudException {
  assert !result.isCommitted();

  Optional<Coordinator.State> state = getCoordinatorState(result.getId());

  Optional<TransactionResult> recoveredResult =
      createRecoveredResult(state, selection, result, transactionId);

  // Recover the record
  Future<Void> future =
      executorService.submit(
          () -> {
            recovery.recover(selection, result, state);
            return null;
          });

  return new Result(key, recoveredResult, future);
}
Here is the main part of the improved read logic.
// Retrieve only the after image columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
  LinkedHashSet<String> afterImageColumnNames =
      getTransactionTableMetadata(selection).getAfterImageColumnNames();
  selection.withProjections(afterImageColumnNames);
}
Stop specifying projections for the after image columns because before images were removed in #2787.
Force-pushed from b061a53 to b55e890.
Torch3333
left a comment
LGTM, thank you!
feeblefakie
left a comment
Overall, looking good. Thank you!
Left some questions. PTAL!
this.tableMetadataManager = Objects.requireNonNull(tableMetadataManager);
executorService =
    Executors.newFixedThreadPool(
        threadPoolSize,
Separating the thread pool size configurations for parallel execution and recovery execution might be pretty tricky for users. The inflexibility might also result in unexpected stalling, since sometimes more parallel execution threads are needed than recovery execution threads.
Having one thread pool and allocating threads to whichever is needed would be more flexible.
What do you think?
As discussed, we've decided to use a cached thread pool for it. Fixed in 24c4822. Thanks!
  }
}

private Optional<TransactionResult> createRolledBackRecord(
createRolledBackRecord sounds a bit unnatural and not explicit to me.
How about this?
- private Optional<TransactionResult> createRolledBackRecord(
+ private Optional<TransactionResult> createRecordFromBeforeImage(
Renamed in 309e61f. Thanks!
Map<String, Column<?>> columns = new HashMap<>();

extractAfterImageColumnsFromBeforeImage(columns, result, beforeImageColumnNames);
Should it be create instead of extract? extract sounds like the before image includes the after image, which is contradictory.
Renamed in 309e61f. Thanks!
  return Optional.of(new TransactionResult(new ResultImpl(columns, tableMetadata)));
}

private Optional<TransactionResult> createRolledForwardResult(
- private Optional<TransactionResult> createRolledForwardResult(
+ private Optional<TransactionResult> createResultFromAfterImage(
Renamed in 309e61f. Thanks!
  }
}

static void createAfterImageColumnsFromBeforeImage(
[minor] This method name sounds like it creates a new collection. Something like copy or extract might be better?
@feeblefakie What do you think?
create sounds fine to me (because I proposed it, and it reads more like the behavior seen from the outside).
I feel copy and extract read more like descriptions of the internal behavior.
extract also sounds to me like the after image is a subset of the before image.
We also discussed build, but the upper-level method uses create, so using a different action name would seem to intentionally mean something.
I see. I'm okay with the current name. But, how about createAfterImageColumnsFromBeforeImageOn(Map<String, Column<?>> columns, ...) ?
Let's discuss this in a separate PR. Thanks!
feeblefakie
left a comment
LGTM! Thank you!
So, the cache size is not configurable in newCachedThreadPool?
  executorService =
-     Executors.newFixedThreadPool(
-         threadPoolSize,
+     Executors.newCachedThreadPool(
So, it cannot set the max size?
If we use the Java standard API Executors.newCachedThreadPool(), we cannot set a maximum pool size.
The implementation of Executors.newCachedThreadPool() is as follows:

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

If we want to set a maximum size, we can do something like this:

new ThreadPoolExecutor(0, <MAX_SIZE>,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       threadFactory);

But for now, as discussed, we’ve decided to use Executors.newCachedThreadPool() and see how it performs during benchmarking.
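For reference, here is a complete, runnable sketch of the bounded alternative mentioned above; the maximum size of 16 and the class name BoundedCachedPool are purely illustrative:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A cached-style pool with a bounded maximum size. Unlike
// Executors.newCachedThreadPool(), which allows up to Integer.MAX_VALUE
// threads, this caps the number of threads at a given maximum.
public class BoundedCachedPool {
  static ThreadPoolExecutor create(int maxSize) {
    return new ThreadPoolExecutor(
        0,                      // core pool size: keep no idle threads by default
        maxSize,                // bounded maximum, instead of Integer.MAX_VALUE
        60L, TimeUnit.SECONDS,  // idle threads terminate after 60 seconds
        new SynchronousQueue<>());
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = create(16);
    System.out.println(pool.getMaximumPoolSize()); // 16
    pool.shutdown();
  }
}
```

Note that with a SynchronousQueue and a bounded maximum, submissions beyond the cap are rejected (RejectedExecutionException) rather than queued, so in practice a rejection handler or a different queue would also be needed.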
komamitsu
left a comment
LGTM, thank you!
Description
This PR improves the read process in Consensus Commit.
Currently, when reading a record with the status PREPARED or DELETED, an UncommittedRecordException is immediately thrown, and the user must retry the transaction from the beginning.

The improved read algorithm is as follows:
- If the record status is COMMITTED, return the record as before.
- If the record status is PREPARED or DELETED, then check the Coordinator table:
  - If the transaction state is COMMITTED, start lazy recovery and return the after image.
  - If the transaction state is ABORTED, start lazy recovery and return the before image.
  - Otherwise, throw an UncommittedRecordException.

Additionally, this PR updates the code to perform lazy recovery in background threads. In cases where the recovered record needs to be written in the transaction, or where serializable validation (under the SERIALIZABLE isolation level) is required, we need to wait for the related lazy recoveries to complete before committing the transaction.
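The read algorithm described in this PR can be sketched as follows. This is a hypothetical simplification for illustration only; the enum types and returned strings are not the real ScalarDB types, and the actual logic lives in CrudHandler and RecoveryExecutor:

```java
// Sketch of the improved read decision in Consensus Commit.
// RecordState/CoordinatorState and the returned strings are illustrative.
public class ReadSketch {
  enum RecordState { COMMITTED, PREPARED, DELETED }
  enum CoordinatorState { COMMITTED, ABORTED, UNKNOWN }

  static String read(RecordState record, CoordinatorState coordinator) {
    if (record == RecordState.COMMITTED) {
      return "return record as before";
    }
    // Record is PREPARED or DELETED: consult the Coordinator table.
    switch (coordinator) {
      case COMMITTED:
        return "start lazy recovery; return after image";
      case ABORTED:
        return "start lazy recovery; return before image";
      default:
        // Transaction outcome still unknown: same behavior as before.
        return "throw UncommittedRecordException";
    }
  }

  public static void main(String[] args) {
    System.out.println(read(RecordState.PREPARED, CoordinatorState.COMMITTED));
    System.out.println(read(RecordState.DELETED, CoordinatorState.ABORTED));
  }
}
```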
Related issues and/or PRs
N/A
Changes made
Added some inline comments. Please take a look for the details!
Checklist
Additional notes (optional)
N/A
Release notes
Improved the read algorithm in Consensus Commit to reduce unnecessary retries.