Skip to content

Commit aa69268

Browse files
afterincomparableyumafterincomparableyum
authored andcommitted
addressing comments
1 parent 95ede62 commit aa69268

File tree

4 files changed

+62
-67
lines changed

4 files changed

+62
-67
lines changed

cpp/celeborn/client/tests/PushStateTest.cpp

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,25 @@ class PushStateTest : public testing::Test {
2828
conf::CelebornConf conf;
2929
conf.registerProperty(
3030
conf::CelebornConf::kClientPushLimitInFlightTimeoutMs,
31-
std::to_string(pushTimeoutMs_));
31+
std::to_string(kPushTimeoutMs_));
3232
conf.registerProperty(
3333
conf::CelebornConf::kClientPushLimitInFlightSleepDeltaMs,
34-
std::to_string(pushSleepDeltaMs_));
34+
std::to_string(kPushSleepDeltaMs_));
3535
conf.registerProperty(
3636
conf::CelebornConf::kClientPushMaxReqsInFlightTotal,
37-
std::to_string(maxReqsInFlight_));
37+
std::to_string(kMaxReqsInFlight_));
3838
conf.registerProperty(
3939
conf::CelebornConf::kClientPushMaxReqsInFlightPerWorker,
40-
std::to_string(maxReqsInFlight_));
40+
std::to_string(kMaxReqsInFlight_));
4141

4242
pushState_ = std::make_unique<PushState>(conf);
4343
}
4444

4545
std::unique_ptr<PushState> pushState_;
46-
static constexpr int pushTimeoutMs_ = 100;
47-
static constexpr int pushSleepDeltaMs_ = 10;
48-
static constexpr int maxReqsInFlight_ = 2;
49-
static constexpr int defaultBatchSize_ = 1024;
46+
static constexpr int kPushTimeoutMs_ = 100;
47+
static constexpr int kPushSleepDeltaMs_ = 10;
48+
static constexpr int kMaxReqsInFlight_ = 2;
49+
static constexpr int kDefaultBatchSize_ = 1024;
5050
};
5151

5252
class PushStateBytesSizeTest : public testing::Test {
@@ -55,10 +55,10 @@ class PushStateBytesSizeTest : public testing::Test {
5555
conf::CelebornConf conf;
5656
conf.registerProperty(
5757
conf::CelebornConf::kClientPushLimitInFlightTimeoutMs,
58-
std::to_string(pushTimeoutMs_));
58+
std::to_string(kPushTimeoutMs_));
5959
conf.registerProperty(
6060
conf::CelebornConf::kClientPushLimitInFlightSleepDeltaMs,
61-
std::to_string(pushSleepDeltaMs_));
61+
std::to_string(kPushSleepDeltaMs_));
6262
conf.registerProperty(
6363
conf::CelebornConf::kClientPushMaxReqsInFlightTotal, "2");
6464
conf.registerProperty(
@@ -67,58 +67,58 @@ class PushStateBytesSizeTest : public testing::Test {
6767
conf::CelebornConf::kClientPushMaxBytesSizeInFlightEnabled, "true");
6868
conf.registerProperty(
6969
conf::CelebornConf::kClientPushMaxBytesSizeInFlightTotal,
70-
std::to_string(maxBytesSizeTotal_) + "B");
70+
std::to_string(kMaxBytesSizeTotal_) + "B");
7171
conf.registerProperty(
7272
conf::CelebornConf::kClientPushMaxBytesSizeInFlightPerWorker,
73-
std::to_string(maxBytesSizePerWorker_) + "B");
73+
std::to_string(kMaxBytesSizePerWorker_) + "B");
7474
conf.registerProperty(
7575
conf::CelebornConf::kClientPushBufferMaxSize,
76-
std::to_string(bufferMaxSize_) + "B");
76+
std::to_string(kBufferMaxSize_) + "B");
7777

7878
pushState_ = std::make_unique<PushState>(conf);
7979
}
8080

8181
std::unique_ptr<PushState> pushState_;
82-
static constexpr int pushTimeoutMs_ = 100;
83-
static constexpr int pushSleepDeltaMs_ = 10;
84-
static constexpr int batchSize_ = 1024;
85-
static constexpr long maxBytesSizeTotal_ = 3000;
86-
static constexpr long maxBytesSizePerWorker_ = 2500;
87-
static constexpr int bufferMaxSize_ = 65536;
82+
static constexpr int kPushTimeoutMs_ = 100;
83+
static constexpr int kPushSleepDeltaMs_ = 10;
84+
static constexpr int kBatchSize_ = 1024;
85+
static constexpr long kMaxBytesSizeTotal_ = 3000;
86+
static constexpr long kMaxBytesSizePerWorker_ = 2500;
87+
static constexpr int kBufferMaxSize_ = 65536;
8888
};
8989

