
Commit 0c1055b
feat: chunkserver side chunk lock
The current implementation of chunk locking relies only on the client responses that arrive when write and truncate operations finish. This commit adds chunkserver-side replies, sent after an actual stream of write operations has been received, as an additional input for deciding whether a chunk is locked. In the current state of the code this must not change system behavior: the client responses that tell the master to unlock the chunk always happen after the data is already on the chunkserver, or after an error.

The idea of the change is to track, for each chunk part, whether it is expected to be written at the moment. Note that this affects all writes coming from the client and some truncates. Those cases trigger one of these functions:

- chunk_create_operation
- chunk_increase_version_operation
- chunk_lock_operation
- chunk_duplicate_operation

all of which can be traced back to chunk_multi_modify. After some conditions are checked (especially whether the chunkserver version supports this feature), the new packets reach the chunkserver; in most cases the write operation must wait for their responses, and the chunk parts are considered to be "being written".

On the chunkserver side, locking is handled by the master's main JobPool, which can start, enforce, end, and erase chunk lock jobs. Locked chunks are special in that, once the write operations on them end, the master receives the status of those writes that was not reported via the clients; so far that status is always OK, because everything is reported to the clients. Master jobs on an enforced-locked chunk must wait until the chunk is released, i.e. until writing finishes.

Back on the master side, disconnection of parts being written and errors on those writes are handled the same way: by increasing the version when the chance appears.

Side changes:
- Refactor the chunk_multi_modify and chunk_multi_truncate functions.
- Change the effective grace time between starting and retrying chunk creation before answering "no space" from 600s to 60s.
- Document the members of the Chunk class most involved in this change.
- On the client side, always talk to the chunkservers when writing.
- Fix test_read_corrupted_files.

Signed-off-by: Dave <dave@leil.io>
1 parent 5a56195 commit 0c1055b

23 files changed: +1305 −290 lines

src/chunkserver/bgjobs.cc (160 additions, 12 deletions)

@@ -26,6 +26,7 @@
 #include "common/chunk_part_type.h"
 #include "common/chunk_type_with_address.h"
 #include "common/massert.h"
+#include "common/output_packet.h"
 #include "common/pcqueue.h"
 #include "devtools/TracePrinter.h"
 #include "devtools/request_log.h"
@@ -100,12 +101,37 @@ JobPool::~JobPool() {
     for (auto &listenerInfo : listenerInfos_) { close(listenerInfo.notifierFD); }
 }
 
+uint32_t JobPool::addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation,
+                                    JobCallback callback, void *extra,
+                                    ProcessJobCallback processJob, uint32_t listenerId) {
+    std::unique_lock lockedChunkLock(chunkToJobReplyMapMutex_);
+    auto it = chunkToJobReplyMap_.find(chunkWithType);
+    if (it != chunkToJobReplyMap_.end() && it->second.writeInitReceived) {
+        // Chunk is locked, store the job to be added later
+        auto &lockedChunkData = it->second;
+
+        lockedChunkData.pendingAddJobs.emplace_back(
+            [this, operation, extra, listenerId, callback = std::move(callback),
+             processJob = std::move(processJob)]() mutable -> uint32_t {
+                return addJob(operation, std::move(callback), extra, std::move(processJob),
+                              listenerId);
+            });
+
+        return lockedChunkData.lockJobId; // Lock guard is released here
+    }
+    lockedChunkLock.unlock(); // Release the lock before adding the job to avoid deadlocks
+
+    // Chunk is not locked, add the job immediately
+    return JobPool::addJob(operation, std::move(callback), extra, std::move(processJob),
+                           listenerId);
+}
+
 uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *extra,
                          ProcessJobCallback processJob, uint32_t listenerId) {
     // Check if the listenerId is valid
     if (listenerId >= listenerInfos_.size()) {
-        safs::log_warn("JobPool: addJob: Invalid listenerId {} for operation {}, resetting to 0",
-                       listenerId, static_cast<int>(operation));
+        safs::log_warn("JobPool: {}: Invalid listenerId {} for operation {}, resetting to 0",
+                       __func__, listenerId, static_cast<int>(operation));
         listenerId = 0; // Reset to the first listener
     }
 
@@ -127,6 +153,29 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e
     return jobId;
 }
 
