Skip to content

Commit 654e9b8

Browse files
committed
fix: improve CS behavior with very low bgjobscnt
The current implementation of ProducerConsumerQueue::put blocks the calling thread until the queue is no longer full. Among such callers are the network workers of the chunkserver. Given their workload, this can significantly reduce system responsiveness when the BGJOBSCNT_PER_NETWORK_WORKER parameter is low. The proposed changes are the following: - make the put function non-blocking, i.e., the maxSize parameter of the pcqueues can be violated. - check whether the job pool is full at the moment a high-level operation finishes. If the job pool is full, close the connection (csentry) after sending pending statuses; otherwise, continue using the connection (idle state). The intent is to allow the pcqueue's maxSize limit to be exceeded, but not by much. The other instances of that class don't set any element limit. Side changes: - rename the WriteFinish state to IOFinish, to better reflect that it now covers other cases. Signed-off-by: Dave <dave@leil.io>
1 parent 50e48c2 commit 654e9b8

File tree

10 files changed

+95
-45
lines changed

10 files changed

+95
-45
lines changed

src/chunkserver/bgjobs.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ uint32_t JobPool::getJobCount() const {
200200
return jobsQueue->elements();
201201
}
202202

203+
bool JobPool::isFull() const {
204+
return jobsQueue->isFull();
205+
}
206+
203207
void JobPool::disableAndChangeCallbackAll(const JobCallback &callback, uint32_t listenerId) {
204208
// Check if the listenerId is valid
205209
if (listenerId >= listenerInfos_.size()) {

src/chunkserver/bgjobs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ class JobPool {
153153
/// @brief Gets the number of jobs in the JobPool.
154154
uint32_t getJobCount() const;
155155

156+
/// @brief Checks if the JobPool is full.
157+
bool isFull() const;
158+
156159
/// @brief Disables all jobs and changes their callback function.
157160
///
158161
/// @param callback The new callback function to be set for all jobs.

src/chunkserver/chunk_high_level_ops.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,8 @@ void ReadHighLevelOp::readFinishedCallback(uint8_t status, void *buffer) {
172172
isChunkOpen_ = false;
173173
}
174174

175-
// after sending status even if there was an error it's possible to
176-
// receive new requests on the same connection
177-
setParentState(ChunkserverEntry::State::Idle);
175+
// Send status and close connection
176+
setParentState(ChunkserverEntry::State::IOFinish);
178177
LOG_AVG_STOP(readOperationTimer_);
179178
}
180179
}
@@ -225,7 +224,15 @@ void ReadHighLevelOp::readContinue(uint16_t callMaxParallelHddReadJobs) {
225224
isChunkOpen_ = false;
226225
// no error - do not disconnect - go direct to the IDLE state, ready for
227226
// requests on the same connection
228-
setParentState(ChunkserverEntry::State::Idle);
227+
if (workerJobPool()->isFull()) {
228+
// If the worker job pool is full (best-effort check), try not to accept
229+
// more requests until it has free slots. Note: the pool state may change
230+
// after this check, but this serves as backpressure heuristic.
231+
setParentState(ChunkserverEntry::State::IOFinish);
232+
} else {
233+
// Ready for new requests, reset state
234+
setParentState(ChunkserverEntry::State::Idle);
235+
}
229236
LOG_AVG_STOP(readOperationTimer_);
230237
} else {
231238
std::vector<uint8_t> readDataPrefix;
@@ -341,7 +348,7 @@ void WriteHighLevelOp::startOpenWriteJob() {
341348
void WriteHighLevelOp::updateUsingWriteStatusAndReply(uint8_t status, uint32_t writeId) {
342349
if (status != SAUNAFS_STATUS_OK) {
343350
createAttachedWriteStatus(chunkId_, status, writeId);
344-
setParentState(ChunkserverEntry::State::WriteFinish);
351+
setParentState(ChunkserverEntry::State::IOFinish);
345352
return;
346353
}
347354

@@ -351,7 +358,7 @@ void WriteHighLevelOp::updateUsingWriteStatusAndReply(uint8_t status, uint32_t w
351358
return;
352359
}
353360

354-
// state is WriteForward or WriteFinish
361+
// state is WriteForward or IOFinish
355362
if (partiallyCompletedWrites_.count(writeId) > 0) {
356363
// found - it means that it was added by status_receive, ie. next
357364
// chunkserver from a chain finished writing before our worker

src/chunkserver/chunkserver_entry.cc

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void ChunkserverEntry::fwdError() {
128128
uint8_t status =
129129
(state == State::Connecting ? SAUNAFS_ERROR_CANTCONNECT : SAUNAFS_ERROR_DISCONNECTED);
130130
createAttachedWriteStatus(chunkId, status, 0);
131-
state = State::WriteFinish;
131+
state = State::IOFinish;
132132
}
133133

134134
int ChunkserverEntry::initConnection() {
@@ -334,7 +334,7 @@ void ChunkserverEntry::writeInit(const uint8_t *data, PacketHeader::Type type,
334334

335335
if (initConnection() < kInitConnectionOK) {
336336
createAttachedWriteStatus(chunkId, SAUNAFS_ERROR_CANTCONNECT, 0);
337-
state = State::WriteFinish;
337+
state = State::IOFinish;
338338
return;
339339
}
340340
} else {
@@ -373,7 +373,7 @@ void ChunkserverEntry::writeData(const uint8_t *data, PacketHeader::Type type,
373373

374374
if (status != SAUNAFS_STATUS_OK) {
375375
createAttachedWriteStatus(opChunkId, status, writeId);
376-
state = State::WriteFinish;
376+
state = State::IOFinish;
377377
return;
378378
}
379379

@@ -413,15 +413,15 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
413413
cltocs::writeEnd::deserialize(data, length, opChunkId);
414414
} catch (IncorrectDeserializationException&) {
415415
safs::log_info("Received malformed WRITE_END message (length: {})", length);
416-
state = State::WriteFinish;
416+
state = State::IOFinish;
417417
return;
418418
}
419419

420420
if (opChunkId != chunkId) {
421421
safs::log_info(
422422
"Received malformed WRITE_END message (got chunkId={:016X}, expected {:016X})",
423423
opChunkId, chunkId);
424-
state = State::WriteFinish;
424+
state = State::IOFinish;
425425
return;
426426
}
427427

@@ -434,7 +434,7 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
434434
// TODO(msulikowski) temporary syslog message. May be useful until this
435435
// code is fully tested
436436
safs::log_info("Received WRITE_END message too early");
437-
state = State::WriteFinish;
437+
state = State::IOFinish;
438438
return;
439439
}
440440

@@ -445,7 +445,15 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
445445

446446
// All went fine, cleanup
447447
writeHLO_->cleanup();
448-
state = State::Idle;
448+
if (workerJobPool->isFull()) {
449+
// If the worker job pool is full (best-effort check), try not to accept
450+
// more requests until it has free slots. Note: the pool state may change
451+
// after this check, but this serves as backpressure heuristic.
452+
state = State::IOFinish;
453+
} else {
454+
// Ready for new requests, reset state
455+
state = State::Idle;
456+
}
449457
}
450458

451459
void ChunkserverEntry::sauGetChunkBlocks(const uint8_t *data, uint32_t length) {
@@ -676,13 +684,13 @@ void ChunkserverEntry::gotPacket(uint32_t type, const uint8_t *data,
676684
state = State::Close;
677685
break;
678686
}
679-
} else if (state == State::WriteFinish) {
687+
} else if (state == State::IOFinish) {
680688
switch (type) {
681689
case SAU_CLTOCS_WRITE_DATA:
682690
case SAU_CLTOCS_WRITE_END:
683691
return;
684692
default:
685-
safs::log_info("Got invalid message in WriteFinish state (type:{})", type);
693+
safs::log_info("Got invalid message in IOFinish state (type:{})", type);
686694
state = State::Close;
687695
}
688696
} else {

src/chunkserver/chunkserver_entry.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,9 @@ struct ChunkserverEntry {
9898
Connecting, // connecting to other chunkserver to form a writing chain
9999
WriteInit, // sending packet forming a chain to the next chunkserver
100100
WriteForward, // ready for writing data; will be forwarded to other CSs
101-
WriteFinish, // write error, will be closed after sending error status
102-
Close, // close request, will change to CloseWait or Closed
101+
IOFinish, // closing a connection after finishing IO, but before sending the final status
102+
// to the client
103+
Close, // close request, will change to CloseWait or Closed
103104
CloseWait, // waits for a worker to finish a job, then will be Closed
104105
Closed // ready to be deleted
105106
};
@@ -211,12 +212,12 @@ struct ChunkserverEntry {
211212
bool fromForward);
212213

213214
/// Handles forwarding errors by setting the appropriate error status and
214-
/// transitioning the connection state to `WriteFinish`.
215+
/// transitioning the connection state to `IOFinish`.
215216
///
216217
/// This function is called when an error occurs during forwarding
217218
/// operations, such as read or write errors on the forwarding socket. It
218219
/// serializes an error status message and attaches it to the packet, then
219-
/// sets the state to `WriteFinish` to indicate that the connection should
220+
/// sets the state to `IOFinish` to indicate that the connection should
220221
/// be closed after sending the error status.
221222
void fwdError();
222223

src/chunkserver/network_worker_thread.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ void NetworkWorkerThread::preparePollFds(bool isTerminating) {
205205
pdesc.back().events |= POLLOUT;
206206
}
207207
break;
208-
case ChunkserverEntry::State::WriteFinish:
208+
case ChunkserverEntry::State::IOFinish:
209209
if (!entry.outputPackets.empty()) {
210210
pdesc.emplace_back(pollfd(entry.sock, POLLOUT, 0));
211211
entry.pDescPos = pdesc.size() - 1;
@@ -242,7 +242,7 @@ void NetworkWorkerThread::servePoll() {
242242
if (lstate == ChunkserverEntry::State::Idle ||
243243
lstate == ChunkserverEntry::State::Read ||
244244
lstate == ChunkserverEntry::State::WriteLast ||
245-
lstate == ChunkserverEntry::State::WriteFinish ||
245+
lstate == ChunkserverEntry::State::IOFinish ||
246246
lstate == ChunkserverEntry::State::GetBlock) {
247247
if (entry.pDescPos >= 0 &&
248248
(pdesc[entry.pDescPos].revents & POLLIN)) {
@@ -297,7 +297,7 @@ void NetworkWorkerThread::servePoll() {
297297
eptr->writeToSocket();
298298
}
299299
}
300-
if (entry.state == ChunkserverEntry::State::WriteFinish &&
300+
if (entry.state == ChunkserverEntry::State::IOFinish &&
301301
entry.outputPackets.empty()) {
302302
entry.state = ChunkserverEntry::State::Close;
303303
}

src/common/pcqueue.cc

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ bool ProducerConsumerQueue::isFull() const {
6666
uint32_t ProducerConsumerQueue::sizeLeft() const {
6767
TRACETHIS();
6868
std::lock_guard<std::mutex> lock(mutex_);
69-
return maxSize_ > 0 ? maxSize_ - currentSize_ : UINT32_MAX;
69+
return maxSize_ > 0 ? (currentSize_ <= maxSize_ ? maxSize_ - currentSize_ : 0) : UINT32_MAX;
7070
}
7171

7272
uint32_t ProducerConsumerQueue::elements() const {
@@ -75,23 +75,13 @@ uint32_t ProducerConsumerQueue::elements() const {
7575
return currentElements_;
7676
}
7777

78-
bool ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
78+
void ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
7979
uint8_t priority) {
8080
TRACETHIS();
8181
std::unique_lock<std::mutex> lock(mutex_);
82-
notFull_.wait(lock, [this, length] {
83-
return maxSize_ == 0 || currentSize_ + length <= maxSize_;
84-
});
85-
86-
if (maxSize_ > 0 && length > maxSize_) {
87-
errno = EDEADLK;
88-
return false;
89-
}
90-
9182
put_(jobId, jobType, data, length, priority);
9283

9384
notEmpty_.notify_one();
94-
return true;
9585
}
9686

9787
bool ProducerConsumerQueue::tryPut(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
@@ -123,7 +113,6 @@ bool ProducerConsumerQueue::get(uint32_t *jobId, uint32_t *jobType,
123113
notEmpty_.wait(lock, [this] { return currentSize_ > 0; });
124114

125115
get_(jobId, jobType, data, length);
126-
notFull_.notify_one();
127116
return true;
128117
}
129118

@@ -141,7 +130,6 @@ bool ProducerConsumerQueue::tryGet(uint32_t *jobId, uint32_t *jobType,
141130
}
142131

143132
get_(jobId, jobType, data, length);
144-
notFull_.notify_one();
145133
return true;
146134
}
147135

src/common/pcqueue.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ inline void deleterDummy(uint8_t * /*unused*/) {}
4040
///
4141
/// Can be configured to support several priority levels. Final interface is queue-like,
4242
/// but preferring higher priority items and preserving order within each priority level.
43+
/// The maxSize parameter can be used to limit the number of items the queue should hold, but
44+
/// won't block put() calls and is just going to return false for tryPut() calls when the limit is
45+
/// reached.
4346
///
4447
/// This class provides a thread-safe queue implementation that allows multiple
4548
/// producers and consumers to add and remove items concurrently. It uses a
@@ -87,7 +90,7 @@ class ProducerConsumerQueue {
8790
/// and deleter.
8891
///
8992
/// @param priorityLevels The number of priority levels. Default is 1 (no priorities).
90-
/// @param maxSize The maximum number of elements the queue can hold.
93+
/// @param maxSize The maximum number of elements the queue should hold.
9194
/// Default is 0 (unlimited).
9295
/// @param deleter A callable type that defines how to delete the data
9396
/// stored in the queue. Default is deleterDummy.
@@ -120,14 +123,15 @@ class ProducerConsumerQueue {
120123

121124
/// @brief Adds an element to the queue.
122125
///
126+
/// @note This method is not blocked by the maxSize limit.
127+
///
123128
/// @param jobId The job ID associated with the element.
124129
/// @param jobType The job type associated with the element.
125130
/// @param data A pointer to the data to be added.
126131
/// @param length The length of the data to be added.
127132
/// @param priority The priority level of the element (0 is the highest
128133
/// priority). Default is 0.
129-
/// @return true if the element was added successfully, false otherwise.
130-
bool put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
134+
void put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
131135
uint8_t priority = 0);
132136

133137
/// @brief Tries to add an element to the queue without blocking.
@@ -215,8 +219,6 @@ class ProducerConsumerQueue {
215219
uint32_t currentSize_;
216220
///< Mutex for synchronizing access to the queue.
217221
mutable std::mutex mutex_;
218-
///< Condition variable to signal when the queue is not full.
219-
std::condition_variable notFull_;
220222
///< Condition variable to signal when the queue is not empty.
221223
std::condition_variable notEmpty_;
222224
///< The deleter function used to delete the data stored in the queue.

src/common/pcqueue_unittest.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const uint32_t kMaxQueueSize = 100;
3333
TEST(ProducerConsumerQueueTests, SingleElement) {
3434
ProducerConsumerQueue queue(1, kMaxSize, customDeleter);
3535
auto *data = new uint8_t[kMaxLength];
36-
EXPECT_TRUE(queue.put(1, 1, data, kMaxLength));
36+
EXPECT_TRUE(queue.tryPut(1, 1, data, kMaxLength));
3737

3838
uint32_t jobId = 0;
3939
uint32_t jobType = 0;
@@ -52,7 +52,7 @@ TEST(ProducerConsumerQueueTests, MultipleElements) {
5252
ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter);
5353
for (int i = 0; i < kMaxSize; ++i) {
5454
auto *data = new uint8_t[kMaxLength];
55-
EXPECT_TRUE(queue.put(i, i, data, kMaxLength));
55+
EXPECT_TRUE(queue.tryPut(i, i, data, kMaxLength));
5656
}
5757

5858
for (uint32_t i = 0; i < kMaxSize; ++i) {
@@ -74,8 +74,8 @@ TEST(ProducerConsumerQueueTests, QueueFull) {
7474
auto *data1 = new uint8_t[kMaxLength];
7575
auto *data2 = new uint8_t[kMaxLength];
7676
auto *data3 = new uint8_t[kMaxLength];
77-
EXPECT_TRUE(queue.put(1, 1, data1, 1));
78-
EXPECT_TRUE(queue.put(2, 2, data2, 1));
77+
EXPECT_TRUE(queue.tryPut(1, 1, data1, 1));
78+
EXPECT_TRUE(queue.tryPut(2, 2, data2, 1));
7979
EXPECT_FALSE(queue.tryPut(3, 3, data3, kMaxLength));
8080
delete[] data3;
8181
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
timeout_set 1 minute
2+
3+
CHUNKSERVERS=8 \
4+
MOUNT_EXTRA_CONFIG="sfscachemode=NEVER,sfswriteworkers=100,sfsioretries=13" \
5+
CHUNKSERVER_EXTRA_CONFIG="NR_OF_NETWORK_WORKERS = 1|NR_OF_HDD_WORKERS_PER_NETWORK_WORKER = 1|`
6+
`BGJOBSCNT_PER_NETWORK_WORKER = 10" \
7+
MASTER_CUSTOM_GOALS="8 ec62: \$ec(6,2)"
8+
setup_local_empty_saunafs info
9+
10+
cd ${info[mount0]}
11+
12+
number_of_files=500
13+
14+
for i in $(seq 1 ${number_of_files}); do
15+
dd if=/dev/random of=${TEMP_DIR}/file_$i bs=64K count=6 conv=fsync &> /dev/null
16+
done
17+
18+
mkdir dir
19+
saunafs setgoal ec62 dir
20+
saunafs settrashtime 0 dir
21+
22+
for i in $(seq 1 ${number_of_files}); do
23+
(assert_success dd if="${TEMP_DIR}/file_${i}" of="dir/file_${i}" bs=384K count=1 \
24+
status=none &> /dev/null) &
25+
done
26+
27+
wait
28+
echo "All files written"
29+
30+
saunafs_chunkserver_daemon 0 stop
31+
saunafs_chunkserver_daemon 1 stop
32+
33+
for i in $(seq 1 ${number_of_files}); do
34+
assert_success dd if="dir/file_$i" of=/dev/null bs=384K count=1 status=none
35+
cmp "${TEMP_DIR}/file_$i" "dir/file_$i" || \
36+
{ echo "File $i is different after reading back"; exit 1; }
37+
done

0 commit comments

Comments
 (0)