Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/sfschunkserver.cfg.5.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ config value LOG_LEVEL to determine logging level. Valid log levels are
- 'crit' or 'critical'
- 'off'

*IO_PRIORITY_MODE (EXPERIMENTAL)*:: Defines the prioritization scheme for IO jobs
(i.e., reads and writes from drives). The available modes are:
- 'FIFO': prioritizes the IO jobs first enqueued.
- 'SWITCH': alternates read and write jobs (interleaving) to prevent one class of IO from
starving the other.
Not reloadable. (Default: FIFO)

*TLS_CERT_FILE (EXPERIMENTAL)*::
Path to the TLS certificate file the chunk server will use for TLS connections
(there is no default value).
Expand Down
1 change: 1 addition & 0 deletions src/admin/dump_config_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ const static std::unordered_map<std::string, std::string> defaultOptionsCS = {
{"NR_OF_NETWORK_WORKERS", "4"},
{"NR_OF_HDD_WORKERS_PER_NETWORK_WORKER", "16"},
{"BGJOBSCNT_PER_NETWORK_WORKER", "4000"},
{"IO_PRIORITY_MODE", "FIFO"},
{"MAX_BLOCKS_PER_HDD_WRITE_JOB", "16"},
{"MAX_BLOCKS_PER_HDD_READ_JOB", "16"},
{"MAX_PARALLEL_HDD_READ_JOBS_PER_CS_ENTRY", "1"},
Expand Down
449 changes: 264 additions & 185 deletions src/chunkserver/bgjobs.cc

Large diffs are not rendered by default.

419 changes: 283 additions & 136 deletions src/chunkserver/bgjobs.h

Large diffs are not rendered by default.

147 changes: 126 additions & 21 deletions src/chunkserver/bgjobs_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,42 @@ void mockJobCallback(uint8_t /*status*/, void *extra) {
counter->fetch_add(1);
}

// Completion callback that doubles the counter passed via `extra`.
// NOTE(review): load + fetch_add is not an atomic doubling; assumes callbacks
// touching one counter are not invoked concurrently — confirm with pool design.
void mockJobCallbackDoubling(uint8_t /*status*/, void *extra) {
auto *counter = static_cast<std::atomic<int> *>(extra);
const int currentValue = counter->load();
counter->fetch_add(currentValue);
}

constexpr uint32_t kWaitTimeMs = 100;

// Test fixture for JobPool tests
class JobPoolTest : public ::testing::Test {
private:
void servePoll(uint32_t listenerId) {
struct pollfd wakeupDescPollFd = {wakeupDescVec[listenerId], POLLIN, 0};
std::vector<pollfd> wakeupDescPollFDs = {
pollfd{wakeupDescVec[listenerId], POLLIN, 0},
pollfd{masterWakeupDescVec[listenerId], POLLIN, 0},
pollfd{clientSwitchModeWakeupDescVec[listenerId], POLLIN, 0},
pollfd{clientFifoModeWakeupDescVec[listenerId], POLLIN, 0}};

while (!terminate) {
int ret = poll(&wakeupDescPollFd, 1, kWaitTimeMs);
int ret = poll(&wakeupDescPollFDs[0], wakeupDescPollFDs.size(), kWaitTimeMs);
if (ret > 0) {
if (wakeupDescPollFd.revents & POLLIN) {
if (wakeupDescPollFDs[0].revents & POLLIN) {
processingCount[listenerId].fetch_add(1);
jobPool->processCompletedJobs(listenerId);
}
if (wakeupDescPollFDs[1].revents & POLLIN) {
processingCount[listenerId].fetch_add(1);
masterJobPool->processCompletedJobs(listenerId);
}
if (wakeupDescPollFDs[2].revents & POLLIN) {
processingCount[listenerId].fetch_add(1);
clientJobPoolSwitch->processCompletedJobs(listenerId);
}
if (wakeupDescPollFDs[3].revents & POLLIN) {
processingCount[listenerId].fetch_add(1);
clientJobPoolFifo->processCompletedJobs(listenerId);
}
}
}
}
Expand All @@ -54,6 +76,24 @@ class JobPoolTest : public ::testing::Test {
// Let's create some listeners for the JobPool
wakeupDescVec.resize(kNrListeners);
jobPool = std::make_unique<JobPool>("TestPool", 4, 10, kNrListeners, wakeupDescVec);
jobPool->startWorkers();

masterWakeupDescVec.resize(kNrListeners);
masterJobPool = std::make_unique<MasterJobPool>("TestMasterPool", 4, 10, kNrListeners,
masterWakeupDescVec);

// For the ClientJobPool, we want to test both IOPriorityMode::Fifo and
// IOPriorityMode::Switch, so we create one dedicated pool per mode here and
// exercise each of them in the IOPriorityMode test case.
clientSwitchModeWakeupDescVec.resize(kNrListeners);
clientJobPoolSwitch =
std::make_unique<ClientJobPool>("TestClientPoolSwitch", 1, 10, kNrListeners,
clientSwitchModeWakeupDescVec, IOPriorityMode::Switch);
clientFifoModeWakeupDescVec.resize(kNrListeners);
clientJobPoolFifo =
std::make_unique<ClientJobPool>("TestClientPoolFifo", 1, 10, kNrListeners,
clientFifoModeWakeupDescVec, IOPriorityMode::Fifo);

for (uint32_t i = 0; i < kNrListeners; ++i) {
processingCount[i] = 0;
}
Expand Down Expand Up @@ -89,11 +129,19 @@ class JobPoolTest : public ::testing::Test {
}

// Simulated job body shared by the tests: mimics a short piece of IO work by
// sleeping, then reports success to the pool.
JobPool::ProcessJobCallback mockProcessJob = []() -> uint8_t {
usleep(1000); // Simulate some work by sleeping for 1ms
return 0; // Return success status
};

// Pools under test, one per flavor, each with its own wakeup descriptors.
std::unique_ptr<JobPool> jobPool;
std::vector<int> wakeupDescVec;
std::unique_ptr<MasterJobPool> masterJobPool;
std::vector<int> masterWakeupDescVec;
std::unique_ptr<ClientJobPool> clientJobPoolSwitch;
std::vector<int> clientSwitchModeWakeupDescVec;
std::unique_ptr<ClientJobPool> clientJobPoolFifo;
std::vector<int> clientFifoModeWakeupDescVec;

// One servePoll() thread per listener.
std::vector<std::thread> servePollThreads;
// Written by the test thread and read concurrently by the servePoll()
// threads; made atomic (it was a plain bool) to avoid a data race, and
// value-initialized so the loop does not read an indeterminate flag.
std::atomic<bool> terminate{false};
std::mutex mutex_;
Expand Down Expand Up @@ -233,52 +281,109 @@ TEST_F(JobPoolTest, ChunkLocking) {
ChunkWithType chunkWithType3{chunkId3, chunkType};
ChunkWithType chunkWithType4{chunkId4, chunkType};

jobPool->startChunkLock(mockJobCallback, &counters[0], chunkId1, chunkType, 0);
jobPool->startChunkLock(mockJobCallback, &counters[0], chunkId2, chunkType, 0);
jobPool->startChunkLock(mockJobCallback, nullptr, chunkId4, chunkType, 0);
jobPool->enforceChunkLock(chunkId1, chunkType);
jobPool->enforceChunkLock(chunkId4, chunkType);
masterJobPool->startChunkLock(mockJobCallback, &counters[0], chunkId1, chunkType, 0);
masterJobPool->startChunkLock(mockJobCallback, &counters[0], chunkId2, chunkType, 0);
masterJobPool->startChunkLock(mockJobCallback, nullptr, chunkId4, chunkType, 0);
masterJobPool->enforceChunkLock(chunkId1, chunkType);
masterJobPool->enforceChunkLock(chunkId4, chunkType);

jobPool->addJobIfNotLocked(chunkWithType1, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
masterJobPool->addJobIfNotLocked(chunkWithType1, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Job should not be processed because chunk is locked and enforced
EXPECT_EQ(counters[0].load(), 0);

jobPool->addJobIfNotLocked(chunkWithType2, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
masterJobPool->addJobIfNotLocked(chunkWithType2, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Job should be processed because chunk is locked but not enforced
EXPECT_EQ(counters[0].load(), 1);

jobPool->addJobIfNotLocked(chunkWithType3, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
masterJobPool->addJobIfNotLocked(chunkWithType3, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Job should be processed because chunk is not locked
EXPECT_EQ(counters[0].load(), 2);

jobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
jobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
masterJobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
masterJobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback,
&counters[0], mockProcessJob);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Job should not be processed because chunk is locked and enforced
EXPECT_EQ(counters[0].load(), 2);

jobPool->endChunkLock(chunkId1, chunkType, SAUNAFS_STATUS_OK);
masterJobPool->endChunkLock(chunkId1, chunkType, SAUNAFS_STATUS_OK);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Now the job for chunkWithType1 should be processed because the lock is released and the
// callback for the lock job should be called
EXPECT_EQ(counters[0].load(), 4);

jobPool->endChunkLock(chunkId2, chunkType, SAUNAFS_STATUS_OK);
masterJobPool->endChunkLock(chunkId2, chunkType, SAUNAFS_STATUS_OK);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Now the callback for the lock job of chunkWithType2 should be called
EXPECT_EQ(counters[0].load(), 5);

jobPool->eraseChunkLock(chunkId4, chunkType);
masterJobPool->eraseChunkLock(chunkId4, chunkType);
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));
// Now the jobs for chunkWithType4 should be processed because the lock is released, but the
// callback for the lock job should not be called
EXPECT_EQ(counters[0].load(), 7);
}

TEST_F(JobPoolTest, IOPriorityMode) {
	constexpr int kJobsPerClass = 5;
	const int kInitialSwitchCounterValue = 5;
	const int kInitialFifoCounterValue = 7;
	counters[0] = kInitialSwitchCounterValue;
	counters[1] = kInitialFifoCounterValue;

	// Enqueue kJobsPerClass Read jobs followed by kJobsPerClass Write jobs on
	// the Switch-mode pool. Reads add 1 to the counter and Writes double it,
	// so the final counter value encodes the processing order.
	for (int job = 0; job < kJobsPerClass; ++job) {
		clientJobPoolSwitch->addJob(JobPool::ChunkOperation::Read, mockJobCallback, &counters[0],
		                            mockProcessJob);
	}
	for (int job = 0; job < kJobsPerClass; ++job) {
		clientJobPoolSwitch->addJob(JobPool::ChunkOperation::Write, mockJobCallbackDoubling,
		                            &counters[0], mockProcessJob);
	}

	std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));

	// Switch mode interleaves the two classes (Read, Write, Read, Write, ...),
	// so each round applies "+1 then *2" to the counter.
	int expectedSwitchValue = kInitialSwitchCounterValue;
	for (int round = 0; round < kJobsPerClass; ++round) {
		expectedSwitchValue = (expectedSwitchValue + 1) * 2;
	}

	EXPECT_EQ(counters[0].load(), expectedSwitchValue);

	// Same workload on the Fifo-mode pool, tracked by a separate counter.
	for (int job = 0; job < kJobsPerClass; ++job) {
		clientJobPoolFifo->addJob(JobPool::ChunkOperation::Read, mockJobCallback, &counters[1],
		                          mockProcessJob);
	}
	for (int job = 0; job < kJobsPerClass; ++job) {
		clientJobPoolFifo->addJob(JobPool::ChunkOperation::Write, mockJobCallbackDoubling,
		                          &counters[1], mockProcessJob);
	}

	std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs));

	// Fifo mode keeps submission order: all Reads first (+1 each), then all
	// Writes (*2 each).
	int expectedFifoValue = kInitialFifoCounterValue + kJobsPerClass;
	for (int round = 0; round < kJobsPerClass; ++round) {
		expectedFifoValue *= 2;
	}

	EXPECT_EQ(counters[1].load(), expectedFifoValue);
}
4 changes: 2 additions & 2 deletions src/chunkserver/chunk_high_level_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include "devtools/request_log.h"
#include "protocol/cltocs.h"

class JobPool;
class ClientJobPool;

inline constexpr uint8_t kSauWriteDataPrefixSize = cltocs::writeData::kPrefixSize;
// For forwarding: size of SAU_CLTOCS_WRITE_DATA prefix plus the packet header.
Expand Down Expand Up @@ -60,7 +60,7 @@ class HighLevelOp {
void setParentState(ChunkserverEntry::State newState) { parent_->state = newState; }

/// Job pool of the network worker handling the parent ChunkserverEntry.
JobPool *workerJobPool() const { return parent_->workerJobPool; }
ClientJobPool *workerJobPool() const { return parent_->workerJobPool; }

/// Checks and applies closing on the parent ChunkserverEntry.
void checkAndApplyClosedOnParent() const { parent_->checkAndApplyClosed(); }
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/chunkserver_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static ConnectionPool gForwardConnectionPool;
static constexpr uint32_t kMaxPacketSize = 100000 + SFSBLOCKSIZE;
static constexpr uint8_t kConnectRetries = 10;

ChunkserverEntry::ChunkserverEntry(int socket, JobPool *workerJobPool,
ChunkserverEntry::ChunkserverEntry(int socket, ClientJobPool *workerJobPool,
uint16_t maxBlocksPerHddReadJob, uint16_t maxParallelHddReadJobs,
uint16_t maxBlocksPerHddWriteJob)
: workerJobPool(workerJobPool), sock(socket) {
Expand Down
4 changes: 2 additions & 2 deletions src/chunkserver/chunkserver_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct ChunkserverEntry {
static constexpr uint32_t kGenerateChartExpectedPacketSize =
sizeof(uint32_t);

JobPool *workerJobPool; // Job pool assigned to a given network worker thread
ClientJobPool *workerJobPool; // Job pool assigned to a given network worker thread

ChunkserverEntry::State state = ChunkserverEntry::State::Idle;
ChunkserverEntry::Mode mode = ChunkserverEntry::Mode::Header;
Expand Down Expand Up @@ -141,7 +141,7 @@ struct ChunkserverEntry {
uint32_t chunkVersion = 0; // R+W
ChunkPartType chunkType = slice_traits::standard::ChunkPartType(); // R

ChunkserverEntry(int socket, JobPool *workerJobPool, uint16_t maxBlocksPerHddReadJob,
ChunkserverEntry(int socket, ClientJobPool *workerJobPool, uint16_t maxBlocksPerHddReadJob,
uint16_t maxParallelHddReadJobs, uint16_t maxBlocksPerHddWriteJob);

// Disallow copying and moving to avoid misuse.
Expand Down
11 changes: 6 additions & 5 deletions src/chunkserver/master_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ inline std::string gLabel;
inline uint32_t gTimeout_ms;

// Forward declaration
class JobPool;
class MasterJobPool;

/// @brief Enum representing the connection mode to the Metadata Server (MDS).
enum class ConnectionMode : std::uint8_t {
Expand All @@ -70,8 +70,8 @@ enum class RegistrationStatus : std::uint8_t {
class MasterConn {
public:
explicit MasterConn(const std::string &masterHostStr, const std::string &masterPortStr,
const std::string &clusterId, const std::shared_ptr<JobPool> &jobPool,
const std::shared_ptr<JobPool> &replicationJobPool)
const std::string &clusterId, const std::shared_ptr<MasterJobPool> &jobPool,
const std::shared_ptr<MasterJobPool> &replicationJobPool)
: masterHostStr_(masterHostStr),
masterPortStr_(masterPortStr),
clusterId_(clusterId),
Expand Down Expand Up @@ -231,8 +231,9 @@ class MasterConn {
std::string masterPortStr_; ///< Port of the master server.
uint32_t version_{saunafsVersion(0, 0, 0)}; ///< Version of the master server.
std::string clusterId_; ///< Cluster ID for this connection.
std::shared_ptr<JobPool> jobPool_; ///< Shared reference to the JobPool.
std::shared_ptr<JobPool> replicationJobPool_; ///< Shared reference to the ReplicationJobPool.
std::shared_ptr<MasterJobPool> jobPool_; ///< Shared reference to the JobPool.
/// Shared reference to the ReplicationJobPool.
std::shared_ptr<MasterJobPool> replicationJobPool_;

// For compatibility with old masters (version < 5.0)
void handleRegistrationAttempt();
Expand Down
14 changes: 7 additions & 7 deletions src/chunkserver/masterconn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ static bool gEnableLoadFactor;
static const uint64_t kSendStatusDelay = 5;

// JobPool shared between all connections to MDSs
static std::shared_ptr<JobPool> gJobPool;
static std::shared_ptr<JobPool> gReplicationJobPool;
static std::shared_ptr<MasterJobPool> gJobPool;
static std::shared_ptr<MasterJobPool> gReplicationJobPool;

// Singleton for the MasterConn instance (will become a list of connections in the future)
static std::unique_ptr<MasterConn> gMasterConnSingleton = nullptr;
Expand Down Expand Up @@ -140,7 +140,7 @@ void masterconn_unwantedjobfinished(uint8_t status, void *packet) {
MasterConn::deletePacket(packet);
}

JobPool* masterconn_get_job_pool() {
MasterJobPool* masterconn_get_job_pool() {
return gJobPool.get();
}

Expand Down Expand Up @@ -334,8 +334,8 @@ int masterconn_init_threads(void) {
// Create the JobPool instance with the specified number of workers, it would be serving
// only this master network thread, thus the number of listeners is 1.
std::vector<int> bgJobPoolFDs(1);
gJobPool = std::make_shared<JobPool>("ma", gNumberOfWorkers, kMaxBackgroundJobsCount, 1,
bgJobPoolFDs);
gJobPool = std::make_shared<MasterJobPool>("ma", gNumberOfWorkers, kMaxBackgroundJobsCount,
1, bgJobPoolFDs);
gJobFD = bgJobPoolFDs[0];
} catch (const std::exception &e) {
safs::log_err("masterconn_init_threads: Failed to create JobPool instance: {}", e.what());
Expand All @@ -357,8 +357,8 @@ int masterconn_init_threads(void) {
// serving only this master network thread, thus the number of listeners is 1.
std::vector<int> replicationJobPoolFDs(1);
gReplicationJobPool =
std::make_shared<JobPool>("ma_repl", gReplicationNumberOfWorkers,
kMaxBackgroundJobsCount, 1, replicationJobPoolFDs);
std::make_shared<MasterJobPool>("ma_repl", gReplicationNumberOfWorkers,
kMaxBackgroundJobsCount, 1, replicationJobPoolFDs);
gReplicationJobFD = replicationJobPoolFDs[0];
} catch (const std::exception &e) {
safs::log_err("masterconn_init_threads: Failed to create ReplicationJobPool instance: {}",
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/masterconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@
void masterconn_stats(uint64_t *bin, uint64_t *bout, uint32_t *maxjobscnt);
int masterconn_init(void);
int masterconn_init_threads(void);
JobPool* masterconn_get_job_pool();
MasterJobPool* masterconn_get_job_pool();
bool masterconn_canexit();
11 changes: 11 additions & 0 deletions src/chunkserver/network_main_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ int mainNetworkThreadInit(void) {
gBgjobsCountPerNetworkWorker = cfg_get_minvalue<uint32_t>(
"BGJOBSCNT_PER_NETWORK_WORKER",
NetworkWorkerThread::kDefaultMaxBackgroundJobsPerNetworkWorker, 10);
std::string ioPriorityModeStr = cfg_getstring("IO_PRIORITY_MODE", "FIFO");
if (ioPriorityModeStr == "SWITCH") {
// Must clearly say that the mode is Switch, otherwise it will be Fifo. This is because Fifo
// is the default and more tested mode.
gIOPriorityMode = IOPriorityMode::Switch;
} else {
gIOPriorityMode = IOPriorityMode::Fifo;
if (ioPriorityModeStr != "FIFO") {
safs::log_warn("Invalid IO_PRIORITY_MODE '{}', defaulting to FIFO", ioPriorityModeStr);
}
}

gMaxBlocksPerHddWriteJob = cfg_get_minmaxvalue<uint16_t>(
"MAX_BLOCKS_PER_HDD_WRITE_JOB", NetworkWorkerThread::kDefaultMaxBlocksPerHddWriteJob,
Expand Down
4 changes: 2 additions & 2 deletions src/chunkserver/network_worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ NetworkWorkerThread::NetworkWorkerThread(uint32_t id, uint32_t nrOfBgjobsWorkers
// Create the JobPool instance with the specified number of workers. It would be serving
// only this network worker thread, thus the number of listeners is 1.
std::vector<int> bgJobPoolWakeUpFds(1);
bgJobPool_ =
std::make_unique<JobPool>(name_, nrOfBgjobsWorkers, bgjobsCount, 1, bgJobPoolWakeUpFds);
bgJobPool_ = std::make_unique<ClientJobPool>(name_, nrOfBgjobsWorkers, bgjobsCount, 1,
bgJobPoolWakeUpFds, gIOPriorityMode);
bgJobPoolWakeUpFd_ = bgJobPoolWakeUpFds[0];
} catch (const std::exception &e) {
safs::log_err("NetworkWorkerThread: Failed to create JobPool instance: {}", e.what());
Expand Down
Loading