9090
TEST_F(PushStateTest, limitMaxInFlight) {
9191
const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
92-
const int addBatchCalls = maxReqsInFlight_ + 1;
92+
const int addBatchCalls = kMaxReqsInFlight_ + 1;
9393
std::vector<bool> addBatchMarks(addBatchCalls, false);
9494
std::thread addBatchThread([&]() {
9595
for (auto i = 0; i < addBatchCalls; i++) {
96-
pushState_->addBatch(i, defaultBatchSize_, hostAndPushPort);
96+
pushState_->addBatch(i, kDefaultBatchSize_, hostAndPushPort);
9797
EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort));
9898
addBatchMarks[i] = true;
9999
}
100100
});
101101

102-
std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
103-
for (auto i = 0; i < maxReqsInFlight_; i++) {
102+
std::this_thread::sleep_for(std::chrono::milliseconds(kPushSleepDeltaMs_));
103+
for (auto i = 0; i < kMaxReqsInFlight_; i++) {
104104
EXPECT_TRUE(addBatchMarks[i]);
105105
}
106-
EXPECT_FALSE(addBatchMarks[maxReqsInFlight_]);
106+
EXPECT_FALSE(addBatchMarks[kMaxReqsInFlight_]);
107107

108108
pushState_->removeBatch(0, hostAndPushPort);
109109
addBatchThread.join();
110-
EXPECT_TRUE(addBatchMarks[maxReqsInFlight_]);
110+
EXPECT_TRUE(addBatchMarks[kMaxReqsInFlight_]);
111111
}
112112

