diff --git a/doc/sfschunkserver.cfg.5.adoc b/doc/sfschunkserver.cfg.5.adoc index dfc804f5d..ce7383f45 100644 --- a/doc/sfschunkserver.cfg.5.adoc +++ b/doc/sfschunkserver.cfg.5.adoc @@ -216,6 +216,13 @@ config value LOG_LEVEL to determine logging level. Valid log levels are - 'crit' or 'critical' - 'off' +*IO_PRIORITY_MODE (EXPERIMENTAL)*:: Define IO jobs, i.e reads and writes from drives, +prioritization scheme. The available modes are: +- 'FIFO': prioritizes the IO jobs first enqueued. +- 'SWITCH': alternates read and write jobs (interleaving) to prevent one class of IO from +starving the other. +Not reloadable. (Default: FIFO) + *TLS_CERT_FILE (EXPERIMENTAL)*:: Path to the TLS certificate file the chunk server will use for TLS connections (there is no default value). diff --git a/src/admin/dump_config_command.cc b/src/admin/dump_config_command.cc index d46e9c237..a1c94c3b0 100644 --- a/src/admin/dump_config_command.cc +++ b/src/admin/dump_config_command.cc @@ -171,6 +171,7 @@ const static std::unordered_map defaultOptionsCS = { {"NR_OF_NETWORK_WORKERS", "4"}, {"NR_OF_HDD_WORKERS_PER_NETWORK_WORKER", "16"}, {"BGJOBSCNT_PER_NETWORK_WORKER", "4000"}, + {"IO_PRIORITY_MODE", "FIFO"}, {"MAX_BLOCKS_PER_HDD_WRITE_JOB", "16"}, {"MAX_BLOCKS_PER_HDD_READ_JOB", "16"}, {"MAX_PARALLEL_HDD_READ_JOBS_PER_CS_ENTRY", "1"}, diff --git a/src/chunkserver/bgjobs.cc b/src/chunkserver/bgjobs.cc index 7965b27ea..c4743a407 100644 --- a/src/chunkserver/bgjobs.cc +++ b/src/chunkserver/bgjobs.cc @@ -49,10 +49,10 @@ constexpr auto kInvalidJob = nullptr; JobPool::JobPool(const std::string &name, uint8_t workers, uint32_t maxJobs, uint32_t nrListeners, - std::vector &wakeupFDs) + std::vector &wakeupFDs, uint8_t numPriorities) // EFD_NONBLOCK to prevent blocking reads/writes - : listenerInfos_(nrListeners), name_(name), workers(workers) { - nrListeners = std::max(nrListeners, 1u); // Ensure at least one listener + : listenerInfos_(std::max(nrListeners, 1U)), name_(name), workers(workers) { + 
nrListeners = std::max(nrListeners, 1U); // Ensure at least one listener if (wakeupFDs.size() != nrListeners) { safs::log_warn( @@ -72,27 +72,29 @@ JobPool::JobPool(const std::string &name, uint8_t workers, uint32_t maxJobs, uin listenerInfos_[i].nextJobId = 1; } - // Initialize the job queue with a maximum size and two priority levels - jobsQueue = std::make_unique(2, maxJobs); + // Initialize the job queue with a maximum size + if (numPriorities > 1) { + jobsQueue = std::make_unique(numPriorities, maxJobs); + } else { + jobsQueue = std::make_unique(maxJobs); + } + + // NOTE: worker threads are NOT started here. Call start() after the fully-derived + // object is constructed to avoid a vtable-pointer race condition. +} +void JobPool::startWorkers() { for (uint8_t i = 0; i < workers; ++i) { workerThreads.emplace_back(&JobPool::workerThread, this, name_, i); } } JobPool::~JobPool() { - for (uint8_t i = 0; i < workers; ++i) { - // Use priority 1 to ensure exit jobs are processed after all other jobs - jobsQueue->put(0, JobPool::ChunkOperation::Exit, nullptr, 1, 1); - } - - for (auto &thread : workerThreads) { - if (thread.joinable()) { thread.join(); } - } - - for (size_t i = 0; i < listenerInfos_.size(); ++i) { - if (!listenerInfos_[i].statusQueue.empty()) { processCompletedJobs(i); } - } + // stop() could already have been called (e.g. from a derived-class destructor). + // The call here is a safety net; if the pool was never stopped the virtual + // putExitJobToQueue() will resolve to JobPool's version. + // If stop() was already called, this will be a no-op due to the stopped_ guard. 
+ stop(); jobsQueue.reset(); @@ -101,29 +103,22 @@ JobPool::~JobPool() { for (auto &listenerInfo : listenerInfos_) { close(listenerInfo.notifierFD); } } -uint32_t JobPool::addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation, - JobCallback callback, void *extra, - ProcessJobCallback processJob, uint32_t listenerId) { - std::unique_lock lockedChunkLock(chunkToJobReplyMapMutex_); - auto it = chunkToJobReplyMap_.find(chunkWithType); - if (it != chunkToJobReplyMap_.end() && it->second.writeInitReceived) { - // Chunk is locked, store the job to be added later - auto &lockedChunkData = it->second; +void JobPool::stop() { + if (stopped_.exchange(true, std::memory_order_acq_rel)) { + return; // Already stopped. + } - lockedChunkData.pendingAddJobs.emplace_back( - [this, operation, extra, listenerId, callback = std::move(callback), - processJob = std::move(processJob)]() mutable -> uint32_t { - return addJob(operation, std::move(callback), extra, std::move(processJob), - listenerId); - }); + for (uint8_t i = 0; i < workers; ++i) { + putExitJobToQueue(); + } - return lockedChunkData.lockJobId; // Lock guard is released here + for (auto &thread : workerThreads) { + if (thread.joinable()) { thread.join(); } } - lockedChunkLock.unlock(); // Release the lock before adding the job to avoid deadlocks - // Chunk is not locked, add the job immediately - return JobPool::addJob(operation, std::move(callback), extra, std::move(processJob), - listenerId); + for (size_t i = 0; i < listenerInfos_.size(); ++i) { + if (!listenerInfos_[i].statusQueue.empty()) { processCompletedJobs(i); } + } } uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *extra, @@ -146,38 +141,9 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e job->state = JobPool::State::Enabled; job->listenerId = listenerId; listenerInfo.jobHash[jobId] = std::move(job); - // Use higher priority (0) for Open, Close and GetBlocks operations - uint8_t 
priority = - (operation == ChunkOperation::Open || operation == ChunkOperation::GetBlocks || - operation == ChunkOperation::Close) - ? 0 - : 1; - jobsQueue->put(jobId, operation, reinterpret_cast(listenerInfo.jobHash[jobId].get()), - 1, priority); - unprocessedJobs_.fetch_add(1, std::memory_order_relaxed); - return jobId; -} -uint32_t JobPool::addLockJob(JobCallback callback, void *extra, uint32_t listenerId) { - // Check if the listenerId is valid - if (listenerId >= listenerInfos_.size()) { - safs::log_warn("JobPool: {}: Invalid listenerId {} for operation LOCK, resetting to 0", - __func__, listenerId); - listenerId = 0; // Reset to the first listener - } - - auto &listenerInfo = listenerInfos_[listenerId]; - std::unique_lock lock(listenerInfo.jobsMutex); - uint32_t jobId = listenerInfo.nextJobId++; - auto job = std::make_unique(); - job->jobId = jobId; - job->callback = std::move(callback); - // No processJob for lock jobs, as they are just markers for locked chunks - job->extra = extra; - job->state = JobPool::State::Enabled; - job->listenerId = listenerId; - listenerInfo.jobHash[jobId] = std::move(job); - // Not an actual job, but a marker for a locked chunk, so never inserted into the job queue. 
+ putToJobQueue(jobId, operation, reinterpret_cast(listenerInfo.jobHash[jobId].get())); + unprocessedJobs_.fetch_add(1, std::memory_order_relaxed); return jobId; } @@ -331,7 +297,7 @@ void JobPool::workerThread(const std::string &poolName, uint8_t workerId) { uint8_t status = SAUNAFS_STATUS_OK; while (true) { - jobsQueue->get(&jobId, &operation, &jobPtrArg, nullptr); + getFromJobQueue(&jobId, &operation, &jobPtrArg); if (operation == ChunkOperation::Exit) { break; } @@ -410,7 +376,216 @@ bool JobPool::receiveStatus(uint32_t &jobId, uint8_t &status, uint32_t listenerI return true; } -uint32_t job_open(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +void JobPool::putExitJobToQueue() { + jobsQueue->put(0, JobPool::ChunkOperation::Exit, nullptr, 1); +} + +void JobPool::putToJobQueue(uint32_t jobId, uint32_t operation, uint8_t *jobPtrArg) { + jobsQueue->put(jobId, operation, jobPtrArg, 1); +} + +void JobPool::getFromJobQueue(uint32_t *jobId, uint32_t *operation, uint8_t **jobPtrArg) { + jobsQueue->get(jobId, operation, jobPtrArg, nullptr); +} + +uint32_t MasterJobPool::addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation, + JobCallback callback, void *extra, + ProcessJobCallback processJob, uint32_t listenerId) { + std::unique_lock lockedChunkLock(chunkToJobReplyMapMutex_); + auto it = chunkToJobReplyMap_.find(chunkWithType); + if (it != chunkToJobReplyMap_.end() && it->second.writeInitReceived) { + // Chunk is locked, store the job to be added later + auto &lockedChunkData = it->second; + + lockedChunkData.pendingAddJobs.emplace_back( + [this, operation, extra, listenerId, callback = std::move(callback), + processJob = std::move(processJob)]() mutable -> uint32_t { + return addJob(operation, std::move(callback), extra, std::move(processJob), + listenerId); + }); + + return lockedChunkData.lockJobId; // Lock guard is released here + } + lockedChunkLock.unlock(); // Release the lock before adding the job to avoid deadlocks + + // 
Chunk is not locked, add the job immediately + return addJob(operation, std::move(callback), extra, std::move(processJob), listenerId); +} + +uint32_t MasterJobPool::addLockJob(JobCallback callback, void *extra, uint32_t listenerId) { + // Check if the listenerId is valid + if (listenerId >= listenerInfos_.size()) { + safs::log_warn("{} job pool: {}: Invalid listenerId {} for operation LOCK, resetting to 0", + name_, __func__, listenerId); + listenerId = 0; // Reset to the first listener + } + + auto &listenerInfo = listenerInfos_[listenerId]; + std::unique_lock lock(listenerInfo.jobsMutex); + uint32_t jobId = listenerInfo.nextJobId++; + auto job = std::make_unique(); + job->jobId = jobId; + job->callback = std::move(callback); + // No processJob for lock jobs, as they are just markers for locked chunks + job->extra = extra; + job->state = JobPool::State::Enabled; + job->listenerId = listenerId; + listenerInfo.jobHash[jobId] = std::move(job); + // Not an actual job, but a marker for a locked chunk, so never inserted into the job queue. + return jobId; +} + +bool MasterJobPool::startChunkLock(const JobPool::JobCallback &callback, void *packet, + uint64_t chunkId, ChunkPartType chunkType, uint32_t listenerId) { + std::unique_lock lock(chunkToJobReplyMapMutex_); + if (chunkToJobReplyMap_.contains(ChunkWithType{chunkId, chunkType})) { + lock.unlock(); // Release the lock before logging to avoid extra contention + + safs::log_warn( + "{}: Chunk lock job already exists for chunkId {:016X}, chunkType {}. 
Treating request " + "as retransmission; not adding a new lock job.", + __func__, chunkId, chunkType.toString()); + return false; + } + + chunkToJobReplyMap_[ChunkWithType{chunkId, chunkType}] = + LockedChunkData(addLockJob(callback, packet, listenerId), listenerId); + return true; +} + +bool MasterJobPool::enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType) { + std::unique_lock lock(chunkToJobReplyMapMutex_); + auto entry = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType}); + if (entry == chunkToJobReplyMap_.end()) { + lock.unlock(); // Release the lock before logging to avoid extra contention + + // Master was not waiting for this lock, just log and return + safs::log_trace("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.", + __func__, chunkId, chunkType.toString()); + return false; + } + + entry->second.writeInitReceived = true; + return true; +} + +bool MasterJobPool::releaseChunkLockEntry(uint64_t chunkId, ChunkPartType chunkType, + const char *callerName, uint32_t &lockJobId, + uint32_t &listenerId, + std::vector &pendingAddJobs) { + std::unique_lock lock(chunkToJobReplyMapMutex_); + auto entry = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType}); + if (entry == chunkToJobReplyMap_.end()) { + lock.unlock(); // Release the lock before logging to avoid extra contention + + // Master was not waiting for this lock, just log and return + safs::log_warn("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. 
Ignoring.", + callerName, chunkId, chunkType.toString()); + return false; + } + + lockJobId = entry->second.lockJobId; + listenerId = entry->second.listenerId; + pendingAddJobs = std::move(entry->second.pendingAddJobs); + chunkToJobReplyMap_.erase(entry); + return true; +} + +void MasterJobPool::endChunkLock(uint64_t chunkId, ChunkPartType chunkType, uint8_t status) { + uint32_t lockJobId; + uint32_t listenerId; + std::vector pendingAddJobs; + + if (!releaseChunkLockEntry(chunkId, chunkType, __func__, lockJobId, listenerId, + pendingAddJobs)) { + return; // No lock job found, just return + } + + for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); } + + sendStatus(lockJobId, status, listenerId); +} + +void MasterJobPool::eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType) { + uint32_t lockJobId; + uint32_t listenerId; + std::vector pendingAddJobs; + + if (!releaseChunkLockEntry(chunkId, chunkType, __func__, lockJobId, listenerId, + pendingAddJobs)) { + return; // No lock job found, just return + } + + for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); } + + // Remove the lock job and the related packet + auto &listenerInfo = listenerInfos_[listenerId]; + std::unique_lock jobsUniqueLock(listenerInfo.jobsMutex); + auto lockJobIterator = listenerInfo.jobHash.find(lockJobId); + + if (lockJobIterator != listenerInfo.jobHash.end()) { + auto *outputPacket = reinterpret_cast(lockJobIterator->second->extra); + delete outputPacket; + listenerInfo.jobHash.erase(lockJobIterator); + } +} + +ClientJobPool::~ClientJobPool() { + // Call stop() while this derived object is fully alive so that virtual + // dispatch resolves to ClientJobPool::putExitJobToQueue(), enqueuing Exit + // at the correct lowest priority and allowing workers to drain the queue. 
+ stop(); +} + +void ClientJobPool::putExitJobToQueue() { + // Enqueue EXIT at the lowest priority for the active I/O priority mode + jobsQueue->put(0, ChunkOperation::Exit, nullptr, 1, getJobPriority(ChunkOperation::Exit)); +} + +uint8_t ClientJobPool::getJobPriority(ChunkOperation operation) { + switch (operation) { + case ChunkOperation::Open: + case ChunkOperation::Close: + case ChunkOperation::GetBlocks: + // Use higher priority (0) for Open, Close and GetBlocks operations + return 0; + case ChunkOperation::Read: + case ChunkOperation::Prefetch: + return kReadLevel; // Read jobs + case ChunkOperation::Exit: + case ChunkOperation::Write: + // Use the lowest priority for Write and Exit operations, with an extra level if in Switch + // mode + return (ioPriorityMode_ == IOPriorityMode::Switch) ? kWriteLevelSwitchMode + : kWriteLevelFifoMode; + default: + return 0; // Default priority for other jobs + } +} + +void ClientJobPool::putToJobQueue(uint32_t jobId, uint32_t operation, uint8_t *jobPtrArg) { + jobsQueue->put(jobId, operation, jobPtrArg, 1, + getJobPriority(static_cast(operation))); +} + +void ClientJobPool::getFromJobQueue(uint32_t *jobId, uint32_t *operation, uint8_t **jobPtrArg) { + if (ioPriorityMode_ == IOPriorityMode::Fifo || stopped_.load()) { + // If we don't have a preferred IO type or if we're stopping, just get the next job without + // custom priority considerations. In switch mode, the priority levels in the queue will + // still ensure that Exit and Write jobs are processed correctly. + jobsQueue->get(jobId, operation, jobPtrArg, nullptr); + return; + } + + // The IOPriorityMode must be Switch if preferredIOType_ is not kPreferAny, so we can use the + // priority feature of the queue to prefer the specified IO type. 
+ uint8_t preferredIOType = preferredIOType_.fetch_xor(kSwitchValue, std::memory_order_relaxed); + uint8_t otherIOType = preferredIOType ^ kSwitchValue; + std::array priorityLevelsToCheck = {0, preferredIOType, otherIOType}; + jobsQueue->getUsingCustomPriority(jobId, operation, jobPtrArg, nullptr, priorityLevelsToCheck); +} + +uint32_t job_open(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, ChunkPartType chunkType, uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { return hddOpen(chunkId, chunkType); @@ -419,7 +594,7 @@ uint32_t job_open(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chun processJob, listenerId); } -uint32_t job_close(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_close(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, ChunkPartType chunkType, uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { return hddClose(chunkId, chunkType); @@ -428,7 +603,7 @@ uint32_t job_close(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chu processJob, listenerId); } -uint32_t job_read(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_read(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, uint32_t version, ChunkPartType chunkType, uint32_t offset, uint32_t size, uint32_t maxBlocksToBeReadBehind, uint32_t blocksToBeReadAhead, OutputBuffer *outputBuffer, bool performHddOpen, uint32_t listenerId) { @@ -437,9 +612,7 @@ uint32_t job_read(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chun uint8_t status = SAUNAFS_STATUS_OK; if (performHddOpen) { status = hddOpen(chunkId, chunkType); - if (status != SAUNAFS_STATUS_OK) { - return status; - } + if (status != SAUNAFS_STATUS_OK) { return status; } } status = hddRead(chunkId, version, chunkType, offset, size, maxBlocksToBeReadBehind, @@ -458,7 +631,7 @@ uint32_t job_read(JobPool &jobPool, 
JobPool::JobCallback callback, uint64_t chun processJob, listenerId); } -uint32_t job_prefetch(JobPool &jobPool, uint64_t chunkId, ChunkPartType chunkType, +uint32_t job_prefetch(ClientJobPool &jobPool, uint64_t chunkId, ChunkPartType chunkType, uint32_t firstBlockToBePrefetched, uint32_t blocksToBePrefetched, uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { @@ -469,7 +642,7 @@ uint32_t job_prefetch(JobPool &jobPool, uint64_t chunkId, ChunkPartType chunkTyp processJob, listenerId); } -uint32_t job_write(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_write(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, InputBuffer *inputBuffer, uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { @@ -531,7 +704,7 @@ uint32_t job_write(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chu if (op.startBlock != op.endBlock) { safs::log_warn( "job_write: startBlock {} != endBlock {} for chunk id {} when not full " - "blocks. This is not supported, doing first block only.", + "blocks. 
This is not supported, doing first block only.", op.startBlock, op.endBlock, chunkId); } @@ -539,9 +712,7 @@ uint32_t job_write(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chu op.startBlock, op.offset, op.size, op.crcs[0], op.buffer)); - if (statuses.back() != SAUNAFS_STATUS_OK) { - break; - } + if (statuses.back() != SAUNAFS_STATUS_OK) { break; } } } @@ -558,7 +729,7 @@ uint32_t job_write(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chu processJob, listenerId); } -uint32_t job_get_blocks(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_get_blocks(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, uint32_t version, ChunkPartType chunkType, uint16_t *blocks, uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { @@ -568,7 +739,7 @@ uint32_t job_get_blocks(JobPool &jobPool, JobPool::JobCallback callback, uint64_ processJob, listenerId); } -uint32_t job_replicate(JobPool &jobPool, JobPool::JobCallback callback, void *extra, +uint32_t job_replicate(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, uint32_t sourcesBufferSize, const uint8_t *sourcesBufferPtr, uint32_t listenerId) { @@ -592,15 +763,16 @@ uint32_t job_replicate(JobPool &jobPool, JobPool::JobCallback callback, void *ex processJob, listenerId); } -uint32_t job_invalid(JobPool &jobPool, JobPool::JobCallback callback, void *extra, +uint32_t job_invalid(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { return SAUNAFS_ERROR_EINVAL; }; return jobPool.addJob(JobPool::ChunkOperation::Invalid, std::move(callback), extra, processJob, listenerId); } -uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId, - uint32_t chunkVersion, ChunkPartType chunkType, uint32_t listenerId) { +uint32_t 
job_delete(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, + uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, + uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { return hddInternalDelete(chunkId, chunkVersion, chunkType); }; @@ -609,8 +781,9 @@ uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra processJob, listenerId); } -uint32_t job_create(JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId, - uint32_t chunkVersion, ChunkPartType chunkType, uint32_t listenerId) { +uint32_t job_create(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, + uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, + uint32_t listenerId) { JobPool::ProcessJobCallback processJob = [=]() -> uint8_t { return hddInternalCreate(chunkId, chunkVersion, chunkType); }; @@ -618,7 +791,7 @@ uint32_t job_create(JobPool &jobPool, JobPool::JobCallback callback, void *extra listenerId); } -uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_version(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, uint32_t newChunkVersion, uint32_t listenerId) { if (newChunkVersion > 0) { @@ -632,7 +805,7 @@ uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, voi return job_invalid(jobPool, callback, extra, listenerId); } -uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_truncate(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, ChunkPartType chunkType, uint32_t chunkVersion, uint32_t newChunkVersion, uint32_t length, uint32_t listenerId) { if (newChunkVersion > 0) { @@ -646,7 +819,7 @@ uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, vo return job_invalid(jobPool, callback, 
extra, listenerId); } -uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_duplicate(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, uint32_t newChunkVersion, ChunkPartType chunkType, uint64_t chunkIdCopy, uint32_t chunkVersionCopy, uint32_t listenerId) { @@ -662,7 +835,7 @@ uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, v return job_invalid(jobPool, callback, extra, listenerId); } -uint32_t job_duptrunc(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_duptrunc(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, uint32_t newChunkVersion, ChunkPartType chunkType, uint64_t chunkIdCopy, uint32_t chunkVersionCopy, uint32_t length, uint32_t listenerId) { @@ -677,97 +850,3 @@ uint32_t job_duptrunc(JobPool &jobPool, const JobPool::JobCallback &callback, vo } return job_invalid(jobPool, callback, extra, listenerId); } - -bool JobPool::startChunkLock(const JobPool::JobCallback &callback, void *packet, uint64_t chunkId, - ChunkPartType chunkType, uint32_t listenerId) { - std::unique_lock lock(chunkToJobReplyMapMutex_); - if (chunkToJobReplyMap_.contains(ChunkWithType{chunkId, chunkType})) { - lock.unlock(); // Release the lock before logging to avoid extra contention - - safs::log_warn( - "{}: Chunk lock job already exists for chunkId {:016X}, chunkType {}. 
Treating request " - "as retransmission; not adding a new lock job.", - __func__, chunkId, chunkType.toString()); - return false; - } - - chunkToJobReplyMap_[ChunkWithType{chunkId, chunkType}] = - LockedChunkData(addLockJob(callback, packet, listenerId), listenerId); - return true; -} - -bool JobPool::enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType) { - std::unique_lock lock(chunkToJobReplyMapMutex_); - auto entry = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType}); - if (entry == chunkToJobReplyMap_.end()) { - lock.unlock(); // Release the lock before logging to avoid extra contention - - // Master was not waiting for this lock, just log and return - safs::log_trace("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.", - __func__, chunkId, chunkType.toString()); - return false; - } - - entry->second.writeInitReceived = true; - return true; -} - -bool JobPool::releaseChunkLockEntry(uint64_t chunkId, ChunkPartType chunkType, - const char *callerName, uint32_t &lockJobId, - uint32_t &listenerId, std::vector &pendingAddJobs) { - std::unique_lock lock(chunkToJobReplyMapMutex_); - auto entry = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType}); - if (entry == chunkToJobReplyMap_.end()) { - lock.unlock(); // Release the lock before logging to avoid extra contention - - // Master was not waiting for this lock, just log and return - safs::log_warn("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. 
Ignoring.", - callerName, chunkId, chunkType.toString()); - return false; - } - - lockJobId = entry->second.lockJobId; - listenerId = entry->second.listenerId; - pendingAddJobs = std::move(entry->second.pendingAddJobs); - chunkToJobReplyMap_.erase(entry); - return true; -} - -void JobPool::endChunkLock(uint64_t chunkId, ChunkPartType chunkType, uint8_t status) { - uint32_t lockJobId; - uint32_t listenerId; - std::vector pendingAddJobs; - - if (!releaseChunkLockEntry(chunkId, chunkType, __func__, lockJobId, listenerId, - pendingAddJobs)) { - return; // No lock job found, just return - } - - for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); } - - sendStatus(lockJobId, status, listenerId); -} - -void JobPool::eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType) { - uint32_t lockJobId; - uint32_t listenerId; - std::vector pendingAddJobs; - - if (!releaseChunkLockEntry(chunkId, chunkType, __func__, lockJobId, listenerId, - pendingAddJobs)) { - return; // No lock job found, just return - } - - for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); } - - // Remove the lock job and the related packet - auto &listenerInfo = listenerInfos_[listenerId]; - std::unique_lock jobsUniqueLock(listenerInfo.jobsMutex); - auto lockJobIterator = listenerInfo.jobHash.find(lockJobId); - - if (lockJobIterator != listenerInfo.jobHash.end()) { - auto *outputPacket = reinterpret_cast(lockJobIterator->second->extra); - delete outputPacket; - listenerInfo.jobHash.erase(lockJobIterator); - } -} diff --git a/src/chunkserver/bgjobs.h b/src/chunkserver/bgjobs.h index 1a76f7f39..45a5f1a05 100644 --- a/src/chunkserver/bgjobs.h +++ b/src/chunkserver/bgjobs.h @@ -38,6 +38,14 @@ constexpr auto kEmptyCallback = nullptr; constexpr auto kEmptyExtra = nullptr; +/// @brief Mode for consumer threads to pick IO jobs from the JobPool. +enum class IOPriorityMode : uint8_t { + Fifo, ///< Process IO jobs in the order they were added, regardless of type. 
+ Switch ///< Switch between read and write jobs to prevent starvation of either type. +}; + +inline IOPriorityMode gIOPriorityMode; + /** * @class JobPool * @brief Manages and processes background jobs in a thread pool. @@ -59,22 +67,22 @@ class JobPool { /// @enum ChunkOperation /// @brief Represents the type of operation to be performed on a chunk. - enum ChunkOperation { - Exit, - Invalid, - ChangeVersion, - Duplicate, - Truncate, - DuplicateTruncate, - Delete, - Create, - Open, - Close, - Read, - Prefetch, - Write, - Replicate, - GetBlocks + enum ChunkOperation : uint8_t { + Exit, ///< Special operation to signal worker threads to exit. Master and client. + Invalid, ///< Invalid operation, used for testing and error handling. Master and client. + ChangeVersion, ///< Change the version of a chunk. Master only. + Duplicate, ///< Duplicate a chunk. Master only. + Truncate, ///< Truncate a chunk. Master only. + DuplicateTruncate, ///< Duplicate and truncate a chunk. Master only. + Delete, ///< Delete a chunk. Master only. + Create, ///< Create a chunk. Master only. + Replicate, ///< Replicate a chunk. Master only. + Open, ///< Open a chunk for reading or writing. Client only. + Close, ///< Close a chunk. Client only. + GetBlocks, ///< Get the blocks of a chunk. Client (actually other CS) only. + Read, ///< Read data from a chunk. Client only. + Prefetch, ///< Prefetch data from a chunk. Client only. + Write ///< Write data to a chunk. Client only. }; /// @brief Callback function type for job completion. @@ -100,12 +108,31 @@ class JobPool { /// @param maxJobs The maximum number of jobs that can be queued. /// @param nrListeners The number of listeners that will use this JobPool. /// @param wakeupFDs A vector of file descriptors for wakeup notifications. + /// @param numPriorities The number of priority levels for the job queue. /// @throws std::runtime_error If the pipe creation fails. + /// @note After construction, call start() to spawn worker threads. 
This two-phase + /// init avoids a vtable-pointer race when constructing derived classes. JobPool(const std::string &name, uint8_t workers, uint32_t maxJobs, uint32_t nrListeners, - std::vector &wakeupFDs); + std::vector &wakeupFDs, uint8_t numPriorities = 1); + + /// @brief Spawns the worker threads. + /// + /// Must be called once, after the fully-derived object has been constructed, so + /// that virtual dispatch in the worker threads resolves correctly. Calling it + /// more than once or before the object is fully constructed is undefined behaviour. + void startWorkers(); /// @brief Destructor for JobPool. - ~JobPool(); + /// @note stop() must have been called before destruction (e.g. from a derived class destructor) + /// to ensure worker threads are shut down correctly. ~JobPool() only releases resources. + virtual ~JobPool(); + + /// @brief Shuts down all worker threads and drains pending status. + /// + /// Enqueues one Exit job per worker (via the virtual putExitJobToQueue() so + /// derived classes use the correct priority), joins all threads, and drains + /// any remaining status queues. Safe to call more than once. + void stop(); /// @brief Adds a job to the JobPool. /// @@ -118,27 +145,6 @@ class JobPool { uint32_t addJob(ChunkOperation operation, JobCallback callback, void *extra, ProcessJobCallback processJob, uint32_t listenerId = 0); - /// @brief Adds a job to the JobPool if the chunk is not locked. - /// If the chunk is locked, the job will be stored and added once the lock is released. - /// @param chunkWithType The chunk and its type associated with the job. - /// @param operation The type of operation to be performed on the chunk. - /// @param callback The callback function to be called upon job completion. - /// @param extra Additional data to be passed to the callback. - /// @param processJob The callback function to process the job. - /// @param listenerId The ID of the listener associated with the job. 
- /// @return The ID of the added job, or the lock job ID if the chunk is locked. - uint32_t addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation, - JobCallback callback, void *extra, ProcessJobCallback processJob, - uint32_t listenerId = 0); - - /// @brief Adds a lock job to the JobPool for a specific chunk and type. - /// This function is used to create a lock job that will be associated with a locked chunk. - /// @param callback The callback function to be called when the lock is released. - /// @param extra Additional data to be passed to the callback. - /// @param listenerId The ID of the listener associated with the job. - /// @return The ID of the added lock job. - uint32_t addLockJob(JobCallback callback, void *extra, uint32_t listenerId = 0); - /// @brief Returns whether all jobs in the JobPool have been processed by the worker threads. /// This is a very accurate way to check if there are pending jobs in the JobPool, as it counts /// the number of jobs that have been added but not yet passed by processCompletedJobs. Must @@ -196,6 +202,109 @@ class JobPool { void changeCallback(std::list &jobIds, const JobCallback &callback, uint32_t listenerId = 0); +protected: + /// @brief Represents a job in the JobPool. + struct Job { + uint32_t jobId; // The ID of the job. + JobCallback callback; // The callback function to be called upon job completion. + ProcessJobCallback processJob; // The callback function to process the job. + void *extra; // Additional data for the callback. + JobPool::State state; // The state of the job. + uint32_t listenerId; // The ID of the listener associated with the job. + }; + + /// @brief Structure to hold information about a listener. + struct ListenerInfo { + int notifierFD; /// File descriptor for notifications. + std::mutex notifierMutex; /// Mutex for event notifications. + std::mutex jobsMutex; /// Mutex for job operations. + std::queue> statusQueue; /// Queue for job statuses. 
+ std::unordered_map<uint32_t, std::shared_ptr<Job>> jobHash; /// Hash map of jobs.
+ uint32_t nextJobId; /// Next job ID to be assigned.
+ };
+
+ /// @brief Worker thread function.
+ /// @param poolName Parent pool name, used to name the specific thread.
+ /// @param workerId Worker index in this pool, used to name the thread.
+ void workerThread(const std::string &poolName, uint8_t workerId);
+
+ /// @brief Sends the status of a job.
+ ///
+ /// @param jobId The ID of the job.
+ /// @param status The status of the job.
+ /// @param listenerId The ID of the listener associated with the job.
+ void sendStatus(uint32_t jobId, uint8_t status, uint32_t listenerId = 0);
+
+ /// @brief Receives the status of a job.
+ ///
+ /// @param jobId The ID of the job.
+ /// @param status The status of the job.
+ /// @param listenerId The ID of the listener associated with the job.
+ /// @return true if the status is not the last one, false if it is the last status.
+ bool receiveStatus(uint32_t &jobId, uint8_t &status, uint32_t listenerId = 0);
+
+ /// @brief Puts an exit job into the job queue.
+ virtual void putExitJobToQueue();
+
+ /// @brief Puts a job into the job queue.
+ /// @param jobId The ID of the job to be added.
+ /// @param operation The type of operation to be performed on the chunk.
+ /// @param jobPtrArg A pointer to the data of the job to be added.
+ virtual void putToJobQueue(uint32_t jobId, uint32_t operation, uint8_t *jobPtrArg);
+
+ /// @brief Gets a job from the job queue.
+ /// @param jobId The ID of the job to be retrieved.
+ /// @param operation The type of operation to be performed on the chunk.
+ /// @param jobPtrArg A pointer to the data of the job to be retrieved.
+ virtual void getFromJobQueue(uint32_t *jobId, uint32_t *operation, uint8_t **jobPtrArg);
+
+ std::vector<ListenerInfo> listenerInfos_; /// Vector of listener information.
+ std::string name_; /// Human readable id of the JobPool.
+ uint8_t workers; /// Number of worker threads in the pool.
+ std::vector<std::thread> workerThreads; /// Vector of worker threads.
+ std::unique_ptr jobsQueue; /// Queue for jobs.
+ /// Counter for unprocessed jobs, i.e jobs that have been added to the JobPool but have not yet
+ /// been passed by processCompletedJobs and had their callbacks called. This is used to make
+ /// sure the JobPool is truly empty when stopping the chunkserver.
+ std::atomic<uint32_t> unprocessedJobs_{0};
+ /// Guards against double-shutdown (stop() called more than once or from destructor).
+ std::atomic<bool> stopped_{false};
+};
+
+/**
+ * @class MasterJobPool
+ * @brief Specialized JobPool for managing master server related jobs with chunk lock handling.
+ *
+ * The MasterJobPool class extends the JobPool class to provide additional functionality for
+ * managing jobs related to master server operations, including handling chunk locks.
+ */
+class MasterJobPool : public JobPool {
+public:
+ /// @brief Constructor for MasterJobPool.
+ /// @param name Human readable name for this pool, useful for debugging.
+ /// @param workers The number of worker threads in the pool.
+ /// @param maxJobs The maximum number of jobs that can be queued.
+ /// @param nrListeners The number of listeners that will use this JobPool.
+ /// @param wakeupFDs A vector of file descriptors for wakeup notifications.
+ MasterJobPool(const std::string &name, uint8_t workers, uint32_t maxJobs, uint32_t nrListeners,
+ std::vector<int> &wakeupFDs)
+ : JobPool(name, workers, maxJobs, nrListeners, wakeupFDs) {
+ startWorkers();
+ }
+
+ /// @brief Adds a job to the JobPool if the chunk is not locked.
+ /// If the chunk is locked, the job will be stored and added once the lock is released.
+ /// @param chunkWithType The chunk and its type associated with the job.
+ /// @param operation The type of operation to be performed on the chunk.
+ /// @param callback The callback function to be called upon job completion.
+ /// @param extra Additional data to be passed to the callback.
+ /// @param processJob The callback function to process the job. + /// @param listenerId The ID of the listener associated with the job. + /// @return The ID of the added job, or the lock job ID if the chunk is locked. + uint32_t addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation, + JobCallback callback, void *extra, ProcessJobCallback processJob, + uint32_t listenerId = 0); + /// @brief Starts a chunk lock job for a specific chunk and type. /// This function is triggered when the master server sends a chunk lock request for a chunk /// that is not currently locked. It adds a lock job to the JobPool and associates it with the @@ -220,20 +329,6 @@ class JobPool { /// chunk. bool enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType); - /// @brief Releases a chunk lock entry for a specific chunk and type. - /// This is a helper function that returns the lock job ID, listener ID, and pending jobs - /// associated with a locked chunk if found, and removes the lock entry from the internal map. - /// @param chunkId The ID of the chunk to release the lock on. - /// @param chunkType The type of the chunk to release the lock on. - /// @param callerName The name of the function calling this helper, used for logging. - /// @param lockJobId The ID of the lock job associated with the chunk. - /// @param listenerId The ID of the listener associated with the lock job. - /// @param pendingAddJobs The list of pending jobs associated with the locked chunk. - /// @return true if the lock entry was found and released, false otherwise. - bool releaseChunkLockEntry(uint64_t chunkId, ChunkPartType chunkType, const char *callerName, - uint32_t &lockJobId, uint32_t &listenerId, - std::vector &pendingAddJobs); - /// @brief Ends a chunk lock for a specific chunk and type. /// This function is called when cleaning up the write operation on the locked chunk, to end the /// lock and allow any pending jobs to be added and processed. 
It sends the write end status to @@ -252,46 +347,6 @@ class JobPool { void eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType); private: - /// @brief Represents a job in the JobPool. - struct Job { - uint32_t jobId; // The ID of the job. - JobCallback callback; // The callback function to be called upon job completion. - ProcessJobCallback processJob; // The callback function to process the job. - void *extra; // Additional data for the callback. - JobPool::State state; // The state of the job. - uint32_t listenerId; // The ID of the listener associated with the job. - }; - - /// @brief Worker thread function. - /// @param poolName Parent pool name, used to name the specific thread. - /// @param workerId Worker index in this pool, used to name the thread. - void workerThread(const std::string &poolName, uint8_t workerId); - - /// @brief Sends the status of a job. - /// - /// @param jobId The ID of the job. - /// @param status The status of the job. - /// @param listenerId The ID of the listener associated with the job. - void sendStatus(uint32_t jobId, uint8_t status, uint32_t listenerId = 0); - - /// @brief Receives the status of a job. - /// - /// @param jobId The ID of the job. - /// @param status The status of the job. - /// @param listenerId The ID of the listener associated with the job. - /// @return 1 if a status is not the last one, 0 if it is the last status. - bool receiveStatus(uint32_t &jobId, uint8_t &status, uint32_t listenerId = 0); - - /// @brief Structure to hold information about a listener. - struct ListenerInfo { - int notifierFD; /// File descriptor for notifications. - std::mutex notifierMutex; /// Mutex for event notifications. - std::mutex jobsMutex; /// Mutex for job operations. - std::queue> statusQueue; /// Queue for job statuses. - std::unordered_map> jobHash; /// Hash map of job. - uint32_t nextJobId; /// Next job ID to be assigned. - }; - /// @brief Structure to hold information about a locked chunk. 
struct LockedChunkData { uint32_t lockJobId; /// The ID of the lock job associated with the locked chunk. @@ -307,31 +362,121 @@ class JobPool { LockedChunkData() = default; }; + /// @brief Adds a lock job to the JobPool for a specific chunk and type. + /// This function is used to create a lock job that will be associated with a locked chunk. + /// @param callback The callback function to be called when the lock is released. + /// @param extra Additional data to be passed to the callback. + /// @param listenerId The ID of the listener associated with the job. + /// @return The ID of the added lock job. + uint32_t addLockJob(JobCallback callback, void *extra, uint32_t listenerId = 0); + + /// @brief Releases a chunk lock entry for a specific chunk and type. + /// This is a helper function that returns the lock job ID, listener ID, and pending jobs + /// associated with a locked chunk if found, and removes the lock entry from the internal map. + /// @param chunkId The ID of the chunk to release the lock on. + /// @param chunkType The type of the chunk to release the lock on. + /// @param callerName The name of the function calling this helper, used for logging. + /// @param lockJobId The ID of the lock job associated with the chunk. + /// @param listenerId The ID of the listener associated with the lock job. + /// @param pendingAddJobs The list of pending jobs associated with the locked chunk. + /// @return true if the lock entry was found and released, false otherwise. + bool releaseChunkLockEntry(uint64_t chunkId, ChunkPartType chunkType, const char *callerName, + uint32_t &lockJobId, uint32_t &listenerId, + std::vector &pendingAddJobs); + /// Mutex to protect access to the chunkToJobReplyMap_. std::mutex chunkToJobReplyMapMutex_; /// Map to associate locked chunks with their corresponding lock job and pending jobs. std::unordered_map chunkToJobReplyMap_; - std::vector listenerInfos_; /// Vector of listener information. 
- std::string name_; /// Human readable id of the JobPool. - uint8_t workers; /// Number of worker threads in the pool. - std::vector workerThreads; /// Vector of worker threads. - std::unique_ptr jobsQueue; /// Queue for jobs. - /// Counter for unprocessed jobs, i.e jobs that have been added to the JobPool but have not yet - /// been passed by processCompletedJobs and had their callbacks called. This is used to make - /// sure the JobPool is truly empty when stopping the chunkserver. - std::atomic unprocessedJobs_{0}; }; -/// @brief Adds an open job to the JobPool. +/** + * @class ClientJobPool + * @brief Specialized JobPool for managing client server related jobs. + * + * The ClientJobPool class extends the JobPool class to provide additional functionality for + * managing jobs related to client server operations. It includes a mechanism to prioritize read and + * write jobs based on the IOPriorityMode. + */ +class ClientJobPool : public JobPool { +public: + /// @brief Constructor for ClientJobPool. + /// @param name Human readable name for this pool, useful for debugging. + /// @param workers The number of worker threads in the pool. + /// @param maxJobs The maximum number of jobs that can be queued. + /// @param nrListeners The number of listeners that will use this JobPool. + /// @param wakeupFDs A vector of file descriptors for wakeup notifications. + ClientJobPool(const std::string &name, uint8_t workers, uint32_t maxJobs, uint32_t nrListeners, + std::vector &wakeupFDs, IOPriorityMode ioPriorityMode) + : JobPool(name, workers, maxJobs, nrListeners, wakeupFDs, + ioPriorityMode == IOPriorityMode::Fifo ? 2 : 3), + ioPriorityMode_(ioPriorityMode) { + if (ioPriorityMode_ == IOPriorityMode::Switch) { + preferredIOType_.store(kPreferRead); + } else { + preferredIOType_.store(kPreferAny); + } + + startWorkers(); + } + + /// @brief Destructor for ClientJobPool. 
+ /// Calls stop() while the derived object is still alive so that + /// putExitJobToQueue() virtual dispatch resolves to the correct override, + /// enqueuing Exit at the right (lowest) priority. + ~ClientJobPool() override; + +private: + constexpr static uint8_t kPreferAny = 0; + constexpr static uint8_t kReadLevel = 1; + constexpr static uint8_t kWriteLevelFifoMode = kReadLevel; + constexpr static uint8_t kWriteLevelSwitchMode = 2; + + constexpr static uint8_t kPreferRead = kReadLevel; + constexpr static uint8_t kPreferWrite = kWriteLevelSwitchMode; + constexpr static uint8_t kSwitchValue = kPreferRead ^ kPreferWrite; + + /// @brief Gets the job priority based on the operation type. + /// @param operation The type of operation to be performed on the chunk. + /// @return The priority of the job, where lower values indicate higher priority. + uint8_t getJobPriority(ChunkOperation operation); + + /// @brief Puts an exit job into the client job queue. + void putExitJobToQueue() override; + + /// @brief Puts a job into the client job queue. + /// @note The ClientJobPool uses the priority parameter of the put function to give higher + /// priority to Open, Close and GetBlocks operations. + /// @param jobId The ID of the job to be added. + /// @param operation The type of operation to be performed on the chunk. + /// @param jobPtrArg A pointer to the data of the job to be added. + void putToJobQueue(uint32_t jobId, uint32_t operation, uint8_t *jobPtrArg) override; + + /// @brief Gets a job from the client job queue. + /// @note The ClientJobPool uses the preferredIOType_ member to switch between preferring read + /// and write jobs when the IOPriorityMode is set to Switch. The preferredIOType_ is updated + /// every time a job is retrieved from the queue if in switch mode, to give more balanced access + /// to read and write operations. + /// @param jobId The ID of the job to be retrieved. + /// @param operation The type of operation to be performed on the chunk. 
+ /// @param jobPtrArg A pointer to the data of the job to be retrieved.
+ void getFromJobQueue(uint32_t *jobId, uint32_t *operation, uint8_t **jobPtrArg) override;
+
+ /// The preferred IO type for the Switch mode, used to switch between read and write jobs.
+ std::atomic<uint8_t> preferredIOType_;
+ IOPriorityMode ioPriorityMode_;
+};
+
+/// @brief Adds an open job to the ClientJobPool.
 ///
-/// @param jobPool The JobPool instance.
+/// @param jobPool The ClientJobPool instance.
 /// @param callback The callback function to be called upon job completion.
 /// @param chunkId The ID of the chunk.
 /// @param chunkType The type of the chunk.
 /// @param listenerId The ID of the listener associated with the job.
 /// @return The ID of the added job.
-uint32_t job_open(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId,
+uint32_t job_open(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId,
 ChunkPartType chunkType, uint32_t listenerId = 0);
 
 /// @brief Adds a close job to the JobPool.
@@ -342,12 +487,12 @@ uint32_t job_open(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chun
 /// @param chunkType The type of the chunk.
 /// @param listenerId The ID of the listener associated with the job.
 /// @return The ID of the added job.
-uint32_t job_close(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId,
+uint32_t job_close(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId,
 ChunkPartType chunkType, uint32_t listenerId = 0);
 
-/// @brief Adds a read job to the JobPool.
+/// @brief Adds a read job to the ClientJobPool.
 ///
-/// @param jobPool The JobPool instance.
+/// @param jobPool The ClientJobPool instance.
 /// @param callback The callback function to be called upon job completion.
 /// @param chunkId The ID of the chunk.
 /// @param version The version of the chunk.
@@ -360,27 +505,27 @@ uint32_t job_close(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chu /// @param performHddOpen Whether to perform HDD open. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_read(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_read(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, uint32_t version, ChunkPartType chunkType, uint32_t offset, uint32_t size, uint32_t maxBlocksToBeReadBehind, uint32_t blocksToBeReadAhead, OutputBuffer *outputBuffer, bool performHddOpen, uint32_t listenerId = 0); -/// @brief Adds a prefetch job to the JobPool. +/// @brief Adds a prefetch job to the ClientJobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The ClientJobPool instance. /// @param chunkId The ID of the chunk. /// @param chunkType The type of the chunk. /// @param firstBlockToBePrefetched The first block to be prefetched. /// @param blocksToBePrefetched The number of blocks to be prefetched. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_prefetch(JobPool &jobPool, uint64_t chunkId, ChunkPartType chunkType, +uint32_t job_prefetch(ClientJobPool &jobPool, uint64_t chunkId, ChunkPartType chunkType, uint32_t firstBlockToBePrefetched, uint32_t blocksToBePrefetched, uint32_t listenerId = 0); -/// @brief Adds a write job to the JobPool. +/// @brief Adds a write job to the ClientJobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The ClientJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param chunkId The ID of the chunk. /// @param chunkVersion The version of the chunk. @@ -392,13 +537,13 @@ uint32_t job_prefetch(JobPool &jobPool, uint64_t chunkId, ChunkPartType chunkTyp /// @param buffer The data buffer to write. 
/// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_write(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_write(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, InputBuffer *inputBuffer, uint32_t listenerId = 0); -/// @brief Adds a get blocks job to the JobPool. +/// @brief Adds a get blocks job to the ClientJobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The ClientJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param chunkId The ID of the chunk. /// @param version The version of the chunk. @@ -406,13 +551,13 @@ uint32_t job_write(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chu /// @param blocks The blocks to get. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_get_blocks(JobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, +uint32_t job_get_blocks(ClientJobPool &jobPool, JobPool::JobCallback callback, uint64_t chunkId, uint32_t version, ChunkPartType chunkType, uint16_t *blocks, uint32_t listenerId = 0); /// @brief Adds a replicate job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param chunkId The ID of the chunk. @@ -422,24 +567,24 @@ uint32_t job_get_blocks(JobPool &jobPool, JobPool::JobCallback callback, uint64_ /// @param sourcesBuffer The sources buffer. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. 
-uint32_t job_replicate(JobPool &jobPool, JobPool::JobCallback callback, void *extra, +uint32_t job_replicate(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, uint32_t sourcesBufferSize, const uint8_t *sourcesBuffer, uint32_t listenerId = 0); /// @brief Adds an invalid job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_invalid(JobPool &jobPool, JobPool::JobCallback callback, void *extra, +uint32_t job_invalid(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, uint32_t listenerId = 0); /// @brief Adds a delete job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param chunkId The ID of the chunk. @@ -447,12 +592,13 @@ uint32_t job_invalid(JobPool &jobPool, JobPool::JobCallback callback, void *extr /// @param chunkType The type of the chunk. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId, - uint32_t chunkVersion, ChunkPartType chunkType, uint32_t listenerId = 0); +uint32_t job_delete(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, + uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, + uint32_t listenerId = 0); /// @brief Adds a create job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. 
/// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param chunkId The ID of the chunk. @@ -460,12 +606,13 @@ uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra /// @param chunkType The type of the chunk. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_create(JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId, - uint32_t chunkVersion, ChunkPartType chunkType, uint32_t listenerId = 0); +uint32_t job_create(MasterJobPool &jobPool, JobPool::JobCallback callback, void *extra, + uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, + uint32_t listenerId = 0); /// @brief Adds a change version job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param chunkId The ID of the chunk. @@ -474,13 +621,13 @@ uint32_t job_create(JobPool &jobPool, JobPool::JobCallback callback, void *extra /// @param newChunkVersion The new version of the chunk. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_version(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, ChunkPartType chunkType, uint32_t newChunkVersion, uint32_t listenerId = 0); /// @brief Adds a truncate job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. 
/// @param chunkId The ID of the chunk. @@ -490,13 +637,13 @@ uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, voi /// @param length The length to truncate to. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_truncate(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, ChunkPartType chunkType, uint32_t chunkVersion, uint32_t newChunkVersion, uint32_t length, uint32_t listenerId = 0); /// @brief Adds a duplicate job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param chunkId The ID of the chunk. @@ -507,14 +654,14 @@ uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, vo /// @param chunkVersionCopy The version of the chunk to copy. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_duplicate(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, uint32_t newChunkVersion, ChunkPartType chunkType, uint64_t chunkIdCopy, uint32_t chunkVersionCopy, uint32_t listenerId = 0); /// @brief Adds a duplicate and truncate job to the JobPool. /// -/// @param jobPool The JobPool instance. +/// @param jobPool The MasterJobPool instance. /// @param callback The callback function to be called upon job completion. /// @param extra Additional data to be passed to the callback. /// @param chunkId The ID of the chunk. 
@@ -526,7 +673,7 @@ uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, v /// @param length The length to truncate to. /// @param listenerId The ID of the listener associated with the job. /// @return The ID of the added job. -uint32_t job_duptrunc(JobPool &jobPool, const JobPool::JobCallback &callback, void *extra, +uint32_t job_duptrunc(MasterJobPool &jobPool, const JobPool::JobCallback &callback, void *extra, uint64_t chunkId, uint32_t chunkVersion, uint32_t newChunkVersion, ChunkPartType chunkType, uint64_t chunkIdCopy, uint32_t chunkVersionCopy, uint32_t length, uint32_t listenerId = 0); diff --git a/src/chunkserver/bgjobs_unittest.cc b/src/chunkserver/bgjobs_unittest.cc index c153ec64d..32301051f 100644 --- a/src/chunkserver/bgjobs_unittest.cc +++ b/src/chunkserver/bgjobs_unittest.cc @@ -31,20 +31,42 @@ void mockJobCallback(uint8_t /*status*/, void *extra) { counter->fetch_add(1); } +void mockJobCallbackDoubling(uint8_t /*status*/, void *extra) { + auto *counter = static_cast *>(extra); + counter->fetch_add(counter->load()); +} + constexpr uint32_t kWaitTimeMs = 100; // Test fixture for JobPool tests class JobPoolTest : public ::testing::Test { private: void servePoll(uint32_t listenerId) { - struct pollfd wakeupDescPollFd = {wakeupDescVec[listenerId], POLLIN, 0}; + std::vector wakeupDescPollFDs = { + pollfd{wakeupDescVec[listenerId], POLLIN, 0}, + pollfd{masterWakeupDescVec[listenerId], POLLIN, 0}, + pollfd{clientSwitchModeWakeupDescVec[listenerId], POLLIN, 0}, + pollfd{clientFifoModeWakeupDescVec[listenerId], POLLIN, 0}}; + while (!terminate) { - int ret = poll(&wakeupDescPollFd, 1, kWaitTimeMs); + int ret = poll(&wakeupDescPollFDs[0], wakeupDescPollFDs.size(), kWaitTimeMs); if (ret > 0) { - if (wakeupDescPollFd.revents & POLLIN) { + if (wakeupDescPollFDs[0].revents & POLLIN) { processingCount[listenerId].fetch_add(1); jobPool->processCompletedJobs(listenerId); } + if (wakeupDescPollFDs[1].revents & POLLIN) { + 
processingCount[listenerId].fetch_add(1); + masterJobPool->processCompletedJobs(listenerId); + } + if (wakeupDescPollFDs[2].revents & POLLIN) { + processingCount[listenerId].fetch_add(1); + clientJobPoolSwitch->processCompletedJobs(listenerId); + } + if (wakeupDescPollFDs[3].revents & POLLIN) { + processingCount[listenerId].fetch_add(1); + clientJobPoolFifo->processCompletedJobs(listenerId); + } } } } @@ -54,6 +76,24 @@ class JobPoolTest : public ::testing::Test { // Let's create some listeners for the JobPool wakeupDescVec.resize(kNrListeners); jobPool = std::make_unique("TestPool", 4, 10, kNrListeners, wakeupDescVec); + jobPool->startWorkers(); + + masterWakeupDescVec.resize(kNrListeners); + masterJobPool = std::make_unique("TestMasterPool", 4, 10, kNrListeners, + masterWakeupDescVec); + + // For the ClientJobPool, we want to test both IOPriorityMode::Fifo and + // IOPriorityMode::Switch, so we will initialize it in the test cases instead of here. For + // now, we can just initialize it with Switch mode as default. 
+ clientSwitchModeWakeupDescVec.resize(kNrListeners); + clientJobPoolSwitch = + std::make_unique("TestClientPoolSwitch", 1, 10, kNrListeners, + clientSwitchModeWakeupDescVec, IOPriorityMode::Switch); + clientFifoModeWakeupDescVec.resize(kNrListeners); + clientJobPoolFifo = + std::make_unique("TestClientPoolFifo", 1, 10, kNrListeners, + clientFifoModeWakeupDescVec, IOPriorityMode::Fifo); + for (uint32_t i = 0; i < kNrListeners; ++i) { processingCount[i] = 0; } @@ -89,11 +129,19 @@ class JobPoolTest : public ::testing::Test { } JobPool::ProcessJobCallback mockProcessJob = []() -> uint8_t { + usleep(1000); // Simulate some work by sleeping for 1ms return 0; // Return success status }; std::unique_ptr jobPool; std::vector wakeupDescVec; + std::unique_ptr masterJobPool; + std::vector masterWakeupDescVec; + std::unique_ptr clientJobPoolSwitch; + std::vector clientSwitchModeWakeupDescVec; + std::unique_ptr clientJobPoolFifo; + std::vector clientFifoModeWakeupDescVec; + std::vector servePollThreads; bool terminate; std::mutex mutex_; @@ -233,52 +281,109 @@ TEST_F(JobPoolTest, ChunkLocking) { ChunkWithType chunkWithType3{chunkId3, chunkType}; ChunkWithType chunkWithType4{chunkId4, chunkType}; - jobPool->startChunkLock(mockJobCallback, &counters[0], chunkId1, chunkType, 0); - jobPool->startChunkLock(mockJobCallback, &counters[0], chunkId2, chunkType, 0); - jobPool->startChunkLock(mockJobCallback, nullptr, chunkId4, chunkType, 0); - jobPool->enforceChunkLock(chunkId1, chunkType); - jobPool->enforceChunkLock(chunkId4, chunkType); + masterJobPool->startChunkLock(mockJobCallback, &counters[0], chunkId1, chunkType, 0); + masterJobPool->startChunkLock(mockJobCallback, &counters[0], chunkId2, chunkType, 0); + masterJobPool->startChunkLock(mockJobCallback, nullptr, chunkId4, chunkType, 0); + masterJobPool->enforceChunkLock(chunkId1, chunkType); + masterJobPool->enforceChunkLock(chunkId4, chunkType); - jobPool->addJobIfNotLocked(chunkWithType1, JobPool::ChunkOperation::Read, 
mockJobCallback, - &counters[0], mockProcessJob); + masterJobPool->addJobIfNotLocked(chunkWithType1, JobPool::ChunkOperation::Read, mockJobCallback, + &counters[0], mockProcessJob); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Job should not be processed because chunk is locked and enforced EXPECT_EQ(counters[0].load(), 0); - jobPool->addJobIfNotLocked(chunkWithType2, JobPool::ChunkOperation::Read, mockJobCallback, - &counters[0], mockProcessJob); + masterJobPool->addJobIfNotLocked(chunkWithType2, JobPool::ChunkOperation::Read, mockJobCallback, + &counters[0], mockProcessJob); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Job should be processed because chunk is locked but not enforced EXPECT_EQ(counters[0].load(), 1); - jobPool->addJobIfNotLocked(chunkWithType3, JobPool::ChunkOperation::Read, mockJobCallback, - &counters[0], mockProcessJob); + masterJobPool->addJobIfNotLocked(chunkWithType3, JobPool::ChunkOperation::Read, mockJobCallback, + &counters[0], mockProcessJob); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Job should be processed because chunk is not locked EXPECT_EQ(counters[0].load(), 2); - jobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback, - &counters[0], mockProcessJob); - jobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback, - &counters[0], mockProcessJob); + masterJobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback, + &counters[0], mockProcessJob); + masterJobPool->addJobIfNotLocked(chunkWithType4, JobPool::ChunkOperation::Read, mockJobCallback, + &counters[0], mockProcessJob); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Job should not be processed because chunk is locked and enforced EXPECT_EQ(counters[0].load(), 2); - jobPool->endChunkLock(chunkId1, chunkType, SAUNAFS_STATUS_OK); + masterJobPool->endChunkLock(chunkId1, 
chunkType, SAUNAFS_STATUS_OK); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Now the job for chunkWithType1 should be processed because the lock is released and the // callback for the lock job should be called EXPECT_EQ(counters[0].load(), 4); - jobPool->endChunkLock(chunkId2, chunkType, SAUNAFS_STATUS_OK); + masterJobPool->endChunkLock(chunkId2, chunkType, SAUNAFS_STATUS_OK); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Now the callback for the lock job of chunkWithType2 should be called EXPECT_EQ(counters[0].load(), 5); - jobPool->eraseChunkLock(chunkId4, chunkType); + masterJobPool->eraseChunkLock(chunkId4, chunkType); std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); // Now the jobs for chunkWithType4 should be processed because the lock is released, but the // callback for the lock job should not be called EXPECT_EQ(counters[0].load(), 7); } + +TEST_F(JobPoolTest, IOPriorityMode) { + const int kInitialSwitchCounterValue = 5; + const int kInitialFifoCounterValue = 7; + counters[0] = kInitialSwitchCounterValue; + counters[1] = kInitialFifoCounterValue; + + // Add a series of Read and Write jobs to the ClientJobPool in Switch mode and check the + // processing order by the final value. + for (int i = 0; i < 5; ++i) { + clientJobPoolSwitch->addJob(JobPool::ChunkOperation::Read, mockJobCallback, &counters[0], + mockProcessJob); + } + for (int i = 0; i < 5; ++i) { + clientJobPoolSwitch->addJob(JobPool::ChunkOperation::Write, mockJobCallbackDoubling, + &counters[0], mockProcessJob); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); + + // In Switch mode, the Read and Write jobs should be processed in an alternating manner, so + // the final value should reflect the interleaving of the operations. + int switchCounterExpectedFinalValue = kInitialSwitchCounterValue; + for (int i = 0; i < 5; ++i) { + // Each Read job adds 1, and each Write job doubles the current value. 
+ switchCounterExpectedFinalValue = (switchCounterExpectedFinalValue + 1) * 2; + } + + EXPECT_EQ(counters[0].load(), switchCounterExpectedFinalValue); + + // Add a series of Read and Write jobs to the ClientJobPool in Fifo mode and check the + // processing order by the final value. + for (int i = 0; i < 5; ++i) { + clientJobPoolFifo->addJob(JobPool::ChunkOperation::Read, mockJobCallback, &counters[1], + mockProcessJob); + } + for (int i = 0; i < 5; ++i) { + clientJobPoolFifo->addJob(JobPool::ChunkOperation::Write, mockJobCallbackDoubling, + &counters[1], mockProcessJob); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(kWaitTimeMs)); + + // In Fifo mode, the Read jobs should be processed first followed by the Write jobs, so the + // final value should reflect the sequential processing of the operations. + int fifoCounterExpectedFinalValue = kInitialFifoCounterValue; + for (int i = 0; i < 5; ++i) { + // Each Read job adds 1. + fifoCounterExpectedFinalValue++; + } + for (int i = 0; i < 5; ++i) { + // Each Write job doubles the current value. + fifoCounterExpectedFinalValue *= 2; + } + + EXPECT_EQ(counters[1].load(), fifoCounterExpectedFinalValue); +} diff --git a/src/chunkserver/chunk_high_level_ops.h b/src/chunkserver/chunk_high_level_ops.h index ad28bcce8..a5dd44725 100644 --- a/src/chunkserver/chunk_high_level_ops.h +++ b/src/chunkserver/chunk_high_level_ops.h @@ -31,7 +31,7 @@ #include "devtools/request_log.h" #include "protocol/cltocs.h" -class JobPool; +class ClientJobPool; inline constexpr uint8_t kSauWriteDataPrefixSize = cltocs::writeData::kPrefixSize; // For forwarding: size of SAU_CLTOCS_WRITE_DATA prefix plus the packet header. @@ -60,7 +60,7 @@ class HighLevelOp { void setParentState(ChunkserverEntry::State newState) { parent_->state = newState; } /// Job pool of the network worker handling the parent ChunkserverEntry. 
- JobPool *workerJobPool() const { return parent_->workerJobPool; } + ClientJobPool *workerJobPool() const { return parent_->workerJobPool; } /// Checks and applies closing on the parent ChunkserverEntry. void checkAndApplyClosedOnParent() const { parent_->checkAndApplyClosed(); } diff --git a/src/chunkserver/chunkserver_entry.cc b/src/chunkserver/chunkserver_entry.cc index 5f7ae63de..e42b4b6ca 100644 --- a/src/chunkserver/chunkserver_entry.cc +++ b/src/chunkserver/chunkserver_entry.cc @@ -57,7 +57,7 @@ static ConnectionPool gForwardConnectionPool; static constexpr uint32_t kMaxPacketSize = 100000 + SFSBLOCKSIZE; static constexpr uint8_t kConnectRetries = 10; -ChunkserverEntry::ChunkserverEntry(int socket, JobPool *workerJobPool, +ChunkserverEntry::ChunkserverEntry(int socket, ClientJobPool *workerJobPool, uint16_t maxBlocksPerHddReadJob, uint16_t maxParallelHddReadJobs, uint16_t maxBlocksPerHddWriteJob) : workerJobPool(workerJobPool), sock(socket) { diff --git a/src/chunkserver/chunkserver_entry.h b/src/chunkserver/chunkserver_entry.h index cac13ccfb..6ceef4330 100644 --- a/src/chunkserver/chunkserver_entry.h +++ b/src/chunkserver/chunkserver_entry.h @@ -112,7 +112,7 @@ struct ChunkserverEntry { static constexpr uint32_t kGenerateChartExpectedPacketSize = sizeof(uint32_t); - JobPool *workerJobPool; // Job pool assigned to a given network worker thread + ClientJobPool *workerJobPool; // Job pool assigned to a given network worker thread ChunkserverEntry::State state = ChunkserverEntry::State::Idle; ChunkserverEntry::Mode mode = ChunkserverEntry::Mode::Header; @@ -141,7 +141,7 @@ struct ChunkserverEntry { uint32_t chunkVersion = 0; // R+W ChunkPartType chunkType = slice_traits::standard::ChunkPartType(); // R - ChunkserverEntry(int socket, JobPool *workerJobPool, uint16_t maxBlocksPerHddReadJob, + ChunkserverEntry(int socket, ClientJobPool *workerJobPool, uint16_t maxBlocksPerHddReadJob, uint16_t maxParallelHddReadJobs, uint16_t maxBlocksPerHddWriteJob); // Disallow 
copying and moving to avoid misuse. diff --git a/src/chunkserver/master_connection.h b/src/chunkserver/master_connection.h index 5d098e98a..eb2227f91 100644 --- a/src/chunkserver/master_connection.h +++ b/src/chunkserver/master_connection.h @@ -45,7 +45,7 @@ inline std::string gLabel; inline uint32_t gTimeout_ms; // Forward declaration -class JobPool; +class MasterJobPool; /// @brief Enum representing the connection mode to the Metadata Server (MDS). enum class ConnectionMode : std::uint8_t { @@ -70,8 +70,8 @@ enum class RegistrationStatus : std::uint8_t { class MasterConn { public: explicit MasterConn(const std::string &masterHostStr, const std::string &masterPortStr, - const std::string &clusterId, const std::shared_ptr &jobPool, - const std::shared_ptr &replicationJobPool) + const std::string &clusterId, const std::shared_ptr &jobPool, + const std::shared_ptr &replicationJobPool) : masterHostStr_(masterHostStr), masterPortStr_(masterPortStr), clusterId_(clusterId), @@ -231,8 +231,9 @@ class MasterConn { std::string masterPortStr_; ///< Port of the master server. uint32_t version_{saunafsVersion(0, 0, 0)}; ///< Version of the master server. std::string clusterId_; ///< Cluster ID for this connection. - std::shared_ptr jobPool_; ///< Shared reference to the JobPool. - std::shared_ptr replicationJobPool_; ///< Shared reference to the ReplicationJobPool. + std::shared_ptr jobPool_; ///< Shared reference to the JobPool. + /// Shared reference to the ReplicationJobPool. 
+ std::shared_ptr replicationJobPool_; // For compatibility with old masters (version < 5.0) void handleRegistrationAttempt(); diff --git a/src/chunkserver/masterconn.cc b/src/chunkserver/masterconn.cc index eec53a6a5..fc0f68048 100644 --- a/src/chunkserver/masterconn.cc +++ b/src/chunkserver/masterconn.cc @@ -58,8 +58,8 @@ static bool gEnableLoadFactor; static const uint64_t kSendStatusDelay = 5; // JobPool shared between all connections to MDSs -static std::shared_ptr gJobPool; -static std::shared_ptr gReplicationJobPool; +static std::shared_ptr gJobPool; +static std::shared_ptr gReplicationJobPool; // Singleton for the MasterConn instance (will become a list of connections in the future) static std::unique_ptr gMasterConnSingleton = nullptr; @@ -140,7 +140,7 @@ void masterconn_unwantedjobfinished(uint8_t status, void *packet) { MasterConn::deletePacket(packet); } -JobPool* masterconn_get_job_pool() { +MasterJobPool* masterconn_get_job_pool() { return gJobPool.get(); } @@ -334,8 +334,8 @@ int masterconn_init_threads(void) { // Create the JobPool instance with the specified number of workers, it would be serving // only this master network thread, thus the number of listeners is 1. std::vector bgJobPoolFDs(1); - gJobPool = std::make_shared("ma", gNumberOfWorkers, kMaxBackgroundJobsCount, 1, - bgJobPoolFDs); + gJobPool = std::make_shared("ma", gNumberOfWorkers, kMaxBackgroundJobsCount, + 1, bgJobPoolFDs); gJobFD = bgJobPoolFDs[0]; } catch (const std::exception &e) { safs::log_err("masterconn_init_threads: Failed to create JobPool instance: {}", e.what()); @@ -357,8 +357,8 @@ int masterconn_init_threads(void) { // serving only this master network thread, thus the number of listeners is 1. 
std::vector replicationJobPoolFDs(1); gReplicationJobPool = - std::make_shared("ma_repl", gReplicationNumberOfWorkers, - kMaxBackgroundJobsCount, 1, replicationJobPoolFDs); + std::make_shared("ma_repl", gReplicationNumberOfWorkers, + kMaxBackgroundJobsCount, 1, replicationJobPoolFDs); gReplicationJobFD = replicationJobPoolFDs[0]; } catch (const std::exception &e) { safs::log_err("masterconn_init_threads: Failed to create ReplicationJobPool instance: {}", diff --git a/src/chunkserver/masterconn.h b/src/chunkserver/masterconn.h index a43bce3fa..a91843f20 100644 --- a/src/chunkserver/masterconn.h +++ b/src/chunkserver/masterconn.h @@ -28,5 +28,5 @@ void masterconn_stats(uint64_t *bin, uint64_t *bout, uint32_t *maxjobscnt); int masterconn_init(void); int masterconn_init_threads(void); -JobPool* masterconn_get_job_pool(); +MasterJobPool* masterconn_get_job_pool(); bool masterconn_canexit(); diff --git a/src/chunkserver/network_main_thread.cc b/src/chunkserver/network_main_thread.cc index e948f2b4a..8c2ce238a 100644 --- a/src/chunkserver/network_main_thread.cc +++ b/src/chunkserver/network_main_thread.cc @@ -283,6 +283,17 @@ int mainNetworkThreadInit(void) { gBgjobsCountPerNetworkWorker = cfg_get_minvalue( "BGJOBSCNT_PER_NETWORK_WORKER", NetworkWorkerThread::kDefaultMaxBackgroundJobsPerNetworkWorker, 10); + std::string ioPriorityModeStr = cfg_getstring("IO_PRIORITY_MODE", "FIFO"); + if (ioPriorityModeStr == "SWITCH") { + // Must clearly say that the mode is Switch, otherwise it will be Fifo. This is because Fifo + // is the default and more tested mode. 
+ gIOPriorityMode = IOPriorityMode::Switch; + } else { + gIOPriorityMode = IOPriorityMode::Fifo; + if (ioPriorityModeStr != "FIFO") { + safs::log_warn("Invalid IO_PRIORITY_MODE '{}', defaulting to FIFO", ioPriorityModeStr); + } + } gMaxBlocksPerHddWriteJob = cfg_get_minmaxvalue( "MAX_BLOCKS_PER_HDD_WRITE_JOB", NetworkWorkerThread::kDefaultMaxBlocksPerHddWriteJob, diff --git a/src/chunkserver/network_worker_thread.cc b/src/chunkserver/network_worker_thread.cc index 4147f71eb..d0f21767e 100644 --- a/src/chunkserver/network_worker_thread.cc +++ b/src/chunkserver/network_worker_thread.cc @@ -70,8 +70,8 @@ NetworkWorkerThread::NetworkWorkerThread(uint32_t id, uint32_t nrOfBgjobsWorkers // Create the JobPool instance with the specified number of workers. It would be serving // only this network worker thread, thus the number of listeners is 1. std::vector bgJobPoolWakeUpFds(1); - bgJobPool_ = - std::make_unique(name_, nrOfBgjobsWorkers, bgjobsCount, 1, bgJobPoolWakeUpFds); + bgJobPool_ = std::make_unique(name_, nrOfBgjobsWorkers, bgjobsCount, 1, + bgJobPoolWakeUpFds, gIOPriorityMode); bgJobPoolWakeUpFd_ = bgJobPoolWakeUpFds[0]; } catch (const std::exception &e) { safs::log_err("NetworkWorkerThread: Failed to create JobPool instance: {}", e.what()); diff --git a/src/chunkserver/network_worker_thread.h b/src/chunkserver/network_worker_thread.h index 00359df3b..35a3b187f 100644 --- a/src/chunkserver/network_worker_thread.h +++ b/src/chunkserver/network_worker_thread.h @@ -55,7 +55,7 @@ class NetworkWorkerThread { void askForTermination(); void addConnection(int newSocketFD); - JobPool *backgroundJobPool() { + ClientJobPool *backgroundJobPool() { return bgJobPool_.get(); } @@ -73,7 +73,7 @@ class NetworkWorkerThread { std::mutex csservheadLock; std::list csservEntries; - std::unique_ptr bgJobPool_; + std::unique_ptr bgJobPool_; std::atomic canTerminate_{false}; ///< Whether it is safe to terminate the thread int bgJobPoolWakeUpFd_; static const uint32_t JOB_FD_PDESC_POS = 1; 
diff --git a/src/common/pcqueue.cc b/src/common/pcqueue.cc index 55a670ebc..8e64fd88a 100644 --- a/src/common/pcqueue.cc +++ b/src/common/pcqueue.cc @@ -27,12 +27,14 @@ #include "devtools/TracePrinter.h" #include "slogger/slogger.h" -ProducerConsumerQueue::ProducerConsumerQueue(uint8_t priorityLevels, uint32_t maxSize, - Deleter deleter) +ProducerConsumerQueueWithPriority::ProducerConsumerQueueWithPriority(uint8_t priorityLevels, + uint32_t maxSize, + Deleter deleter) : maxSize_(maxSize), currentElements_(0), currentSize_(0), deleter_(deleter) { if (priorityLevels == 0) { - safs::log_info("ProducerConsumerQueue::{}: Given priorityLevels is 0, using 1 instead", - __func__); + safs::log_info( + "ProducerConsumerQueueWithPriority::{}: Given priorityLevels is 0, using 1 instead", + __func__); priorityLevels = 1; } queuesByPriority_.reserve(priorityLevels); @@ -40,7 +42,7 @@ ProducerConsumerQueue::ProducerConsumerQueue(uint8_t priorityLevels, uint32_t ma TRACETHIS(); } -ProducerConsumerQueue::~ProducerConsumerQueue() { +ProducerConsumerQueueWithPriority::~ProducerConsumerQueueWithPriority() { TRACETHIS(); std::lock_guard lock(mutex_); for (auto &queue : queuesByPriority_) { @@ -51,32 +53,32 @@ ProducerConsumerQueue::~ProducerConsumerQueue() { } } -bool ProducerConsumerQueue::isEmpty() const { +bool ProducerConsumerQueueWithPriority::isEmpty() const { TRACETHIS(); std::lock_guard lock(mutex_); return currentSize_ == 0; } -bool ProducerConsumerQueue::isFull() const { +bool ProducerConsumerQueueWithPriority::isFull() const { TRACETHIS(); std::lock_guard lock(mutex_); return maxSize_ > 0 && currentSize_ >= maxSize_; } -uint32_t ProducerConsumerQueue::sizeLeft() const { +uint32_t ProducerConsumerQueueWithPriority::sizeLeft() const { TRACETHIS(); std::lock_guard lock(mutex_); return maxSize_ > 0 ? (currentSize_ <= maxSize_ ? 
maxSize_ - currentSize_ : 0) : UINT32_MAX; } -uint32_t ProducerConsumerQueue::elements() const { +uint32_t ProducerConsumerQueueWithPriority::elements() const { TRACETHIS(); std::lock_guard lock(mutex_); return currentElements_; } -void ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length, - uint8_t priority) { +void ProducerConsumerQueueWithPriority::put(uint32_t jobId, uint32_t jobType, uint8_t *data, + uint32_t length, uint8_t priority) { TRACETHIS(); std::unique_lock lock(mutex_); put_(jobId, jobType, data, length, priority); @@ -84,8 +86,8 @@ void ProducerConsumerQueue::put(uint32_t jobId, uint32_t jobType, uint8_t *data, notEmpty_.notify_one(); } -bool ProducerConsumerQueue::tryPut(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length, - uint8_t priority) { +bool ProducerConsumerQueueWithPriority::tryPut(uint32_t jobId, uint32_t jobType, uint8_t *data, + uint32_t length, uint8_t priority) { TRACETHIS(); std::lock_guard lock(mutex_); if (maxSize_ > 0) { @@ -106,18 +108,27 @@ bool ProducerConsumerQueue::tryPut(uint32_t jobId, uint32_t jobType, uint8_t *da return true; } -bool ProducerConsumerQueue::get(uint32_t *jobId, uint32_t *jobType, - uint8_t **data, uint32_t *length) { +void ProducerConsumerQueueWithPriority::get(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + uint32_t *length) { TRACETHIS(); std::unique_lock lock(mutex_); notEmpty_.wait(lock, [this] { return currentSize_ > 0; }); get_(jobId, jobType, data, length); - return true; } -bool ProducerConsumerQueue::tryGet(uint32_t *jobId, uint32_t *jobType, - uint8_t **data, uint32_t *length) { +void ProducerConsumerQueueWithPriority::getUsingCustomPriority( + uint32_t *jobId, uint32_t *jobType, uint8_t **data, uint32_t *length, + std::span priorityLevelsToCheck) { + TRACETHIS(); + std::unique_lock lock(mutex_); + notEmpty_.wait(lock, [this] { return currentSize_ > 0; }); + + getUsingCustomPriority_(jobId, jobType, data, length, priorityLevelsToCheck); 
+} + +bool ProducerConsumerQueueWithPriority::tryGet(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + uint32_t *length) { TRACETHIS(); std::lock_guard lock(mutex_); if (currentSize_ == 0) { @@ -133,12 +144,12 @@ bool ProducerConsumerQueue::tryGet(uint32_t *jobId, uint32_t *jobType, return true; } -void ProducerConsumerQueue::put_(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length, - uint8_t priority) { +void ProducerConsumerQueueWithPriority::put_(uint32_t jobId, uint32_t jobType, uint8_t *data, + uint32_t length, uint8_t priority) { assert(queuesByPriority_.size() > 0); if (priority >= queuesByPriority_.size()) { safs::log_info( - "ProducerConsumerQueue::{}: Given priority {} exceeds max priority {}, using lowest priority instead", + "ProducerConsumerQueueWithPriority::{}: Given priority {} exceeds max priority {}, using lowest priority instead", __func__, priority, queuesByPriority_.size() - 1); priority = queuesByPriority_.size() - 1; // lowest priority } @@ -148,19 +159,12 @@ void ProducerConsumerQueue::put_(uint32_t jobId, uint32_t jobType, uint8_t *data currentElements_++; } -void ProducerConsumerQueue::get_(uint32_t *jobId, uint32_t *jobType, uint8_t **data, - uint32_t *length) { - std::queue *notEmptyQueue = nullptr; - for (auto &queue : queuesByPriority_) { - if (!queue.empty()) { - notEmptyQueue = &queue; - break; - } - } - - if (notEmptyQueue == nullptr) { +void ProducerConsumerQueueWithPriority::retrieveFromQueue_(uint32_t *jobId, uint32_t *jobType, + uint8_t **data, uint32_t *length, + std::queue *queue) { + if (queue == nullptr) { safs::log_warn( - "ProducerConsumerQueue::{}: No non-empty queue found (this should not happen)", + "ProducerConsumerQueueWithPriority::{}: Trying to retrieve from a null queue (this should not happen)", __func__); if (jobId) { *jobId = 0; } if (jobType) { *jobType = 0; } @@ -169,7 +173,7 @@ void ProducerConsumerQueue::get_(uint32_t *jobId, uint32_t *jobType, uint8_t **d return; } - auto &entry = 
notEmptyQueue->front(); + auto &entry = queue->front(); currentSize_ -= entry.length; currentElements_--; @@ -178,5 +182,32 @@ void ProducerConsumerQueue::get_(uint32_t *jobId, uint32_t *jobType, uint8_t **d if (data) { *data = entry.data; } if (length) { *length = entry.length; } - notEmptyQueue->pop(); + queue->pop(); +} + +void ProducerConsumerQueueWithPriority::get_(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + uint32_t *length) { + std::queue *notEmptyQueue = nullptr; + for (auto &queue : queuesByPriority_) { + if (!queue.empty()) { + notEmptyQueue = &queue; + break; + } + } + + retrieveFromQueue_(jobId, jobType, data, length, notEmptyQueue); +} + +void ProducerConsumerQueueWithPriority::getUsingCustomPriority_( + uint32_t *jobId, uint32_t *jobType, uint8_t **data, uint32_t *length, + std::span priorityLevelsToCheck) { + std::queue *notEmptyQueue = nullptr; + for (auto priority : priorityLevelsToCheck) { + if (priority < queuesByPriority_.size() && !queuesByPriority_[priority].empty()) { + notEmptyQueue = &queuesByPriority_[priority]; + break; + } + } + + retrieveFromQueue_(jobId, jobType, data, length, notEmptyQueue); } diff --git a/src/common/pcqueue.h b/src/common/pcqueue.h index e7666b090..f4d5b2b0d 100644 --- a/src/common/pcqueue.h +++ b/src/common/pcqueue.h @@ -27,6 +27,7 @@ #include #include #include +#include template void deleterByType(uint8_t *p) { @@ -35,7 +36,7 @@ void deleterByType(uint8_t *p) { inline void deleterDummy(uint8_t * /*unused*/) {} -/// @class ProducerConsumerQueue +/// @class ProducerConsumerQueueWithPriority /// @brief A thread-safe queue for producer-consumer scenarios. /// /// Can be configured to support several priority levels. Final interface is queue-like, @@ -57,7 +58,7 @@ inline void deleterDummy(uint8_t * /*unused*/) {} /// another module processes the tasks. 
/// /// // Example usage: -/// ProducerConsumerQueue queue(10, deleterByType); +/// ProducerConsumerQueueWithPriority queue(10, deleterByType); /// /// // Producer thread /// std::thread producer([&queue]() { @@ -82,11 +83,11 @@ inline void deleterDummy(uint8_t * /*unused*/) {} /// /// producer.join(); /// consumer.join(); -class ProducerConsumerQueue { +class ProducerConsumerQueueWithPriority { public: using Deleter = std::function; - /// @brief Constructs a ProducerConsumerQueue with a specified maximum size + /// @brief Constructs a ProducerConsumerQueueWithPriority with a specified maximum size /// and deleter. /// /// @param priorityLevels The number of priority levels. Default is 1 (no priorities). @@ -94,11 +95,11 @@ class ProducerConsumerQueue { /// Default is 0 (unlimited). /// @param deleter A callable type that defines how to delete the data /// stored in the queue. Default is deleterDummy. - explicit ProducerConsumerQueue(uint8_t priorityLevels = 1, uint32_t maxSize = 0, - Deleter deleter = deleterDummy); + explicit ProducerConsumerQueueWithPriority(uint8_t priorityLevels = 1, uint32_t maxSize = 0, + Deleter deleter = deleterDummy); - /// @brief Destructor for the ProducerConsumerQueue. - ~ProducerConsumerQueue(); + /// @brief Destructor for the ProducerConsumerQueueWithPriority. + virtual ~ProducerConsumerQueueWithPriority(); /// @brief Checks if the queue is empty. /// @@ -148,15 +149,31 @@ class ProducerConsumerQueue { /// @brief Removes an element from the queue. /// + /// @note This method will block if the queue is empty until an element is added. + /// Will remove the highest priority element available, preserving order within each priority + /// level. + /// /// @param jobId A pointer to store the job ID of the removed element. /// @param jobType A pointer to store the job type of the removed element. /// @param data A pointer to store the data of the removed element. 
/// @param length A pointer to store the length of the data of the removed /// element. - /// @return true if an element was removed successfully, false otherwise. - bool get(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + void get(uint32_t *jobId, uint32_t *jobType, uint8_t **data, uint32_t *length); + /// @brief Removes an element from the queue using custom priority levels to check. + /// @note This method will block if the queue is empty until an element is added. Will check the + /// specified priority levels in order and remove the first available element, preserving order + /// within each priority level. + /// @param jobId A pointer to store the job ID of the removed element. + /// @param jobType A pointer to store the job type of the removed element. + /// @param data A pointer to store the data of the removed element. + /// @param length A pointer to store the length of the data of the removed element. + /// @param priorityLevelsToCheck A vector of priority levels to check in order (0 is the highest + /// priority). Needs to contain all priority levels used in the queue, but can be in any order. + void getUsingCustomPriority(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + uint32_t *length, std::span priorityLevelsToCheck); + /// @brief Tries to remove an element from the queue without blocking. /// /// @param jobId A pointer to store the job ID of the removed element. @@ -174,10 +191,6 @@ class ProducerConsumerQueue { inline void put_(uint32_t jobId, uint32_t jobType, uint8_t *data, uint32_t length, uint8_t priority); - /// @brief Removes an element from the queue assuming non-emptiness. - /// mutex_: LOCKED - inline void get_(uint32_t *jobId, uint32_t *jobType, uint8_t **data, uint32_t *length); - /// @brief Represents an entry in the queue. struct QueueEntry { uint32_t jobId; ///< The job ID associated with the entry. 
@@ -209,6 +222,23 @@ class ProducerConsumerQueue { ~QueueEntry() = default; }; + /// @brief Removes an element from a specific queue. + /// mutex_: LOCKED + inline void retrieveFromQueue_(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + uint32_t *length, std::queue *queue); + + /// @brief Removes an element from all queues assuming non-emptiness. + /// mutex_: LOCKED + inline void get_(uint32_t *jobId, uint32_t *jobType, uint8_t **data, uint32_t *length); + + /// @brief Removes an element from all queues assuming non-emptiness. + /// Checks the specified priority levels in order and removes the first available element, + /// preserving order within each priority level. + /// mutex_: LOCKED + inline void getUsingCustomPriority_(uint32_t *jobId, uint32_t *jobType, uint8_t **data, + uint32_t *length, + std::span priorityLevelsToCheck); + ///< The underlying queues storing the entries. std::vector> queuesByPriority_; ///< The maximum number of elements the queue can hold. @@ -224,3 +254,19 @@ class ProducerConsumerQueue { ///< The deleter function used to delete the data stored in the queue. Deleter deleter_; }; + +/// @class ProducerConsumerQueue +/// A simplified version of ProducerConsumerQueueWithPriority that only supports a single priority +/// level and provides a more queue-like interface. It inherits from +/// ProducerConsumerQueueWithPriority and uses its implementation, but hides the priority-related +/// functionality. 
+class ProducerConsumerQueue : public ProducerConsumerQueueWithPriority {
+public:
+    ProducerConsumerQueue(uint32_t maxSize = 0, Deleter deleter = deleterDummy)
+        : ProducerConsumerQueueWithPriority(1, maxSize, deleter) {}
+
+private:
+    // Hide the priority-specific functions by re-declaring them as private, i.e. removing
+    // them from the public interface
+    using ProducerConsumerQueueWithPriority::getUsingCustomPriority;
+};
diff --git a/src/common/pcqueue_unittest.cc b/src/common/pcqueue_unittest.cc
index bcf458041..39b9fc5df 100644
--- a/src/common/pcqueue_unittest.cc
+++ b/src/common/pcqueue_unittest.cc
@@ -19,6 +19,7 @@
 #include "common/pcqueue.h"
 #include 
 #include 
+#include 
 #include 
 #include 
@@ -31,7 +32,7 @@ const uint32_t kMaxQueueSize = 100;
 // Test adding and removing a single element
 TEST(ProducerConsumerQueueTests, SingleElement) {
-    ProducerConsumerQueue queue(1, kMaxSize, customDeleter);
+    ProducerConsumerQueue queue(kMaxSize, customDeleter);
     auto *data = new uint8_t[kMaxLength];
     EXPECT_TRUE(queue.tryPut(1, 1, data, kMaxLength));
@@ -40,7 +41,7 @@ TEST(ProducerConsumerQueueTests, SingleElement) {
     uint32_t length = 0;
     uint8_t *retrievedData = nullptr;
-    EXPECT_TRUE(queue.get(&jobId, &jobType, &retrievedData, &length));
+    queue.get(&jobId, &jobType, &retrievedData, &length);
     EXPECT_EQ(jobId, 1U);
     EXPECT_EQ(jobType, 1U);
     EXPECT_EQ(length, kMaxLength);
@@ -49,7 +50,7 @@ TEST(ProducerConsumerQueueTests, SingleElement) {
 // Test adding and removing multiple elements
 TEST(ProducerConsumerQueueTests, MultipleElements) {
-    ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter);
+    ProducerConsumerQueue queue(kMaxQueueSize, customDeleter);
     for (int i = 0; i < kMaxSize; ++i) {
         auto *data = new uint8_t[kMaxLength];
         EXPECT_TRUE(queue.tryPut(i, i, data, kMaxLength));
@@ -61,7 +62,7 @@ TEST(ProducerConsumerQueueTests, MultipleElements) {
     uint32_t length = 0;
     uint8_t *retrievedData = nullptr;
-    EXPECT_TRUE(queue.get(&jobId, &jobType, &retrievedData, &length));
+ 
queue.get(&jobId, &jobType, &retrievedData, &length); EXPECT_EQ(jobId, i); EXPECT_EQ(jobType, i); EXPECT_EQ(length, kMaxLength); @@ -70,7 +71,7 @@ TEST(ProducerConsumerQueueTests, MultipleElements) { // Test behavior when the queue is full TEST(ProducerConsumerQueueTests, QueueFull) { - ProducerConsumerQueue queue(1, 2, customDeleter); + ProducerConsumerQueue queue(2, customDeleter); auto *data1 = new uint8_t[kMaxLength]; auto *data2 = new uint8_t[kMaxLength]; auto *data3 = new uint8_t[kMaxLength]; @@ -82,7 +83,7 @@ TEST(ProducerConsumerQueueTests, QueueFull) { // Test behavior when the queue is empty TEST(ProducerConsumerQueueTests, QueueEmpty) { - ProducerConsumerQueue queue(1, kMaxSize, customDeleter); + ProducerConsumerQueue queue(kMaxSize, customDeleter); uint32_t jobId = 0; uint32_t jobType = 0; uint32_t length = 0; @@ -95,7 +96,7 @@ TEST(ProducerConsumerQueueTests, MultipleProducers) { const int kMaxProducersSize = 10; const int kMaxInsertionsPerThread = 10; - ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter); + ProducerConsumerQueue queue(kMaxQueueSize, customDeleter); std::vector producers; producers.reserve(kMaxProducersSize); @@ -117,7 +118,7 @@ TEST(ProducerConsumerQueueTests, MultipleConsumers) { const int kMaxConsumersSize = 10; const int kMaxRemovalsPerThread = 10; - ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter); + ProducerConsumerQueue queue(kMaxQueueSize, customDeleter); for (uint32_t i = 0; i < kMaxQueueSize; ++i) { auto *data = new uint8_t[kMaxLength]; queue.put(i, i, data, 1); @@ -149,7 +150,7 @@ TEST(ProducerConsumerQueueTests, ProducersAndConsumers) { const int kMaxInsertionsPerThread = 10; const int kMaxRemovalsPerThread = 10; - ProducerConsumerQueue queue(1, kMaxQueueSize, customDeleter); + ProducerConsumerQueue queue(kMaxQueueSize, customDeleter); std::vector producers; producers.reserve(kMaxProducersSize); @@ -192,7 +193,7 @@ TEST(ProducerConsumerQueueTests, CustomDeleter) { }; { - ProducerConsumerQueue queue(1, 
kMaxSize, customDeleter); + ProducerConsumerQueue queue(kMaxSize, customDeleter); auto *data = new uint8_t[kMaxLength]; queue.put(1, 1, data, kMaxLength); } @@ -207,7 +208,7 @@ TEST(ProducerConsumerQueueTests, MultiplePrioritiesBasic) { const int kNumberOfInsertions = kPriorityLevels * kInsertionsPerPriority; const int kBiggerMaxQueueSize = 100000; - ProducerConsumerQueue queue(kPriorityLevels, kBiggerMaxQueueSize, customDeleter); + ProducerConsumerQueueWithPriority queue(kPriorityLevels, kBiggerMaxQueueSize, customDeleter); for (int i = 0; i < kNumberOfInsertions; ++i) { auto *data = new uint8_t[kMaxLength]; queue.put(i, i, data, kMaxLength, i % kPriorityLevels); @@ -219,7 +220,7 @@ TEST(ProducerConsumerQueueTests, MultiplePrioritiesBasic) { uint32_t length = 0; uint8_t *retrievedData = nullptr; - EXPECT_TRUE(queue.get(&jobId, &jobType, &retrievedData, &length)); + queue.get(&jobId, &jobType, &retrievedData, &length); EXPECT_EQ(jobId, jobType); /// order should be: 0, 3, 6, 9, ... 1, 4, 7, 10, ... 2, 5, 8, 11, ... 
/// sorted by priority first (remainder), then by insertion order @@ -239,7 +240,7 @@ TEST(ProducerConsumerQueueTests, MultiplePrioritiesConcurrent) { const int kNumberOfThreads = 10; const int kBiggerMaxQueueSize = 100000; - ProducerConsumerQueue queue(kPriorityLevels, kBiggerMaxQueueSize, customDeleter); + ProducerConsumerQueueWithPriority queue(kPriorityLevels, kBiggerMaxQueueSize, customDeleter); std::mutex frequencyMutex; std::vector frequency(kPriorityLevels, 0); @@ -308,3 +309,56 @@ TEST(ProducerConsumerQueueTests, MultiplePrioritiesConcurrent) { EXPECT_TRUE(queue.isEmpty()); } + +// Test expected order of retrieval with custom priority levels +TEST(ProducerConsumerQueueTests, GetWithCustomPriority) { + const int kPriorityLevels = 10; + const int kTotalInsertions = 1000; + const int kBiggerMaxQueueSize = 1000000; + + ProducerConsumerQueueWithPriority queue(kPriorityLevels, kBiggerMaxQueueSize, customDeleter); + std::vector frequency(kPriorityLevels, 0); + std::vector priorityLevelsToCheck(kPriorityLevels); + std::iota(priorityLevelsToCheck.begin(), priorityLevelsToCheck.end(), 0); + + const uint32_t kSeed = 123456789U; + std::mt19937 rng(kSeed); + + for (int i = 0; i < kTotalInsertions; ++i) { + uint32_t basePriority = rng(); + uint32_t priority = basePriority % kPriorityLevels; + uint8_t *data = nullptr; + if (queue.tryPut(basePriority, priority, data, kMaxLength, priority)) { + frequency[priority]++; + } + } + + for (int i = 0; i < kTotalInsertions; ++i) { + uint32_t jobId = 0; + uint32_t jobType = 0; + uint32_t length = 0; + uint8_t *retrievedData = nullptr; + + std::shuffle(priorityLevelsToCheck.begin(), priorityLevelsToCheck.end(), rng); + queue.getUsingCustomPriority(&jobId, &jobType, &retrievedData, &length, + priorityLevelsToCheck); + + EXPECT_EQ(length, kMaxLength); + EXPECT_EQ(retrievedData, nullptr); + + auto basePriority = jobId; + auto priority = jobType; + EXPECT_EQ(priority, basePriority % kPriorityLevels); + + // All higher priorities (given 
the custom priority order) should have been consumed already + for (uint32_t j = 0; j < kPriorityLevels; ++j) { + if (priorityLevelsToCheck[j] == priority) { break; } + EXPECT_EQ(frequency[priorityLevelsToCheck[j]], 0); + } + + EXPECT_GT(frequency[priority], 0); + frequency[priority]--; + } + + EXPECT_TRUE(queue.isEmpty()); +} diff --git a/src/data/sfschunkserver.cfg.in b/src/data/sfschunkserver.cfg.in index c8e1107a2..28b690cf1 100644 --- a/src/data/sfschunkserver.cfg.in +++ b/src/data/sfschunkserver.cfg.in @@ -277,3 +277,11 @@ ## Example: /etc/saunafs/ssl/ca.crt ## (There is no default) # TLS_CA_CERT_FILE = + +## Define IO jobs, i.e. reads and writes from drives, prioritization scheme. +## The available modes are: +## - 'FIFO': prioritizes the IO jobs first enqueued. +## - 'SWITCH': alternates read and write jobs (interleaving) to prevent one class of IO from +## starving the other. +## Not reloadable. +# IO_PRIORITY_MODE = FIFO diff --git a/src/mount/writedata.cc b/src/mount/writedata.cc index 041f2d5d3..b879a1ede 100644 --- a/src/mount/writedata.cc +++ b/src/mount/writedata.cc @@ -837,7 +837,7 @@ void write_data_init(uint32_t cachesize, uint32_t retries, uint32_t workers, writeStatsInit(); - jobsQueue = std::make_unique(1, 0, deleterByType); + jobsQueue = std::make_unique(0, deleterByType); pthread_attr_init(&thattr); pthread_attr_setstacksize(&thattr, 0x100000); @@ -1970,7 +1970,7 @@ void write_data_init(uint32_t cachesize, uint32_t retries, uint32_t workers, writeStatsInit(); - jobsQueue = std::make_unique(1, 0, deleterByType); + jobsQueue = std::make_unique(0, deleterByType); pthread_attr_init(&thattr); pthread_attr_setstacksize(&thattr, 0x100000); diff --git a/tests/test_suites/SanityChecks/test_simultaneous_write_read.sh b/tests/test_suites/SanityChecks/test_simultaneous_write_read.sh deleted file mode 100644 index 0eda5b88f..000000000 --- a/tests/test_suites/SanityChecks/test_simultaneous_write_read.sh +++ /dev/null @@ -1,17 +0,0 @@ -timeout_set 3 minutes 
- -CHUNKSERVERS=4 -USE_RAMDISK=YES -MOUNT_EXTRA_CONFIG="cacheexpirationtime=0" -setup_local_empty_saunafs info - -cd ${info[mount0]} - -mkdir dir_std - -for count in {1..2}; do - touch dir_std/file - FILE_SIZE=100000000 BLOCK_SIZE=2135 file-generate dir_std/file & - file-validate-growing dir_std/file 100000000 - rm dir_std/file -done diff --git a/tests/test_suites/ShortSystemTests/test_simultaneous_write_read.sh b/tests/test_suites/ShortSystemTests/test_simultaneous_write_read.sh new file mode 100644 index 000000000..0511923d3 --- /dev/null +++ b/tests/test_suites/ShortSystemTests/test_simultaneous_write_read.sh @@ -0,0 +1 @@ +source test_suites/TestTemplates/test_simultaneous_write_read.inc diff --git a/tests/test_suites/ShortSystemTests/test_simultaneous_write_read_switch_mode.sh b/tests/test_suites/ShortSystemTests/test_simultaneous_write_read_switch_mode.sh new file mode 100644 index 000000000..ec252ff04 --- /dev/null +++ b/tests/test_suites/ShortSystemTests/test_simultaneous_write_read_switch_mode.sh @@ -0,0 +1 @@ +IO_PRIORITY_MODE="SWITCH" source test_suites/TestTemplates/test_simultaneous_write_read.inc diff --git a/tests/test_suites/TestTemplates/test_simultaneous_write_read.inc b/tests/test_suites/TestTemplates/test_simultaneous_write_read.inc new file mode 100644 index 000000000..21203e031 --- /dev/null +++ b/tests/test_suites/TestTemplates/test_simultaneous_write_read.inc @@ -0,0 +1,24 @@ +timeout_set 3 minutes +assert_program_installed fio + +io_priority_mode=${IO_PRIORITY_MODE:-FIFO} + +CHUNKSERVERS=4 \ + USE_RAMDISK=YES \ + MOUNT_EXTRA_CONFIG="cacheexpirationtime=0" \ + CHUNKSERVER_EXTRA_CONFIG="IO_PRIORITY_MODE=${io_priority_mode}" \ +setup_local_empty_saunafs info + +cd ${info[mount0]} + +mkdir dir_std + +for count in {1..2}; do + touch dir_std/file + FILE_SIZE=100000000 BLOCK_SIZE=2135 file-generate dir_std/file & + file-validate-growing dir_std/file 100000000 + rm dir_std/file +done + +assert_success fio --directory="${info[mount0]}/dir_std" 
--name=simultaneous_rw --rw=rw --bs=64K \ + --size=400M --numjobs=4 --direct=1 --group_reporting --ioengine=libaio