[CELEBORN-2222][CIP-14] Support Retrying when createReader failed for CelebornInputStream in CppClient#3583
|
@HolyLow @SteNicholas @FMX @RexXiong Could you please help review this PR? Appreciate your help in improving this as needed! |
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3583      +/-   ##
==========================================
- Coverage   67.13%   67.04%   -0.09%
==========================================
  Files         357      357
  Lines       21860    21924      +64
  Branches     1943     1949       +6
==========================================
+ Hits        14674    14696      +22
- Misses       6166     6213      +47
+ Partials     1020     1015       -5
|
|
Thank you for your comments, @SteNicholas. I will take a look over the next couple of days. I suspect this PR needs some refactoring; I will notify you once it is done. |
Pull request overview
This PR implements retry support for createReader failures in the C++ client to match the Java implementation's behavior. It adds retry configuration, peer location helper methods, and implements the retry logic with peer failover.
Changes:
- Added three configuration properties for retry behavior: clientFetchMaxRetriesForEachReplica, dataIoRetryWait, and clientPushReplicateEnabled
- Added helper methods to PartitionLocation for peer access and formatting: hasPeer(), getPeer(), and hostAndFetchPort()
- Implemented retry logic in createReaderWithRetry() that switches between primary and peer replicas on failure
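The retry-with-failover behavior summarized in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Location` is a simplified stand-in for PartitionLocation, `ReaderFactory` is a hypothetical callback, and the policy of switching replicas immediately while sleeping only when no peer exists is an assumption. The sketch also assumes the two replicas reference each other as peers.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>

// Simplified stand-in for PartitionLocation (assumption: only these members).
struct Location {
  std::string host;
  int fetchPort = 0;
  const Location* peer = nullptr;  // non-owning; null when there is no replica

  bool hasPeer() const { return peer != nullptr; }
  const Location* getPeer() const { return peer; }
  std::string hostAndFetchPort() const {
    return host + ":" + std::to_string(fetchPort);
  }
};

// Hypothetical factory: returns a reader handle, throws on failure.
using ReaderFactory = std::function<int(const Location&)>;

int createReaderWithRetry(
    const Location& location,
    const ReaderFactory& createReader,
    int maxRetriesPerReplica,
    std::chrono::milliseconds retryWait) {
  assert(maxRetriesPerReplica > 0);
  // Each replica gets its own retry budget.
  const int maxAttempts =
      location.hasPeer() ? 2 * maxRetriesPerReplica : maxRetriesPerReplica;
  const Location* current = &location;
  const Location* lastAttempted = current;
  std::string lastError;
  for (int attempt = 0; attempt < maxAttempts; ++attempt) {
    lastAttempted = current;
    try {
      return createReader(*current);
    } catch (const std::exception& e) {
      lastError = e.what();
      if (current->hasPeer()) {
        current = current->getPeer();  // fail over to the other replica
      } else if (attempt + 1 < maxAttempts) {
        std::this_thread::sleep_for(retryWait);  // same replica: back off
      }
    }
  }
  // Report both the original and the last attempted location.
  throw std::runtime_error(
      "createReader failed after " + std::to_string(maxAttempts) +
      " attempts. Original location: " + location.hostAndFetchPort() +
      ", last attempted location: " + lastAttempted->hostAndFetchPort() +
      ", last error: " + lastError);
}
```

With a primary that always fails and a healthy peer, this sketch succeeds on the second attempt without sleeping.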
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| cpp/celeborn/protocol/tests/PartitionLocationTest.cpp | Added unit tests for new PartitionLocation helper methods |
| cpp/celeborn/protocol/PartitionLocation.h | Declared three new helper methods for peer access and port formatting |
| cpp/celeborn/protocol/PartitionLocation.cpp | Implemented the three new helper methods |
| cpp/celeborn/conf/tests/CelebornConfTest.cpp | Added tests for new configuration properties and their default values |
| cpp/celeborn/conf/CelebornConf.h | Declared three new configuration properties and their accessor methods |
| cpp/celeborn/conf/CelebornConf.cpp | Implemented configuration property definitions and accessor methods |
| cpp/celeborn/client/reader/CelebornInputStream.h | Added member variables for retry tracking and retry wait timeout |
| cpp/celeborn/client/reader/CelebornInputStream.cpp | Implemented retry logic with peer failover and sleep between retries |
|
Sorry for the delay on this. I will push a couple of commits within the next couple of days. |
d46e2f7 to
ab9523c
Compare
|
@SteNicholas I have addressed the comments. Could you please take another look? Thank you for your review. |
|
@afterincomparableyum, could you please rebase the main branch to resolve the conflicts? |
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.
46de39c to
f05d97c
Compare
|
Ping @SteNicholas for review. I have resolved the conflicts and addressed the comments from Copilot. |
11daa55 to
799136f
Compare
|
@afterincomparableyum, could you please rebase the main branch to resolve the conflict? |
799136f to
35a6870
Compare
|
@SteNicholas I have rebased |
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.
| throw utils::CelebornRuntimeError( | ||
| lastException, | ||
| "createPartitionReader failed after " + | ||
| std::to_string(fetchChunkRetryCnt_) + " retries for location " + | ||
| location.hostAndFetchPort(), | ||
| false); |
The final wrapped exception message always reports the original location.hostAndFetchPort(), even if the last failure happened after switching to the peer. This can be misleading when debugging. Consider including the last attempted location (e.g., currentLocation->hostAndFetchPort()) or both original+current in the error context.
Suggested change:
- throw utils::CelebornRuntimeError(
-     lastException,
-     "createPartitionReader failed after " +
-         std::to_string(fetchChunkRetryCnt_) + " retries for location " +
-         location.hostAndFetchPort(),
-     false);
+ std::string errorMessage =
+     "createPartitionReader failed after " +
+     std::to_string(fetchChunkRetryCnt_) +
+     " retries. Original location: " + location.hostAndFetchPort() +
+     ", last attempted location: " + currentLocation->hostAndFetchPort();
+ throw utils::CelebornRuntimeError(lastException, errorMessage, false);
| #include <functional> | ||
| #include "celeborn/client/compress/Compressor.h" | ||
| #include "celeborn/client/reader/CelebornInputStream.h" | ||
| #include "celeborn/client/writer/PushDataCallback.h" | ||
| #include "celeborn/client/writer/PushState.h" | ||
| #include "celeborn/client/writer/ReviveManager.h" | ||
| #include "celeborn/network/NettyRpcEndpointRef.h" |
ShuffleClient.h uses utils::ConcurrentHashMap in several type aliases/member declarations, but it does not include the header that defines it (celeborn/utils/CelebornUtils.h). It currently compiles only because CelebornInputStream.h happens to include CelebornUtils.h; add the direct include here to avoid fragile transitive dependencies.
| virtual void excludeFailedFetchLocation( | ||
| const std::string& hostAndFetchPort, | ||
| const std::exception& e) = 0; | ||
|
|
The new ShuffleClient::excludeFailedFetchLocation API (and its implementation in ShuffleClientImpl) appears unused in the current codebase (CelebornInputStream maintains exclusion internally instead). If this isn’t intended for external callers, consider removing it to avoid expanding the public interface; otherwise, consider wiring CelebornInputStream to call through ShuffleClient so there is a single source of truth for exclusion behavior.
Suggested change:
- virtual void excludeFailedFetchLocation(
-     const std::string& hostAndFetchPort,
-     const std::exception& e) = 0;
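The "single source of truth" alternative mentioned in this comment could look roughly like the sketch below. Only the `excludeFailedFetchLocation` signature is taken from the PR; the `isExcluded` query, the `InMemoryFetchExclusion` class, the `ReaderSketch` class, and the expiry handling are all hypothetical names and behavior introduced for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Minimal interface. isExcluded is an assumed companion query.
class FetchExclusion {
 public:
  virtual ~FetchExclusion() = default;
  virtual void excludeFailedFetchLocation(
      const std::string& hostAndFetchPort,
      const std::exception& e) = 0;
  virtual bool isExcluded(const std::string& hostAndFetchPort) = 0;
};

// Hypothetical shared registry with time-based expiry.
class InMemoryFetchExclusion : public FetchExclusion {
 public:
  explicit InMemoryFetchExclusion(std::chrono::milliseconds expireTimeout)
      : expireTimeout_(expireTimeout) {
    assert(expireTimeout.count() >= 0);
  }

  void excludeFailedFetchLocation(
      const std::string& hostAndFetchPort,
      const std::exception& /*e*/) override {
    std::lock_guard<std::mutex> lock(mutex_);
    excludedAt_[hostAndFetchPort] = std::chrono::steady_clock::now();
  }

  bool isExcluded(const std::string& hostAndFetchPort) override {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = excludedAt_.find(hostAndFetchPort);
    if (it == excludedAt_.end()) {
      return false;
    }
    if (std::chrono::steady_clock::now() - it->second >= expireTimeout_) {
      excludedAt_.erase(it);  // entry expired: allow fetching again
      return false;
    }
    return true;
  }

 private:
  const std::chrono::milliseconds expireTimeout_;
  std::mutex mutex_;
  std::unordered_map<std::string, std::chrono::steady_clock::time_point>
      excludedAt_;
};

// Reader-side code holds a reference to the shared registry instead of
// keeping a private exclusion list.
class ReaderSketch {
 public:
  explicit ReaderSketch(FetchExclusion& exclusion) : exclusion_(exclusion) {}

  void onFetchFailure(const std::string& hostAndFetchPort,
                      const std::exception& e) {
    exclusion_.excludeFailedFetchLocation(hostAndFetchPort, e);
  }

  bool canFetchFrom(const std::string& hostAndFetchPort) {
    return !exclusion_.isExcluded(hostAndFetchPort);
  }

 private:
  FetchExclusion& exclusion_;
};
```

Because every reader calls through the same object, a failure recorded by one reader is visible to all others, which is the property the comment asks for.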
| static constexpr std::string_view kClientFetchExcludeWorkerOnFailureEnabled{ | ||
| "celeborn.client.fetch.excludeWorkerOnFailure.enabled"}; | ||
|
|
||
| static constexpr std::string_view kClientFetchExcludedWorkerExpireTimeout{ | ||
| "celeborn.client.fetch.excludedWorker.expireTimeout"}; | ||
|
|
||
| static constexpr std::string_view | ||
| kClientAdaptiveOptimizeSkewedPartitionReadEnabled{ | ||
| "celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled"}; |
PR description lists 3 new config properties, but this change also introduces additional fetch-related configs (excludeWorkerOnFailure, excludedWorker expireTimeout, optimizeSkewedPartitionRead). Please update the PR description (or drop these configs if out of scope) so reviewers/users can understand the full config surface change.
| #include "celeborn/client/reader/CelebornInputStream.h" | ||
| #include <lz4.h> | ||
| #include "celeborn/client/compress/Decompressor.h" |
CelebornInputStream.cpp uses std::this_thread::sleep_for(retryWait_) later in the file but does not include <thread> (and the header no longer provides it). This can fail to compile depending on transitive includes; add an explicit #include <thread> in this .cpp.
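As a small illustration of the include-what-you-use point: `std::this_thread::sleep_for` is declared in `<thread>` and the duration types in `<chrono>`, so a translation unit that sleeps between retries should include both directly. `waitBeforeRetry` below is a hypothetical helper, not a function from the PR.

```cpp
#include <cassert>
#include <chrono>  // std::chrono::milliseconds
#include <thread>  // std::this_thread::sleep_for

// Hypothetical helper standing in for the sleep between retries.
void waitBeforeRetry(std::chrono::milliseconds retryWait) {
  assert(retryWait.count() >= 0);
  std::this_thread::sleep_for(retryWait);
}
```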
| if (isExcluded(*currentLocation)) { | ||
| CELEBORN_FAIL( | ||
| "Fetch data from excluded worker! {}", | ||
| currentLocation->hostAndFetchPort()); | ||
| } |
When a location is already in the exclusion list, the code throws and then retries/sleeps (especially in the no-peer branch). Since isExcluded(*currentLocation) will keep returning true until the exclusion expires, these retries are guaranteed to fail and just add delay. Consider failing fast (no sleep/retry) or skipping to another available location/peer when the current location is excluded.
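One way to realize the "skip to another available location" option is to choose a non-excluded candidate before attempting any fetch, and to fail immediately when none remains. The sketch below assumes candidates are identified by their host-and-fetch-port strings and that exclusion is exposed as a predicate; `pickFetchCandidate` is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Returns the first candidate that is not excluded, or nullptr when every
// candidate is excluded. In the nullptr case the caller should fail right
// away rather than sleep and retry, since the outcome cannot change until
// an exclusion expires.
const std::string* pickFetchCandidate(
    const std::vector<std::string>& candidates,
    const std::function<bool(const std::string&)>& isExcluded) {
  assert(static_cast<bool>(isExcluded));
  for (const auto& candidate : candidates) {
    if (!isExcluded(candidate)) {
      return &candidate;
    }
  }
  return nullptr;
}
```

A caller would pass the primary and, if present, the peer, in that order, so the peer is used only when the primary is excluded.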
|
@afterincomparableyum, thanks for the update. Could you take a look at Copilot's review comments? |
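This PR implements retry support for createReader failures in the C++ client, matching the behavior of the Java implementation. The implementation includes:
- Added configuration properties: clientFetchMaxRetriesForEachReplica, dataIoRetryWait, and clientPushReplicateEnabled
- Added peer location support methods to PartitionLocation: hasPeer(), getPeer(), and hostAndFetchPort()
- Implemented retry logic in createReaderWithRetry(), which switches between primary and peer replicas on failure
- Added unit tests for new functionality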
How was this patch tested?
Unit tests and compiling