2626#include " common/chunk_part_type.h"
2727#include " common/chunk_type_with_address.h"
2828#include " common/massert.h"
29+ #include " common/output_packet.h"
2930#include " common/pcqueue.h"
3031#include " devtools/TracePrinter.h"
3132#include " devtools/request_log.h"
@@ -100,12 +101,37 @@ JobPool::~JobPool() {
100101 for (auto &listenerInfo : listenerInfos_) { close (listenerInfo.notifierFD ); }
101102}
102103
104+ uint32_t JobPool::addJobIfNotLocked (ChunkWithType chunkWithType, ChunkOperation operation,
105+ JobCallback callback, void *extra,
106+ ProcessJobCallback processJob, uint32_t listenerId) {
107+ std::unique_lock lockedChunkLock (chunkToJobReplyMapMutex_);
108+ auto it = chunkToJobReplyMap_.find (chunkWithType);
109+ if (it != chunkToJobReplyMap_.end () && it->second .writeInitReceived ) {
110+ // Chunk is locked, store the job to be added later
111+ auto &lockedChunkData = it->second ;
112+
113+ lockedChunkData.pendingAddJobs .emplace_back (
114+ [this , operation, extra, listenerId, callback = std::move (callback),
115+ processJob = std::move (processJob)]() mutable -> uint32_t {
116+ return addJob (operation, std::move (callback), extra, std::move (processJob),
117+ listenerId);
118+ });
119+
120+ return lockedChunkData.lockJobId ; // Lock guard is released here
121+ }
122+ lockedChunkLock.unlock (); // Release the lock before adding the job to avoid deadlocks
123+
124+ // Chunk is not locked, add the job immediately
125+ return JobPool::addJob (operation, std::move (callback), extra, std::move (processJob),
126+ listenerId);
127+ }
128+
103129uint32_t JobPool::addJob (ChunkOperation operation, JobCallback callback, void *extra,
104130 ProcessJobCallback processJob, uint32_t listenerId) {
105131 // Check if the listenerId is valid
106132 if (listenerId >= listenerInfos_.size ()) {
107- safs::log_warn (" JobPool: addJob : Invalid listenerId {} for operation {}, resetting to 0" ,
108- listenerId, static_cast <int >(operation));
133+ safs::log_warn (" JobPool: {} : Invalid listenerId {} for operation {}, resetting to 0" ,
134+ __func__, listenerId, static_cast <int >(operation));
109135 listenerId = 0 ; // Reset to the first listener
110136 }
111137
@@ -127,6 +153,29 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e
127153 return jobId;
128154}
129155
156+ uint32_t JobPool::addLockJob (JobCallback callback, void *extra, uint32_t listenerId) {
157+ // Check if the listenerId is valid
158+ if (listenerId >= listenerInfos_.size ()) {
159+ safs::log_warn (" JobPool: {}: Invalid listenerId {} for operation LOCK, resetting to 0" ,
160+ __func__, listenerId);
161+ listenerId = 0 ; // Reset to the first listener
162+ }
163+
164+ auto &listenerInfo = listenerInfos_[listenerId];
165+ std::unique_lock lock (listenerInfo.jobsMutex );
166+ uint32_t jobId = listenerInfo.nextJobId ++;
167+ auto job = std::make_unique<Job>();
168+ job->jobId = jobId;
169+ job->callback = std::move (callback);
170+ // No processJob for lock jobs, as they are just markers for locked chunks
171+ job->extra = extra;
172+ job->state = JobPool::State::Enabled;
173+ job->listenerId = listenerId;
174+ listenerInfo.jobHash [jobId] = std::move (job);
175+ // Not an actual job, but a marker for a locked chunk, so never inserted into the job queue.
176+ return jobId;
177+ }
178+
130179uint32_t JobPool::getJobCount () const {
131180 TRACETHIS ();
132181 return jobsQueue->elements ();
@@ -531,8 +580,9 @@ uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra
531580 JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
532581 return hddInternalDelete (chunkId, chunkVersion, chunkType);
533582 };
534- return jobPool.addJob (JobPool::ChunkOperation::Delete, std::move (callback), extra, processJob,
535- listenerId);
583+ return jobPool.addJobIfNotLocked (ChunkWithType{chunkId, chunkType},
584+ JobPool::ChunkOperation::Delete, std::move (callback), extra,
585+ processJob, listenerId);
536586}
537587
538588uint32_t job_create (JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId,
@@ -551,8 +601,9 @@ uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, voi
551601 JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
552602 return hddInternalUpdateVersion (chunkId, chunkVersion, newChunkVersion, chunkType);
553603 };
554- return jobPool.addJob (JobPool::ChunkOperation::ChangeVersion, callback, extra, processJob,
555- listenerId);
604+ return jobPool.addJobIfNotLocked (ChunkWithType{chunkId, chunkType},
605+ JobPool::ChunkOperation::ChangeVersion, callback, extra,
606+ processJob, listenerId);
556607 }
557608 return job_invalid (jobPool, callback, extra, listenerId);
558609}
@@ -564,8 +615,9 @@ uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, vo
564615 JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
565616 return hddTruncate (chunkId, chunkVersion, chunkType, newChunkVersion, length);
566617 };
567- return jobPool.addJob (JobPool::ChunkOperation::Truncate, callback, extra, processJob,
568- listenerId);
618+ return jobPool.addJobIfNotLocked (ChunkWithType{chunkId, chunkType},
619+ JobPool::ChunkOperation::Truncate, callback, extra,
620+ processJob, listenerId);
569621 }
570622 return job_invalid (jobPool, callback, extra, listenerId);
571623}
@@ -579,8 +631,9 @@ uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, v
579631 return hddDuplicate (chunkId, chunkVersion, newChunkVersion, chunkType, chunkIdCopy,
580632 chunkVersionCopy);
581633 };
582- return jobPool.addJob (JobPool::ChunkOperation::Duplicate, callback, extra, processJob,
583- listenerId);
634+ return jobPool.addJobIfNotLocked (ChunkWithType{chunkId, chunkType},
635+ JobPool::ChunkOperation::Duplicate, callback, extra,
636+ processJob, listenerId);
584637 }
585638 return job_invalid (jobPool, callback, extra, listenerId);
586639}
@@ -594,8 +647,103 @@ uint32_t job_duptrunc(JobPool &jobPool, const JobPool::JobCallback &callback, vo
594647 return hddDuplicateTruncate (chunkId, chunkVersion, newChunkVersion, chunkType,
595648 chunkIdCopy, chunkVersionCopy, length);
596649 };
597- return jobPool.addJob (JobPool::ChunkOperation::DuplicateTruncate, callback, extra,
598- processJob, listenerId);
650+ return jobPool.addJobIfNotLocked (ChunkWithType{chunkId, chunkType},
651+ JobPool::ChunkOperation::DuplicateTruncate, callback,
652+ extra, processJob, listenerId);
599653 }
600654 return job_invalid (jobPool, callback, extra, listenerId);
601655}
656+
657+ bool JobPool::startChunkLock (const JobPool::JobCallback &callback, void *packet, uint64_t chunkId,
658+ ChunkPartType chunkType, uint32_t listenerId) {
659+ std::unique_lock lock (chunkToJobReplyMapMutex_);
660+ if (chunkToJobReplyMap_.find (ChunkWithType{chunkId, chunkType}) != chunkToJobReplyMap_.end ()) {
661+ lock.unlock (); // Release the lock before logging to avoid extra contention
662+
663+ safs::log_warn (
664+ " {}: Chunk lock job already exists for chunkId {:016X}, "
665+ " chunkType {}. Overtaking lock." ,
666+ __func__, chunkId, chunkType.toString ());
667+ return false ;
668+ }
669+
670+ chunkToJobReplyMap_[ChunkWithType{chunkId, chunkType}] =
671+ LockedChunkData (addLockJob (callback, packet, listenerId), listenerId);
672+ return true ;
673+ }
674+
675+ void JobPool::enforceChunkLock (uint64_t chunkId, ChunkPartType chunkType) {
676+ std::unique_lock lock (chunkToJobReplyMapMutex_);
677+ auto it = chunkToJobReplyMap_.find (ChunkWithType{chunkId, chunkType});
678+ if (it == chunkToJobReplyMap_.end ()) {
679+ lock.unlock (); // Release the lock before logging to avoid extra contention
680+
681+ // Master was not waiting for this lock, just log and return
682+ safs::log_warn (" {}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring." ,
683+ __func__, chunkId, chunkType.toString ());
684+ return ;
685+ }
686+
687+ it->second .writeInitReceived = true ;
688+ }
689+
690+ void JobPool::endChunkLock (uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
691+ uint32_t lockJobId;
692+ uint32_t listenerId;
693+ std::vector<AddJobFunc> pendingAddJobs;
694+
695+ std::unique_lock lock (chunkToJobReplyMapMutex_);
696+ auto it = chunkToJobReplyMap_.find (ChunkWithType{chunkId, chunkType});
697+ if (it == chunkToJobReplyMap_.end ()) {
698+ lock.unlock (); // Release the lock before logging to avoid extra contention
699+
700+ // Master was not waiting for this lock, just log and return
701+ safs::log_warn (" {}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring." ,
702+ __func__, chunkId, chunkType.toString ());
703+ return ;
704+ }
705+
706+ lockJobId = it->second .lockJobId ;
707+ listenerId = it->second .listenerId ;
708+ pendingAddJobs = std::move (it->second .pendingAddJobs );
709+ chunkToJobReplyMap_.erase (it);
710+ lock.unlock (); // Release the lock before processing pending jobs to avoid extra contention
711+
712+ for (const auto &addJobFunc : pendingAddJobs) { addJobFunc (); }
713+
714+ sendStatus (lockJobId, status, listenerId);
715+ }
716+
717+ void JobPool::eraseChunkLock (uint64_t chunkId, ChunkPartType chunkType) {
718+ uint32_t lockJobId;
719+ uint32_t listenerId;
720+ std::vector<AddJobFunc> pendingAddJobs;
721+
722+ std::unique_lock lock (chunkToJobReplyMapMutex_);
723+ auto it = chunkToJobReplyMap_.find (ChunkWithType{chunkId, chunkType});
724+ if (it == chunkToJobReplyMap_.end ()) {
725+ lock.unlock (); // Release the lock before logging to avoid extra contention
726+
727+ safs::log_warn (" {}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring." ,
728+ __func__, chunkId, chunkType.toString ());
729+ return ;
730+ }
731+
732+ lockJobId = it->second .lockJobId ;
733+ listenerId = it->second .listenerId ;
734+ pendingAddJobs = std::move (it->second .pendingAddJobs );
735+ chunkToJobReplyMap_.erase (it);
736+ lock.unlock (); // Release the lock before processing pending jobs to avoid extra contention
737+
738+ for (const auto &addJobFunc : pendingAddJobs) { addJobFunc (); }
739+
740+ // Remove the lock job and the related packet
741+ auto &listenerInfo = listenerInfos_[listenerId];
742+ std::unique_lock jobsUniqueLock (listenerInfo.jobsMutex );
743+ auto lockJobIterator = listenerInfo.jobHash .find (lockJobId);
744+ if (lockJobIterator != listenerInfo.jobHash .end ()) {
745+ auto *outputPacket = reinterpret_cast <OutputPacket *>(lockJobIterator->second ->extra );
746+ if (outputPacket) { delete outputPacket; }
747+ listenerInfo.jobHash .erase (lockJobIterator);
748+ }
749+ }
0 commit comments