Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/chunkserver/bgjobs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ uint32_t JobPool::getJobCount() const {
return jobsQueue->elements();
}

/// @brief Reports whether the underlying jobs queue has reached its capacity.
///
/// Best-effort snapshot: the queue state may change immediately after this
/// check, so callers should treat the result as a backpressure hint rather
/// than a guarantee.
bool JobPool::isFull() const {
return jobsQueue->isFull();
}

void JobPool::disableAndChangeCallbackAll(const JobCallback &callback, uint32_t listenerId) {
// Check if the listenerId is valid
if (listenerId >= listenerInfos_.size()) {
Expand Down
3 changes: 3 additions & 0 deletions src/chunkserver/bgjobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ class JobPool {
/// @brief Gets the number of jobs in the JobPool.
uint32_t getJobCount() const;

/// @brief Checks if the JobPool is full.
bool isFull() const;

/// @brief Disables all jobs and changes their callback function.
///
/// @param callback The new callback function to be set for all jobs.
Expand Down
19 changes: 13 additions & 6 deletions src/chunkserver/chunk_high_level_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,8 @@ void ReadHighLevelOp::readFinishedCallback(uint8_t status, void *buffer) {
isChunkOpen_ = false;
}

// after sending status even if there was an error it's possible to
// receive new requests on the same connection
setParentState(ChunkserverEntry::State::Idle);
// Send status and close connection
setParentState(ChunkserverEntry::State::IOFinish);
LOG_AVG_STOP(readOperationTimer_);
}
}
Expand Down Expand Up @@ -225,7 +224,15 @@ void ReadHighLevelOp::readContinue(uint16_t callMaxParallelHddReadJobs) {
isChunkOpen_ = false;
// no error - do not disconnect - go direct to the IDLE state, ready for
// requests on the same connection
setParentState(ChunkserverEntry::State::Idle);
if (workerJobPool()->isFull()) {
// If the worker job pool is full (best-effort check), try not to accept
// more requests until it has free slots. Note: the pool state may change
// after this check, but this serves as backpressure heuristic.
setParentState(ChunkserverEntry::State::IOFinish);
} else {
// Ready for new requests, reset state
setParentState(ChunkserverEntry::State::Idle);
}
LOG_AVG_STOP(readOperationTimer_);
} else {
std::vector<uint8_t> readDataPrefix;
Expand Down Expand Up @@ -341,7 +348,7 @@ void WriteHighLevelOp::startOpenWriteJob() {
void WriteHighLevelOp::updateUsingWriteStatusAndReply(uint8_t status, uint32_t writeId) {
if (status != SAUNAFS_STATUS_OK) {
createAttachedWriteStatus(chunkId_, status, writeId);
setParentState(ChunkserverEntry::State::WriteFinish);
setParentState(ChunkserverEntry::State::IOFinish);
return;
}

Expand All @@ -351,7 +358,7 @@ void WriteHighLevelOp::updateUsingWriteStatusAndReply(uint8_t status, uint32_t w
return;
}

// state is WriteForward or WriteFinish
// state is WriteForward or IOFinish
if (partiallyCompletedWrites_.count(writeId) > 0) {
// found - it means that it was added by status_receive, ie. next
// chunkserver from a chain finished writing before our worker
Expand Down
26 changes: 17 additions & 9 deletions src/chunkserver/chunkserver_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void ChunkserverEntry::fwdError() {
uint8_t status =
(state == State::Connecting ? SAUNAFS_ERROR_CANTCONNECT : SAUNAFS_ERROR_DISCONNECTED);
createAttachedWriteStatus(chunkId, status, 0);
state = State::WriteFinish;
state = State::IOFinish;
}

int ChunkserverEntry::initConnection() {
Expand Down Expand Up @@ -334,7 +334,7 @@ void ChunkserverEntry::writeInit(const uint8_t *data, PacketHeader::Type type,

if (initConnection() < kInitConnectionOK) {
createAttachedWriteStatus(chunkId, SAUNAFS_ERROR_CANTCONNECT, 0);
state = State::WriteFinish;
state = State::IOFinish;
return;
}
} else {
Expand Down Expand Up @@ -373,7 +373,7 @@ void ChunkserverEntry::writeData(const uint8_t *data, PacketHeader::Type type,

if (status != SAUNAFS_STATUS_OK) {
createAttachedWriteStatus(opChunkId, status, writeId);
state = State::WriteFinish;
state = State::IOFinish;
return;
}

Expand Down Expand Up @@ -413,15 +413,15 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
cltocs::writeEnd::deserialize(data, length, opChunkId);
} catch (IncorrectDeserializationException&) {
safs::log_info("Received malformed WRITE_END message (length: {})", length);
state = State::WriteFinish;
state = State::IOFinish;
return;
}

if (opChunkId != chunkId) {
safs::log_info(
"Received malformed WRITE_END message (got chunkId={:016X}, expected {:016X})",
opChunkId, chunkId);
state = State::WriteFinish;
state = State::IOFinish;
return;
}

Expand All @@ -434,7 +434,7 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {
// TODO(msulikowski) temporary syslog message. May be useful until this
// code is fully tested
safs::log_info("Received WRITE_END message too early");
state = State::WriteFinish;
state = State::IOFinish;
return;
}

Expand All @@ -445,7 +445,15 @@ void ChunkserverEntry::writeEnd(const uint8_t *data, uint32_t length) {

// All went fine, cleanup
writeHLO_->cleanup();
state = State::Idle;
if (workerJobPool->isFull()) {
// If the worker job pool is full (best-effort check), try not to accept
// more requests until it has free slots. Note: the pool state may change
// after this check, but this serves as backpressure heuristic.
state = State::IOFinish;
} else {
// Ready for new requests, reset state
state = State::Idle;
}
}

void ChunkserverEntry::sauGetChunkBlocks(const uint8_t *data, uint32_t length) {
Expand Down Expand Up @@ -676,13 +684,13 @@ void ChunkserverEntry::gotPacket(uint32_t type, const uint8_t *data,
state = State::Close;
break;
}
} else if (state == State::WriteFinish) {
} else if (state == State::IOFinish) {
switch (type) {
case SAU_CLTOCS_WRITE_DATA:
case SAU_CLTOCS_WRITE_END:
return;
default:
safs::log_info("Got invalid message in WriteFinish state (type:{})", type);
safs::log_info("Got invalid message in IOFinish state (type:{})", type);
state = State::Close;
}
} else {
Expand Down
9 changes: 5 additions & 4 deletions src/chunkserver/chunkserver_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ struct ChunkserverEntry {
Connecting, // connecting to other chunkserver to form a writing chain
WriteInit, // sending packet forming a chain to the next chunkserver
WriteForward, // ready for writing data; will be forwarded to other CSs
WriteFinish, // write error, will be closed after sending error status
Close, // close request, will change to CloseWait or Closed
IOFinish, // IO finished; pending output (e.g. the final status) is flushed
// to the client first, then the connection is closed
Close, // close request, will change to CloseWait or Closed
CloseWait, // waits for a worker to finish a job, then will be Closed
Closed // ready to be deleted
};
Expand Down Expand Up @@ -211,12 +212,12 @@ struct ChunkserverEntry {
bool fromForward);

/// Handles forwarding errors by setting the appropriate error status and
/// transitioning the connection state to `WriteFinish`.
/// transitioning the connection state to `IOFinish`.
///
/// This function is called when an error occurs during forwarding
/// operations, such as read or write errors on the forwarding socket. It
/// serializes an error status message and attaches it to the packet, then
/// sets the state to `WriteFinish` to indicate that the connection should
/// sets the state to `IOFinish` to indicate that the connection should
/// be closed after sending the error status.
void fwdError();

Expand Down
6 changes: 3 additions & 3 deletions src/chunkserver/network_worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void NetworkWorkerThread::preparePollFds(bool isTerminating) {
pdesc.back().events |= POLLOUT;
}
break;
case ChunkserverEntry::State::WriteFinish:
case ChunkserverEntry::State::IOFinish:
if (!entry.outputPackets.empty()) {
pdesc.emplace_back(pollfd(entry.sock, POLLOUT, 0));
entry.pDescPos = pdesc.size() - 1;
Expand Down Expand Up @@ -242,7 +242,7 @@ void NetworkWorkerThread::servePoll() {
if (lstate == ChunkserverEntry::State::Idle ||
lstate == ChunkserverEntry::State::Read ||
lstate == ChunkserverEntry::State::WriteLast ||
lstate == ChunkserverEntry::State::WriteFinish ||
lstate == ChunkserverEntry::State::IOFinish ||
lstate == ChunkserverEntry::State::GetBlock) {
if (entry.pDescPos >= 0 &&
(pdesc[entry.pDescPos].revents & POLLIN)) {
Expand Down Expand Up @@ -297,7 +297,7 @@ void NetworkWorkerThread::servePoll() {
eptr->writeToSocket();
}
}
if (entry.state == ChunkserverEntry::State::WriteFinish &&
if (entry.state == ChunkserverEntry::State::IOFinish &&
entry.outputPackets.empty()) {
entry.state = ChunkserverEntry::State::Close;
}
Expand Down
16 changes: 2 additions & 14 deletions src/common/pcqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ bool ProducerConsumerQueue::isFull() const {
uint32_t ProducerConsumerQueue::sizeLeft() const {
TRACETHIS();
std::lock_guard<std::mutex> lock(mutex_);
return maxSize_ > 0 ? maxSize_ - currentSize_ : UINT32_MAX;
return maxSize_ > 0 ? (currentSize_ <= maxSize_ ? maxSize_ - currentSize_ : 0) : UINT32_MAX;
}

uint32_t ProducerConsumerQueue::elements() const {
Expand All @@ -75,23 +75,13 @@ uint32_t ProducerConsumerQueue::elements() const {
return currentElements_;
}

bool ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
void ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
uint8_t priority) {
TRACETHIS();
std::unique_lock<std::mutex> lock(mutex_);
notFull_.wait(lock, [this, length] {
return maxSize_ == 0 || currentSize_ + length <= maxSize_;
});

if (maxSize_ > 0 && length > maxSize_) {
errno = EDEADLK;
return false;
}

put_(jobId, jobType, data, length, priority);

notEmpty_.notify_one();
return true;
}

bool ProducerConsumerQueue::tryPut(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
Expand Down Expand Up @@ -123,7 +113,6 @@ bool ProducerConsumerQueue::get(uint32_t *jobId, uint32_t *jobType,
notEmpty_.wait(lock, [this] { return currentSize_ > 0; });

get_(jobId, jobType, data, length);
notFull_.notify_one();
return true;
}

Expand All @@ -141,7 +130,6 @@ bool ProducerConsumerQueue::tryGet(uint32_t *jobId, uint32_t *jobType,
}

get_(jobId, jobType, data, length);
notFull_.notify_one();
return true;
}

Expand Down
12 changes: 7 additions & 5 deletions src/common/pcqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ inline void deleterDummy(uint8_t * /*unused*/) {}
///
/// Can be configured to support several priority levels. Final interface is queue-like,
/// but preferring higher priority items and preserving order within each priority level.
/// The maxSize parameter can be used to limit the total amount of data the queue holds;
/// it does not block put() calls — tryPut() simply returns false when accepting the item
/// would exceed the limit.
///
/// This class provides a thread-safe queue implementation that allows multiple
/// producers and consumers to add and remove items concurrently. It uses a
Expand Down Expand Up @@ -87,7 +90,7 @@ class ProducerConsumerQueue {
/// and deleter.
///
/// @param priorityLevels The number of priority levels. Default is 1 (no priorities).
/// @param maxSize The maximum number of elements the queue can hold.
/// @param maxSize The maximum number of elements the queue should hold.
/// Default is 0 (unlimited).
/// @param deleter A callable type that defines how to delete the data
/// stored in the queue. Default is deleterDummy.
Expand Down Expand Up @@ -120,14 +123,15 @@ class ProducerConsumerQueue {

/// @brief Adds an element to the queue.
///
/// @note This method is not blocked by the maxSize limit.
///
/// @param jobId The job ID associated with the element.
/// @param jobType The job type associated with the element.
/// @param data A pointer to the data to be added.
/// @param length The length of the data to be added.
/// @param priority The priority level of the element (0 is the highest
/// priority). Default is 0.
/// @return true if the element was added successfully, false otherwise.
bool put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
void put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length,
uint8_t priority = 0);

/// @brief Tries to add an element to the queue without blocking.
Expand Down Expand Up @@ -215,8 +219,6 @@ class ProducerConsumerQueue {
uint32_t currentSize_;
///< Mutex for synchronizing access to the queue.
mutable std::mutex mutex_;
///< Condition variable to signal when the queue is not full.
std::condition_variable notFull_;
///< Condition variable to signal when the queue is not empty.
std::condition_variable notEmpty_;
///< The deleter function used to delete the data stored in the queue.
Expand Down
8 changes: 4 additions & 4 deletions src/common/pcqueue_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const uint32_t kMaxQueueSize = 100;
TEST(ProducerConsumerQueueTests, SingleElement) {
ProducerConsumerQueue queue(1, kMaxSize, customDeleter);
auto *data = new uint8_t[kMaxLength];
EXPECT_TRUE(queue.put(1, 1, data, kMaxLength));
EXPECT_TRUE(queue.tryPut(1, 1, data, kMaxLength));

uint32_t jobId = 0;
uint32_t jobType = 0;
Expand All @@ -52,7 +52,7 @@ TEST(ProducerConsumerQueueTests, MultipleElements) {
ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter);
for (int i = 0; i < kMaxSize; ++i) {
auto *data = new uint8_t[kMaxLength];
EXPECT_TRUE(queue.put(i, i, data, kMaxLength));
EXPECT_TRUE(queue.tryPut(i, i, data, kMaxLength));
}

for (uint32_t i = 0; i < kMaxSize; ++i) {
Expand All @@ -74,8 +74,8 @@ TEST(ProducerConsumerQueueTests, QueueFull) {
auto *data1 = new uint8_t[kMaxLength];
auto *data2 = new uint8_t[kMaxLength];
auto *data3 = new uint8_t[kMaxLength];
EXPECT_TRUE(queue.put(1, 1, data1, 1));
EXPECT_TRUE(queue.put(2, 2, data2, 1));
EXPECT_TRUE(queue.tryPut(1, 1, data1, 1));
EXPECT_TRUE(queue.tryPut(2, 2, data2, 1));
EXPECT_FALSE(queue.tryPut(3, 3, data3, kMaxLength));
delete[] data3;
}
Expand Down
37 changes: 37 additions & 0 deletions tests/test_suites/ShortSystemTests/test_cs_very_low_bgjobscnt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
timeout_set 1 minute

# Exercise the chunkserver backpressure path with a deliberately tiny
# background-job budget: one network worker, one HDD worker per network
# worker, and only 10 background job slots per network worker.
#
# NOTE: every assignment line must end with '\' so that the whole chain is
# passed as the environment of setup_local_empty_saunafs (a trailing
# assignment without '\' would only create a shell variable).
CHUNKSERVERS=8 \
MOUNT_EXTRA_CONFIG="sfscachemode=NEVER,sfswriteworkers=100,sfsioretries=13" \
CHUNKSERVER_EXTRA_CONFIG="NR_OF_NETWORK_WORKERS = 1|NR_OF_HDD_WORKERS_PER_NETWORK_WORKER = 1|`
`BGJOBSCNT_PER_NETWORK_WORKER = 10" \
MASTER_CUSTOM_GOALS="8 ec62: \$ec(6,2)" \
setup_local_empty_saunafs info

cd "${info[mount0]}"

number_of_files=500

# Prepare reference files locally. Use /dev/urandom: unlike /dev/random it
# never blocks waiting for entropy, so the test cannot stall here.
for i in $(seq 1 "${number_of_files}"); do
	dd if=/dev/urandom of="${TEMP_DIR}/file_${i}" bs=64K count=6 conv=fsync &> /dev/null
done

mkdir dir
saunafs setgoal ec62 dir
saunafs settrashtime 0 dir

# Write all files concurrently to saturate the tiny job pool.
for i in $(seq 1 "${number_of_files}"); do
	(assert_success dd if="${TEMP_DIR}/file_${i}" of="dir/file_${i}" bs=384K count=1 \
		status=none &> /dev/null) &
done

wait
echo "All files written"

# With ec(6,2) the data must survive the loss of any two chunkservers.
saunafs_chunkserver_daemon 0 stop
saunafs_chunkserver_daemon 1 stop

for i in $(seq 1 "${number_of_files}"); do
	assert_success dd if="dir/file_${i}" of=/dev/null bs=384K count=1 status=none
	cmp "${TEMP_DIR}/file_${i}" "dir/file_${i}" || \
		{ echo "File ${i} is different after reading back"; exit 1; }
done