+uint32_t JobPool::addLockJob(JobCallback callback, void *extra, uint32_t listenerId) {
+    // Check if the listenerId is valid
+    if (listenerId >= listenerInfos_.size()) {
+        safs::log_warn("JobPool: {}: Invalid listenerId {} for operation LOCK, resetting to 0",
+                       __func__, listenerId);
+        listenerId = 0; // Reset to the first listener
+    }
+
+    auto &listenerInfo = listenerInfos_[listenerId];
+    std::unique_lock lock(listenerInfo.jobsMutex);
+    uint32_t jobId = listenerInfo.nextJobId++;
+    auto job = std::make_unique<Job>();
+    job->jobId = jobId;
+    job->callback = std::move(callback);
+    // No processJob for lock jobs, as they are just markers for locked chunks
+    job->extra = extra;
+    job->state = JobPool::State::Enabled;
+    job->listenerId = listenerId;
+    listenerInfo.jobHash[jobId] = std::move(job);
+    // Not an actual job, but a marker for a locked chunk, so never inserted into the job queue.
+    return jobId;
+}
+
 uint32_t JobPool::getJobCount() const {
     TRACETHIS();
     return jobsQueue->elements();
@@ -531,8 +580,9 @@ uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra
     JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
         return hddInternalDelete(chunkId, chunkVersion, chunkType);
     };
-    return jobPool.addJob(JobPool::ChunkOperation::Delete, std::move(callback), extra, processJob,
-                          listenerId);
+    return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
+                                     JobPool::ChunkOperation::Delete, std::move(callback), extra,
+                                     processJob, listenerId);
 }
 
 uint32_t job_create(JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId,
@@ -551,8 +601,9 @@ uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, voi
         JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
             return hddInternalUpdateVersion(chunkId, chunkVersion, newChunkVersion, chunkType);
         };
-        return jobPool.addJob(JobPool::ChunkOperation::ChangeVersion, callback, extra, processJob,
-                              listenerId);
+        return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
+                                         JobPool::ChunkOperation::ChangeVersion, callback, extra,
+                                         processJob, listenerId);
     }
     return job_invalid(jobPool, callback, extra, listenerId);
 }
@@ -564,8 +615,9 @@ uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, vo
         JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
             return hddTruncate(chunkId, chunkVersion, chunkType, newChunkVersion, length);
         };
-        return jobPool.addJob(JobPool::ChunkOperation::Truncate, callback, extra, processJob,
-                              listenerId);
+        return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
+                                         JobPool::ChunkOperation::Truncate, callback, extra,
+                                         processJob, listenerId);
     }
     return job_invalid(jobPool, callback, extra, listenerId);
 }
@@ -579,8 +631,9 @@ uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, v
             return hddDuplicate(chunkId, chunkVersion, newChunkVersion, chunkType, chunkIdCopy,
                                 chunkVersionCopy);
         };
-        return jobPool.addJob(JobPool::ChunkOperation::Duplicate, callback, extra, processJob,
-                              listenerId);
+        return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
+                                         JobPool::ChunkOperation::Duplicate, callback, extra,
+                                         processJob, listenerId);
     }
     return job_invalid(jobPool, callback, extra, listenerId);
 }
@@ -594,8 +647,103 @@ uint32_t job_duptrunc(JobPool &jobPool, const JobPool::JobCallback &callback, vo
             return hddDuplicateTruncate(chunkId, chunkVersion, newChunkVersion, chunkType,
                                         chunkIdCopy, chunkVersionCopy, length);
         };
-        return jobPool.addJob(JobPool::ChunkOperation::DuplicateTruncate, callback, extra,
-                              processJob, listenerId);
+        return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
+                                         JobPool::ChunkOperation::DuplicateTruncate, callback,
+                                         extra, processJob, listenerId);
     }
     return job_invalid(jobPool, callback, extra, listenerId);
 }
