Skip to content

Commit 46de39c

Browse files
author
afterincomparableyum
committed
Address review comments from Copilot AI
1 parent e9cdeb0 commit 46de39c

File tree

7 files changed

+365
-9
lines changed

7 files changed

+365
-9
lines changed

cpp/celeborn/client/ShuffleClient.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -359,7 +359,8 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
359359
void ShuffleClientImpl::excludeFailedFetchLocation(
360360
const std::string& hostAndFetchPort,
361361
const std::exception& e) {
362-
if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_) {
362+
if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_ &&
363+
utils::isCriticalCauseForFetch(e)) {
363364
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
364365
std::chrono::steady_clock::now().time_since_epoch())
365366
.count();

cpp/celeborn/client/reader/CelebornInputStream.cpp

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
#include "celeborn/client/reader/CelebornInputStream.h"
1919
#include <lz4.h>
20+
#include <thread>
2021
#include "celeborn/client/compress/Decompressor.h"
2122

2223
namespace celeborn {
@@ -226,8 +227,7 @@ std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
226227

227228
if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
228229
if (fetchChunkRetryCnt_ % 2 == 0) {
229-
std::this_thread::sleep_for(
230-
std::chrono::milliseconds(retryWait_.count()));
230+
std::this_thread::sleep_for(retryWait_);
231231
}
232232
LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
233233
<< "/" << fetchChunkMaxRetry_ << " times for location "
@@ -244,8 +244,7 @@ std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
244244
<< "/" << fetchChunkMaxRetry_ << " times for location "
245245
<< currentLocation->hostAndFetchPort()
246246
<< ", retry the same location. Error: " << e.what();
247-
std::this_thread::sleep_for(
248-
std::chrono::milliseconds(retryWait_.count()));
247+
std::this_thread::sleep_for(retryWait_);
249248
}
250249
}
251250
}

cpp/celeborn/client/reader/CelebornInputStream.h

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717

1818
#pragma once
1919

20-
#include <thread>
21-
2220
#include "celeborn/client/compress/Decompressor.h"
2321
#include "celeborn/client/reader/WorkerPartitionReader.h"
2422
#include "celeborn/conf/CelebornConf.h"

cpp/celeborn/client/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ add_executable(
1717
celeborn_client_test
1818
WorkerPartitionReaderTest.cpp
1919
PushDataCallbackTest.cpp
20+
CelebornInputStreamRetryTest.cpp
2021
PushStateTest.cpp
2122
ReviveManagerTest.cpp
2223
Lz4DecompressorTest.cpp

0 commit comments

Comments
 (0)