Skip to content

Commit 4b82278

Browse files
committed
fix: improve CS behavior with very low bgjobscnt
Signed-off-by: Dave <dave@leil.io>
1 parent de7104e commit 4b82278

File tree

10 files changed

+90
-44
lines changed

10 files changed

+90
-44
lines changed

src/chunkserver/bgjobs.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ uint32_t JobPool::getJobCount() const {
185185
return jobsQueue->elements();
186186
}
187187

188+
bool JobPool::isFull() const {
189+
return jobsQueue->isFull();
190+
}
191+
188192
void JobPool::disableAndChangeCallbackAll(const JobCallback &callback, uint32_t listenerId) {
189193
// Check if the listenerId is valid
190194
if (listenerId >= listenerInfos_.size()) {

src/chunkserver/bgjobs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ class JobPool {
142142
/// @brief Gets the number of jobs in the JobPool.
143143
uint32_t getJobCount() const;
144144

145+
/// @brief Checks if the JobPool is full.
146+
bool isFull() const;
147+
145148
/// @brief Disables all jobs and changes their callback function.
146149
///
147150
/// @param callback The new callback function to be set for all jobs.

src/chunkserver/chunk_high_level_ops.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,8 @@ void ReadHighLevelOp::readFinishedCallback(uint8_t status, void *buffer) {
172172
isChunkOpen_ = false;
173173
}
174174

175-
// after sending status even if there was an error it's possible to
176-
// receive new requests on the same connection
177-
setParentState(ChunkserverEntry::State::Idle);
175+
// Send status and close connection, no need to go through the normal idle state
176+
setParentState(ChunkserverEntry::State::IOFinish);
178177
LOG_AVG_STOP(readOperationTimer_);
179178
}
180179
}
@@ -225,7 +224,11 @@ void ReadHighLevelOp::readContinue(uint16_t callMaxParallelHddReadJobs) {
225224
isChunkOpen_ = false;
226225
// no error - normally go directly to the IDLE state, ready for
227226
// requests on the same connection; finish the connection only if the job pool is full
228-
setParentState(ChunkserverEntry::State::Idle);
227+
if (workerJobPool()->isFull()) {
228+
setParentState(ChunkserverEntry::State::IOFinish);
229+
} else {
230+
setParentState(ChunkserverEntry::State::Idle);
231+
}
229232
LOG_AVG_STOP(readOperationTimer_);
230233
} else {
231234
std::vector<uint8_t> readDataPrefix;
@@ -341,7 +344,7 @@ void WriteHighLevelOp::startOpenWriteJob() {
341344
void WriteHighLevelOp::updateUsingWriteStatusAndReply(uint8_t status, uint32_t writeId) {
342345
if (status != SAUNAFS_STATUS_OK) {
343346
createAttachedWriteStatus(chunkId_, status, writeId);
344-
setParentState(ChunkserverEntry::State::WriteFinish);
347+
setParentState(ChunkserverEntry::State::IOFinish);
345348
return;
346349
}
347350

@@ -351,7 +354,7 @@ void WriteHighLevelOp::updateUsingWriteStatusAndReply(uint8_t status, uint32_t w
351354
return;
352355
}
353356

354-
// state is WriteForward or WriteFinish
357+
// state is WriteForward or IOFinish
355358
if (partiallyCompletedWrites_.count(writeId) > 0) {
356359
// found - it means that it was added by status_receive, ie. next
357360
// chunkserver from a chain finished writing before our worker

src/chunkserver/chunkserver_entry.cc

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void ChunkserverEntry::fwdError() {
128128
uint8_t status =
129129
(state == State::Connecting ? SAUNAFS_ERROR_CANTCONNECT : SAUNAFS_ERROR_DISCONNECTED);
130130
createAttachedWriteStatus(chunkId, status, 0);
131-
state = State::WriteFinish;
131+
state = State::IOFinish;
132132
}
133133

134134
int ChunkserverEntry::initConnection() {
@@ -334,7 +334,7 @@ void ChunkserverEntry::writeInit(const uint8_t *data, PacketHeader::Type type,
334334

335335
if (initConnection() < kInitConnectionOK) {
336336
createAttachedWriteStatus(chunkId, SAUNAFS_ERROR_CANTCONNECT, 0);
337-
state = State::WriteFinish;
337+
state = State::IOFinish;
338338
return;
339339
}
340340
} else {
@@ -373,7 +373,7 @@ void ChunkserverEntry::writeData(const uint8_t *data, PacketHeader::Type type,
373373

374374
if (status != SAUNAFS_STATUS_OK) {
375375
createAttachedWriteStatus(opChunkId, status, writeId);
376-
state = State::WriteFinish;
376+
state = State::IOFinish;
377377
return;
378378
}
379379

@@ -413,15 +413,15 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
413413
cltocs::writeEnd::deserialize(data, length, opChunkId);
414414
} catch (IncorrectDeserializationException&) {
415415
safs::log_info("Received malformed WRITE_END message (length: {})", length);
416-
state = State::WriteFinish;
416+
state = State::IOFinish;
417417
return;
418418
}
419419

420420
if (opChunkId != chunkId) {
421421
safs::log_info(
422422
"Received malformed WRITE_END message (got chunkId={:016X}, expected {:016X})",
423423
opChunkId, chunkId);
424-
state = State::WriteFinish;
424+
state = State::IOFinish;
425425
return;
426426
}
427427

@@ -434,7 +434,7 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
434434
// TODO(msulikowski) temporary syslog message. May be useful until this
435435
// code is fully tested
436436
safs::log_info("Received WRITE_END message too early");
437-
state = State::WriteFinish;
437+
state = State::IOFinish;
438438
return;
439439
}
440440

@@ -445,7 +445,15 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
445445

446446
// All went fine, cleanup
447447
writeHLO_->cleanup();
448-
state = State::Idle;
448+
if (workerJobPool->isFull()) {
449+
// If the worker job pool is full, let's try not to accept more requests until it has some
450+
// free slots. IOFinish is just in case there are some packets pending to be sent (which
451+
// should not be the case).
452+
state = State::IOFinish;
453+
} else {
454+
// Ready for new requests, reset state
455+
state = State::Idle;
456+
}
449457
}
450458

451459
void ChunkserverEntry::sauGetChunkBlocks(const uint8_t *data, uint32_t length) {
@@ -676,13 +684,13 @@ void ChunkserverEntry::gotPacket(uint32_t type, const uint8_t *data,
676684
state = State::Close;
677685
break;
678686
}
679-
} else if (state == State::WriteFinish) {
687+
} else if (state == State::IOFinish) {
680688
switch (type) {
681689
case SAU_CLTOCS_WRITE_DATA:
682690
case SAU_CLTOCS_WRITE_END:
683691
return;
684692
default:
685-
safs::log_info("Got invalid message in WriteFinish state (type:{})", type);
693+
safs::log_info("Got invalid message in IOFinish state (type:{})", type);
686694
state = State::Close;
687695
}
688696
} else {

src/chunkserver/chunkserver_entry.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ struct ChunkserverEntry {
9898
Connecting, // connecting to other chunkserver to form a writing chain
9999
WriteInit, // sending packet forming a chain to the next chunkserver
100100
WriteForward, // ready for writing data; will be forwarded to other CSs
101-
WriteFinish, // write error, will be closed after sending error status
101+
IOFinish, // IO operation finished (error or job pool full), will be closed after sending pending packets
102102
Close, // close request, will change to CloseWait or Closed
103103
CloseWait, // waits for a worker to finish a job, then will be Closed
104104
Closed // ready to be deleted
@@ -211,12 +211,12 @@ struct ChunkserverEntry {
211211
bool fromForward);
212212

213213
/// Handles forwarding errors by setting the appropriate error status and
214-
/// transitioning the connection state to `WriteFinish`.
214+
/// transitioning the connection state to `IOFinish`.
215215
///
216216
/// This function is called when an error occurs during forwarding
217217
/// operations, such as read or write errors on the forwarding socket. It
218218
/// serializes an error status message and attaches it to the packet, then
219-
/// sets the state to `WriteFinish` to indicate that the connection should
219+
/// sets the state to `IOFinish` to indicate that the connection should
220220
/// be closed after sending the error status.
221221
void fwdError();
222222

src/chunkserver/network_worker_thread.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ void NetworkWorkerThread::preparePollFds(bool isTerminating) {
200200
pdesc.back().events |= POLLOUT;
201201
}
202202
break;
203-
case ChunkserverEntry::State::WriteFinish:
203+
case ChunkserverEntry::State::IOFinish:
204204
if (!entry.outputPackets.empty()) {
205205
pdesc.emplace_back(pollfd(entry.sock, POLLOUT, 0));
206206
entry.pDescPos = pdesc.size() - 1;
@@ -237,7 +237,7 @@ void NetworkWorkerThread::servePoll() {
237237
if (lstate == ChunkserverEntry::State::Idle ||
238238
lstate == ChunkserverEntry::State::Read ||
239239
lstate == ChunkserverEntry::State::WriteLast ||
240-
lstate == ChunkserverEntry::State::WriteFinish ||
240+
lstate == ChunkserverEntry::State::IOFinish ||
241241
lstate == ChunkserverEntry::State::GetBlock) {
242242
if (entry.pDescPos >= 0 &&
243243
(pdesc[entry.pDescPos].revents & POLLIN)) {
@@ -292,7 +292,7 @@ void NetworkWorkerThread::servePoll() {
292292
eptr->writeToSocket();
293293
}
294294
}
295-
if (entry.state == ChunkserverEntry::State::WriteFinish &&
295+
if (entry.state == ChunkserverEntry::State::IOFinish &&
296296
entry.outputPackets.empty()) {
297297
entry.state = ChunkserverEntry::State::Close;
298298
}

src/common/pcqueue.cc

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ bool ProducerConsumerQueue::isFull() const {
6666
uint32_t ProducerConsumerQueue::sizeLeft() const {
6767
TRACETHIS();
6868
std::lock_guard<std::mutex> lock(mutex_);
69-
return maxSize_ > 0 ? maxSize_ - currentSize_ : UINT32_MAX;
69+
return maxSize_ > 0 ? (currentSize_ <= maxSize_ ? maxSize_ - currentSize_ : 0) : UINT32_MAX;
7070
}
7171

7272
uint32_t ProducerConsumerQueue::elements() const {
@@ -75,23 +75,13 @@ uint32_t ProducerConsumerQueue::elements() const {
7575
return currentElements_;
7676
}
7777

78-
bool ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
78+
void ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
7979
uint8_t priority) {
8080
TRACETHIS();
8181
std::unique_lock<std::mutex> lock(mutex_);
82-
notFull_.wait(lock, [this, length] {
83-
return maxSize_ == 0 || currentSize_ + length <= maxSize_;
84-
});
85-
86-
if (maxSize_ > 0 && length > maxSize_) {
87-
errno = EDEADLK;
88-
return false;
89-
}
90-
9182
put_(jobId, jobType, data, length, priority);
9283

9384
notEmpty_.notify_one();
94-
return true;
9585
}
9686

9787
bool ProducerConsumerQueue::tryPut(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
@@ -123,7 +113,6 @@ bool ProducerConsumerQueue::get(uint32_t *jobId, uint32_t *jobType,
123113
notEmpty_.wait(lock, [this] { return currentSize_ > 0; });
124114

125115
get_(jobId, jobType, data, length);
126-
notFull_.notify_one();
127116
return true;
128117
}
129118

@@ -141,7 +130,6 @@ bool ProducerConsumerQueue::tryGet(uint32_t *jobId, uint32_t *jobType,
141130
}
142131

143132
get_(jobId, jobType, data, length);
144-
notFull_.notify_one();
145133
return true;
146134
}
147135

src/common/pcqueue.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ inline void deleterDummy(uint8_t * /*unused*/) {}
4040
///
4141
/// Can be configured to support several priority levels. Final interface is queue-like,
4242
/// but preferring higher priority items and preserving order within each priority level.
43+
/// The maxSize parameter limits the number of items the queue should hold. It does not block
44+
/// put() calls; it only makes tryPut() calls return false once the limit is
45+
/// reached.
4346
///
4447
/// This class provides a thread-safe queue implementation that allows multiple
4548
/// producers and consumers to add and remove items concurrently. It uses a
@@ -87,7 +90,7 @@ class ProducerConsumerQueue {
8790
/// and deleter.
8891
///
8992
/// @param priorityLevels The number of priority levels. Default is 1 (no priorities).
90-
/// @param maxSize The maximum number of elements the queue can hold.
93+
/// @param maxSize The maximum number of elements the queue should hold.
9194
/// Default is 0 (unlimited).
9295
/// @param deleter A callable type that defines how to delete the data
9396
/// stored in the queue. Default is deleterDummy.
@@ -120,14 +123,16 @@ class ProducerConsumerQueue {
120123

121124
/// @brief Adds an element to the queue.
122125
///
126+
/// This call is never blocked by the maxSize limit; only tryPut() returns false when the limit is
127+
/// reached.
128+
///
123129
/// @param jobId The job ID associated with the element.
124130
/// @param jobType The job type associated with the element.
125131
/// @param data A pointer to the data to be added.
126132
/// @param length The length of the data to be added.
127133
/// @param priority The priority level of the element (0 is the highest
128134
/// priority). Default is 0.
129-
/// @return true if the element was added successfully, false otherwise.
130-
bool put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
135+
void put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
131136
uint8_t priority = 0);
132137

133138
/// @brief Tries to add an element to the queue without blocking.
@@ -215,8 +220,6 @@ class ProducerConsumerQueue {
215220
uint32_t currentSize_;
216221
///< Mutex for synchronizing access to the queue.
217222
mutable std::mutex mutex_;
218-
///< Condition variable to signal when the queue is not full.
219-
std::condition_variable notFull_;
220223
///< Condition variable to signal when the queue is not empty.
221224
std::condition_variable notEmpty_;
222225
///< The deleter function used to delete the data stored in the queue.

src/common/pcqueue_unittest.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const uint32_t kMaxQueueSize = 100;
3333
TEST(ProducerConsumerQueueTests, SingleElement) {
3434
ProducerConsumerQueue queue(1, kMaxSize, customDeleter);
3535
auto *data = new uint8_t[kMaxLength];
36-
EXPECT_TRUE(queue.put(1, 1, data, kMaxLength));
36+
EXPECT_TRUE(queue.tryPut(1, 1, data, kMaxLength));
3737

3838
uint32_t jobId = 0;
3939
uint32_t jobType = 0;
@@ -52,7 +52,7 @@ TEST(ProducerConsumerQueueTests, MultipleElements) {
5252
ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter);
5353
for (int i = 0; i < kMaxSize; ++i) {
5454
auto *data = new uint8_t[kMaxLength];
55-
EXPECT_TRUE(queue.put(i, i, data, kMaxLength));
55+
EXPECT_TRUE(queue.tryPut(i, i, data, kMaxLength));
5656
}
5757

5858
for (uint32_t i = 0; i < kMaxSize; ++i) {
@@ -74,8 +74,8 @@ TEST(ProducerConsumerQueueTests, QueueFull) {
7474
auto *data1 = new uint8_t[kMaxLength];
7575
auto *data2 = new uint8_t[kMaxLength];
7676
auto *data3 = new uint8_t[kMaxLength];
77-
EXPECT_TRUE(queue.put(1, 1, data1, 1));
78-
EXPECT_TRUE(queue.put(2, 2, data2, 1));
77+
EXPECT_TRUE(queue.tryPut(1, 1, data1, 1));
78+
EXPECT_TRUE(queue.tryPut(2, 2, data2, 1));
7979
EXPECT_FALSE(queue.tryPut(3, 3, data3, kMaxLength));
8080
delete[] data3;
8181
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
timeout_set 1 minute
2+
3+
CHUNKSERVERS=8 \
4+
MOUNT_EXTRA_CONFIG="sfscachemode=NEVER,sfswriteworkers=100,sfsioretries=13" \
5+
CHUNKSERVER_EXTRA_CONFIG="NR_OF_NETWORK_WORKERS = 1|NR_OF_HDD_WORKERS_PER_NETWORK_WORKER = 1|`
6+
`BGJOBSCNT_PER_NETWORK_WORKER = 10" \
7+
MASTER_CUSTOM_GOALS="8 ec62: \$ec(6,2)"
8+
setup_local_empty_saunafs info
9+
10+
cd ${info[mount0]}
11+
12+
number_of_files=500
13+
14+
for i in $(seq 1 ${number_of_files}); do
15+
dd if=/dev/random of=${TEMP_DIR}/file_$i bs=64K count=6 conv=fsync &> /dev/null
16+
done
17+
18+
mkdir dir
19+
saunafs setgoal ec62 dir
20+
saunafs settrashtime 0 dir
21+
22+
for i in $(seq 1 ${number_of_files}); do
23+
(assert_success dd if="${TEMP_DIR}/file_${i}" of="dir/file_${i}" bs=384K count=1 \
24+
status=none &> /dev/null) &
25+
done
26+
27+
wait
28+
echo "All files written"
29+
30+
saunafs_chunkserver_daemon 0 stop
31+
saunafs_chunkserver_daemon 1 stop
32+
33+
for i in $(seq 1 ${number_of_files}); do
34+
assert_success dd if="dir/file_$i" of=/dev/null bs=384K count=1 status=none
35+
cmp "${TEMP_DIR}/file_$i" "dir/file_$i" || \
36+
{ echo "File $i is different after reading back"; exit 1; }
37+
done

0 commit comments

Comments
 (0)