+
+bool JobPool::startChunkLock(const JobPool::JobCallback &callback, void *packet, uint64_t chunkId,
+                             ChunkPartType chunkType, uint32_t listenerId) {
+    std::unique_lock lock(chunkToJobReplyMapMutex_);
+    if (chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType}) != chunkToJobReplyMap_.end()) {
+        lock.unlock(); // Release the lock before logging to avoid extra contention
+
+        safs::log_warn(
+            "{}: Chunk lock job already exists for chunkId {:016X}, chunkType {}. Treating request "
+            "as retransmission; not adding a new lock job.",
+            __func__, chunkId, chunkType.toString());
+        return false;
+    }
+
+    chunkToJobReplyMap_[ChunkWithType{chunkId, chunkType}] =
+        LockedChunkData(addLockJob(callback, packet, listenerId), listenerId);
+    return true;
+}
+
+void JobPool::enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType) {
+    std::unique_lock lock(chunkToJobReplyMapMutex_);
+    auto it = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType});
+    if (it == chunkToJobReplyMap_.end()) {
+        lock.unlock(); // Release the lock before logging to avoid extra contention
+
+        // Master was not waiting for this lock, just log and return
+        safs::log_warn("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.",
+                       __func__, chunkId, chunkType.toString());
+        return;
+    }
+
+    it->second.writeInitReceived = true;
+}
+
+void JobPool::endChunkLock(uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
+    uint32_t lockJobId;
+    uint32_t listenerId;
+    std::vector<AddJobFunc> pendingAddJobs;
+
+    std::unique_lock lock(chunkToJobReplyMapMutex_);
+    auto it = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType});
+    if (it == chunkToJobReplyMap_.end()) {
+        lock.unlock(); // Release the lock before logging to avoid extra contention
+
+        // Master was not waiting for this lock, just log and return
+        safs::log_warn("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.",
+                       __func__, chunkId, chunkType.toString());
+        return;
+    }
+
+    lockJobId = it->second.lockJobId;
+    listenerId = it->second.listenerId;
+    pendingAddJobs = std::move(it->second.pendingAddJobs);
+    chunkToJobReplyMap_.erase(it);
+    lock.unlock(); // Release the lock before processing pending jobs to avoid extra contention
+
+    for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); }
+
+    sendStatus(lockJobId, status, listenerId);
+}
+
+void JobPool::eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType) {
+    uint32_t lockJobId;
+    uint32_t listenerId;
+    std::vector<AddJobFunc> pendingAddJobs;
+
+    std::unique_lock lock(chunkToJobReplyMapMutex_);
+    auto it = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType});
+    if (it == chunkToJobReplyMap_.end()) {
+        lock.unlock(); // Release the lock before logging to avoid extra contention
+
+        safs::log_warn("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.",
+                       __func__, chunkId, chunkType.toString());
+        return;
+    }
+
+    lockJobId = it->second.lockJobId;
+    listenerId = it->second.listenerId;
+    pendingAddJobs = std::move(it->second.pendingAddJobs);
+    chunkToJobReplyMap_.erase(it);
+    lock.unlock(); // Release the lock before processing pending jobs to avoid extra contention
+
+    for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); }
+
+    // Remove the lock job and the related packet
+    auto &listenerInfo = listenerInfos_[listenerId];
+    std::unique_lock jobsUniqueLock(listenerInfo.jobsMutex);
+    auto lockJobIterator = listenerInfo.jobHash.find(lockJobId);
+    if (lockJobIterator != listenerInfo.jobHash.end()) {
+        auto *outputPacket = reinterpret_cast<OutputPacket *>(lockJobIterator->second->extra);
+        if (outputPacket) { delete outputPacket; }
+        listenerInfo.jobHash.erase(lockJobIterator);
+    }
+}

src/chunkserver/bgjobs.h (83 additions, 0 deletions)

@@ -22,6 +22,7 @@
 
 #include "common/platform.h"
 
+#include "chunkserver-common/chunk_map.h"
 #include "chunkserver/io_buffers.h"
 #include "common/pcqueue.h"
 
@@ -87,6 +88,8 @@ class JobPool {
     /// @return The status of the job processing.
     using ProcessJobCallback = std::function<uint8_t()>;
 
+    using AddJobFunc = std::function<uint32_t()>;
+
     /// @brief Constructor for JobPool.
     ///
     /// @param name Human readable name for this pool, useful for debugging.
@@ -112,6 +115,27 @@
     uint32_t addJob(ChunkOperation operation, JobCallback callback, void *extra,
                     ProcessJobCallback processJob, uint32_t listenerId = 0);
 
+    /// @brief Adds a job to the JobPool if the chunk is not locked.
+    /// If the chunk is locked, the job will be stored and added once the lock is released.
+    /// @param chunkWithType The chunk and its type associated with the job.
+    /// @param operation The type of operation to be performed on the chunk.
+    /// @param callback The callback function to be called upon job completion.
+    /// @param extra Additional data to be passed to the callback.
+    /// @param processJob The callback function to process the job.
+    /// @param listenerId The ID of the listener associated with the job.
+    /// @return The ID of the added job, or the lock job ID if the chunk is locked.
+    uint32_t addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation,
+                               JobCallback callback, void *extra, ProcessJobCallback processJob,
+                               uint32_t listenerId = 0);
+
+    /// @brief Adds a lock job to the JobPool for a specific chunk and type.
+    /// This function is used to create a lock job that will be associated with a locked chunk.
+    /// @param callback The callback function to be called when the lock is released.
+    /// @param extra Additional data to be passed to the callback.
+    /// @param listenerId The ID of the listener associated with the job.
+    /// @return The ID of the added lock job.
+    uint32_t addLockJob(JobCallback callback, void *extra, uint32_t listenerId = 0);
+
     /// @brief Gets the number of jobs in the JobPool.
     uint32_t getJobCount() const;
 