113113
TEST_F(PushStateTest, limitMaxInFlightTimeout) {
114114
const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
115-
const int addBatchCalls = maxReqsInFlight_ + 1;
115+
const int addBatchCalls = kMaxReqsInFlight_ + 1;
116116
std::vector<bool> addBatchMarks(addBatchCalls, false);
117117
std::thread addBatchThread([&]() {
118118
for (auto i = 0; i < addBatchCalls; i++) {
119-
pushState_->addBatch(i, defaultBatchSize_, hostAndPushPort);
119+
pushState_->addBatch(i, kDefaultBatchSize_, hostAndPushPort);
120120
auto result = pushState_->limitMaxInFlight(hostAndPushPort);
121-
if (i < maxReqsInFlight_) {
121+
if (i < kMaxReqsInFlight_) {
122122
EXPECT_FALSE(result);
123123
} else {
124124
EXPECT_TRUE(result);
@@ -127,27 +127,27 @@ TEST_F(PushStateTest, limitMaxInFlightTimeout) {
127127
}
128128
});
129129

130-
std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
131-
for (auto i = 0; i < maxReqsInFlight_; i++) {
130+
std::this_thread::sleep_for(std::chrono::milliseconds(kPushSleepDeltaMs_));
131+
for (auto i = 0; i < kMaxReqsInFlight_; i++) {
132132
EXPECT_TRUE(addBatchMarks[i]);
133133
}
134-
EXPECT_FALSE(addBatchMarks[maxReqsInFlight_]);
134+
EXPECT_FALSE(addBatchMarks[kMaxReqsInFlight_]);
135135

136136
addBatchThread.join();
137-
EXPECT_FALSE(addBatchMarks[maxReqsInFlight_]);
137+
EXPECT_FALSE(addBatchMarks[kMaxReqsInFlight_]);
138138
}
139139

140140
TEST_F(PushStateTest, limitZeroInFlight) {
141141
const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
142142
const int addBatchCalls = 1;
143143
std::vector<bool> addBatchMarks(addBatchCalls, false);
144144
std::thread addBatchThread([&]() {
145-
pushState_->addBatch(0, defaultBatchSize_, hostAndPushPort);
145+
pushState_->addBatch(0, kDefaultBatchSize_, hostAndPushPort);
146146
EXPECT_FALSE(pushState_->limitZeroInFlight());
147147
addBatchMarks[0] = true;
148148
});
149149

150-
std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
150+
std::this_thread::sleep_for(std::chrono::milliseconds(kPushSleepDeltaMs_));
151151
EXPECT_FALSE(addBatchMarks[0]);
152152

153153
pushState_->removeBatch(0, hostAndPushPort);
@@ -160,13 +160,13 @@ TEST_F(PushStateTest, limitZeroInFlightTimeout) {
160160
const int addBatchCalls = 1;
161161
std::vector<bool> addBatchMarks(addBatchCalls, false);
162162
std::thread addBatchThread([&]() {
163-
pushState_->addBatch(0, defaultBatchSize_, hostAndPushPort);
163+
pushState_->addBatch(0, kDefaultBatchSize_, hostAndPushPort);
164164
auto result = pushState_->limitZeroInFlight();
165165
EXPECT_TRUE(result);
166166
addBatchMarks[0] = !result;
167167
});
168168

169-
std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
169+
std::this_thread::sleep_for(std::chrono::milliseconds(kPushSleepDeltaMs_));
170170
EXPECT_FALSE(addBatchMarks[0]);
171171

172172
addBatchThread.join();
@@ -201,7 +201,7 @@ TEST_F(PushStateBytesSizeTest, limitMaxInFlightByBytesSize) {
201201

202202
std::thread addBatchThread([&]() {
203203
for (auto i = 0; i < addBatchCalls; i++) {
204-
pushState_->addBatch(i, batchSize_, hostAndPushPort);
204+
pushState_->addBatch(i, kBatchSize_, hostAndPushPort);
205205
auto result = pushState_->limitMaxInFlight(hostAndPushPort);
206206
addBatchMarks[i] = true;
207207
if (i < expectedAllowedBatches) {
@@ -210,7 +210,7 @@ TEST_F(PushStateBytesSizeTest, limitMaxInFlightByBytesSize) {
210210
}
211211
});
212212

213-
std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
213+
std::this_thread::sleep_for(std::chrono::milliseconds(kPushSleepDeltaMs_));
214214
for (auto i = 0; i < expectedAllowedBatches; i++) {
215215
EXPECT_TRUE(addBatchMarks[i]) << "Batch " << i << " should have completed";
216216
}
@@ -224,20 +224,20 @@ TEST_F(PushStateBytesSizeTest, limitMaxInFlightByTotalBytesSize) {
224224
const std::string hostAndPushPort1 = "xx.xx.xx.xx:8080";
225225
const std::string hostAndPushPort2 = "yy.yy.yy.yy:8080";
226226

227-
pushState_->addBatch(0, batchSize_, hostAndPushPort1);
227+
pushState_->addBatch(0, kBatchSize_, hostAndPushPort1);
228228
EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort1));
229229

230-
pushState_->addBatch(1, batchSize_, hostAndPushPort2);
230+
pushState_->addBatch(1, kBatchSize_, hostAndPushPort2);
231231
EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort2));
232232

233233
std::atomic<bool> thirdBatchCompleted{false};
234234
std::thread addBatchThread([&]() {
235-
pushState_->addBatch(2, batchSize_, hostAndPushPort1);
235+
pushState_->addBatch(2, kBatchSize_, hostAndPushPort1);
236236
pushState_->limitMaxInFlight(hostAndPushPort1);
237237
thirdBatchCompleted = true;
238238
});
239239

240-
std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
240+
std::this_thread::sleep_for(std::chrono::milliseconds(kPushSleepDeltaMs_));
241241
EXPECT_FALSE(thirdBatchCompleted.load())
242242
<< "Third batch should be blocked due to total bytes limit";
243243

@@ -250,8 +250,8 @@ TEST_F(PushStateBytesSizeTest, limitMaxInFlightByTotalBytesSize) {
250250
TEST_F(PushStateBytesSizeTest, cleanupClearsBytesSizeTracking) {
251251
const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
252252

253-
pushState_->addBatch(0, batchSize_, hostAndPushPort);
254-
pushState_->addBatch(1, batchSize_, hostAndPushPort);
253+
pushState_->addBatch(0, kBatchSize_, hostAndPushPort);
254+
pushState_->addBatch(1, kBatchSize_, hostAndPushPort);
255255
pushState_->cleanup();
256256

257257
EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort));

cpp/celeborn/client/writer/PushState.cpp

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@ PushState::PushState(const conf::CelebornConf& conf)
3030
maxInFlightBytesSizeTotal_(conf.clientPushMaxBytesSizeInFlightTotal()),
3131
maxInFlightBytesSizePerWorker_(
3232
conf.clientPushMaxBytesSizeInFlightPerWorker()) {
33-
if (maxInFlightBytesSizeEnabled_) {
34-
inflightBytesSizePerAddress_.emplace();
35-
inflightBatchBytesSizes_.emplace();
36-
}
3733
}
3834

3935
int PushState::nextBatchId() {
@@ -51,11 +47,11 @@ void PushState::addBatch(
5147
totalInflightReqs_.fetch_add(1);
5248

5349
if (maxInFlightBytesSizeEnabled_) {
54-
auto bytesSizePerAddress = inflightBytesSizePerAddress_->computeIfAbsent(
50+
auto bytesSizePerAddress = inflightBytesSizePerAddress_.computeIfAbsent(
5551
hostAndPushPort,
5652
[&]() { return std::make_shared<std::atomic<long>>(0); });
5753
bytesSizePerAddress->fetch_add(batchBytesSize);
58-
inflightBatchBytesSizes_->set(batchId, batchBytesSize);
54+
inflightBatchBytesSizes_.set(batchId, batchBytesSize);
5955
totalInflightBytes_.fetch_add(batchBytesSize);
6056
}
6157
}
@@ -80,11 +76,11 @@ void PushState::removeBatch(int batchId, const std::string& hostAndPushPort) {
8076
totalInflightReqs_.fetch_sub(1);
8177

8278
if (maxInFlightBytesSizeEnabled_) {
83-
auto inflightBatchBytesSize = inflightBatchBytesSizes_->get(batchId);
84-
inflightBatchBytesSizes_->erase(batchId);
79+
auto inflightBatchBytesSize = inflightBatchBytesSizes_.get(batchId);
80+
inflightBatchBytesSizes_.erase(batchId);
8581
if (inflightBatchBytesSize.has_value()) {
8682
auto inflightBytesSize =
87-
inflightBytesSizePerAddress_->get(hostAndPushPort);
83+
inflightBytesSizePerAddress_.get(hostAndPushPort);
8884
if (inflightBytesSize.has_value()) {
8985
inflightBytesSize.value()->fetch_sub(inflightBatchBytesSize.value());
9086
}
@@ -105,13 +101,13 @@ bool PushState::limitMaxInFlight(const std::string& hostAndPushPort) {
105101
[&]() { return std::make_shared<utils::ConcurrentHashSet<int>>(); });
106102
std::shared_ptr<std::atomic<long>> batchBytesSize = nullptr;
107103
if (maxInFlightBytesSizeEnabled_) {
108-
batchBytesSize = inflightBytesSizePerAddress_->computeIfAbsent(
104+
batchBytesSize = inflightBytesSizePerAddress_.computeIfAbsent(
109105
hostAndPushPort,
110106
[&]() { return std::make_shared<std::atomic<long>>(0); });
111107
}
112108
long times = waitInflightTimeoutMs_ / deltaMs_;
113109
for (; times > 0; times--) {
114-
if (cleaned_) {
110+
if (cleaned_.load()) {
115111
return false;
116112
}
117113

@@ -169,7 +165,7 @@ bool PushState::limitZeroInFlight() {
169165

170166
long times = waitInflightTimeoutMs_ / deltaMs_;
171167
for (; times > 0; times--) {
172-
if (cleaned_) {
168+
if (cleaned_.load()) {
173169
return false;
174170
}
175171
if (totalInflightReqs_ <= 0) {
@@ -228,16 +224,14 @@ std::optional<std::string> PushState::getExceptionMsg() const {
228224
void PushState::cleanup() {
229225
LOG(INFO) << "Cleanup " << totalInflightReqs_.load()
230226
<< " requests in flight.";
231-
cleaned_ = true;
227+
cleaned_.store(true);
232228
inflightBatchesPerAddress_.clear();
233229
totalInflightReqs_ = 0;
234230
pushStrategy_->clear();
235231

236232
if (maxInFlightBytesSizeEnabled_) {
237-
LOG(INFO) << "Cleanup " << totalInflightBytes_.load()
238-
<< " bytes in flight.";
239-
inflightBytesSizePerAddress_->clear();
240-
inflightBatchBytesSizes_->clear();
233+
inflightBytesSizePerAddress_.clear();
234+
inflightBatchBytesSizes_.clear();
241235
totalInflightBytes_ = 0;
242236
}
243237
}

cpp/celeborn/client/writer/PushState.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ class PushState {
8686
std::string,
8787
std::shared_ptr<utils::ConcurrentHashSet<int>>>
8888
inflightBatchesPerAddress_;
89-
std::optional<
90-
utils::ConcurrentHashMap<std::string, std::shared_ptr<std::atomic<long>>>>
89+
utils::ConcurrentHashMap<
90+
std::string,
91+
std::shared_ptr<std::atomic<long>>>
9192
inflightBytesSizePerAddress_;
92-
std::optional<utils::ConcurrentHashMap<int, int>> inflightBatchBytesSizes_;
93+
utils::ConcurrentHashMap<int, int> inflightBatchBytesSizes_;
9394
folly::Synchronized<std::unique_ptr<std::exception>> exception_;
94-
volatile bool cleaned_{false};
95+
std::atomic<bool> cleaned_{false};
9596
};
9697

9798
} // namespace client

cpp/celeborn/conf/CelebornConf.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ int CelebornConf::clientPushBufferMaxSize() const {
277277
}
278278

279279
bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
280-
return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
281-
"true";
280+
return optionalProperty<bool>(kClientPushMaxBytesSizeInFlightEnabled)
281+
.value_or(false);
282282
}
283283

284284
long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {

0 commit comments

Comments
 (0)