@@ -155,6 +179,45 @@
     void changeCallback(std::list<uint32_t> &jobIds, const JobCallback &callback,
                         uint32_t listenerId = 0);
 
+    /// @brief Starts a chunk lock job for a specific chunk and type.
+    /// This function is triggered when the master server sends a chunk lock request for a chunk
+    /// that is not currently locked. It adds a lock job to the JobPool and associates it with the
+    /// locked chunk. If the chunk is already locked, it returns false and does not add a new job,
+    /// as the existing lock job will be responsible for handling the lock.
+    /// @param callback The callback function to be called when the lock is released.
+    /// @param packet The packet to be sent to the master server with the write end status.
+    /// @param chunkId The ID of the chunk to be locked.
+    /// @param chunkType The type of the chunk to be locked.
+    /// @param listenerId The ID of the listener associated with the job.
+    /// @return true if the lock job was successfully added, false if the chunk is already locked.
+    bool startChunkLock(const JobPool::JobCallback &callback, void *packet, uint64_t chunkId,
+                        ChunkPartType chunkType, uint32_t listenerId = 0);
+
+    /// @brief Enforces a chunk lock for a specific chunk and type.
+    /// This function is called when the client sends a write initialization for a chunk that is
+    /// already locked, to ensure that the lock is properly enforced. From now on, the master
+    /// requests on the locked chunk will wait for the lock to be released before being processed.
+    /// @param chunkId The ID of the chunk to enforce the lock on.
+    /// @param chunkType The type of the chunk to enforce the lock on.
+    void enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType);
+
+    /// @brief Ends a chunk lock for a specific chunk and type.
+    /// This function is called when cleaning up the write operation on the locked chunk, to end the
+    /// lock and allow any pending jobs to be added and processed. It sends the write end status to
+    /// the master server and removes the lock job from the JobPool.
+    /// @param chunkId The ID of the chunk to end the lock on.
+    /// @param chunkType The type of the chunk to end the lock on.
+    /// @param status The status to be sent to the master server for the write end.
+    void endChunkLock(uint64_t chunkId, ChunkPartType chunkType, uint8_t status);
+
+    /// @brief Erases a chunk lock for a specific chunk and type.
+    /// This function is called when the master server sends a chunk unlock request for a chunk that
+    /// is currently locked. It removes the lock job from the JobPool and allows any pending jobs
+    /// that were waiting for the lock to be released to be added and processed.
+    /// @param chunkId The ID of the chunk to erase the lock on.
+    /// @param chunkType The type of the chunk to erase the lock on.
+    void eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType);
+
 private:
     /// @brief Represents a job in the JobPool.
     struct Job {
@@ -196,6 +259,26 @@
         uint32_t nextJobId; /// Next job ID to be assigned.
     };
 
+    /// @brief Structure to hold information about a locked chunk.
+    struct LockedChunkData {
+        uint32_t lockJobId = 0; /// The ID of the lock job associated with the locked chunk.
+        uint32_t listenerId = 0; /// The ID of the listener associated with the locked chunk.
+        /// A vector of functions to add pending jobs to be executed once the lock is released.
+        std::vector<AddJobFunc> pendingAddJobs{};
+        /// Flag to indicate if the write initialization has been received for the locked chunk.
+        bool writeInitReceived = false;
+
+        LockedChunkData(uint32_t lockJobId, uint32_t listenerId)
+            : lockJobId(lockJobId), listenerId(listenerId) {}
+
+        LockedChunkData() = default;
+    };
+
+    /// Mutex to protect access to the chunkToJobReplyMap_.
+    std::mutex chunkToJobReplyMapMutex_;
+    /// Map to associate locked chunks with their corresponding lock job and pending jobs.
+    std::unordered_map<ChunkWithType, LockedChunkData, KeyOperations, KeyOperations>
+        chunkToJobReplyMap_;
     std::vector<ListenerInfo> listenerInfos_; /// Vector of listener information.
     std::string name_; /// Human readable id of the JobPool.
     uint8_t workers; /// Number of worker threads in the pool.
