Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ if(ENABLE_CCACHE AND CCACHE_FOUND)
message(STATUS "Using ccache")
endif()

set(DEFAULT_MIN_VERSION "5.7.0")
set(DEFAULT_MIN_VERSION "5.8.0")

include(GNUInstallDirs)

Expand Down
4 changes: 4 additions & 0 deletions doc/sfsmaster.cfg.5.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ for TLS connections (there is no default value).
Path to the trusted CA certificate which is used to authenticate
the TLS connection (there is no default value).

*USE_CHUNKSERVER_SIDE_CHUNK_LOCK (EXPERIMENTAL)*:: When set to 1, enables sending
chunk part lock messages to the chunkservers. This can be useful to track down which
chunk parts are currently being written. Reloadable (default: 0).

*EMPTY_RESERVED_FILES_PERIOD_MSECONDS (EXPERIMENTAL)*::
Interval for periodic cleaning of reserved files, in milliseconds.
If set to 0, the reserved files deletion is disabled. (Default: 0)
Expand Down
1 change: 1 addition & 0 deletions src/admin/dump_config_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const static std::unordered_map<std::string, std::string> defaultOptionsMaster =
{"SNAPSHOT_INITIAL_BATCH_SIZE_LIMIT", "10000"},
{"FILE_TEST_LOOP_MIN_TIME", "3600"},
{"PRIORITIZE_DATA_PARTS", "1"},
{"USE_CHUNKSERVER_SIDE_CHUNK_LOCK", "0"},
{"CREATE_EMPTY_FOLDERS_WHEN_SPACE_DEPLETED", "1"},
};

Expand Down
172 changes: 160 additions & 12 deletions src/chunkserver/bgjobs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/chunk_part_type.h"
#include "common/chunk_type_with_address.h"
#include "common/massert.h"
#include "common/output_packet.h"
#include "common/pcqueue.h"
#include "devtools/TracePrinter.h"
#include "devtools/request_log.h"
Expand Down Expand Up @@ -100,12 +101,37 @@ JobPool::~JobPool() {
for (auto &listenerInfo : listenerInfos_) { close(listenerInfo.notifierFD); }
}

uint32_t JobPool::addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation,
JobCallback callback, void *extra,
ProcessJobCallback processJob, uint32_t listenerId) {
std::unique_lock lockedChunkLock(chunkToJobReplyMapMutex_);
auto it = chunkToJobReplyMap_.find(chunkWithType);
if (it != chunkToJobReplyMap_.end() && it->second.writeInitReceived) {
// Chunk is locked, store the job to be added later
auto &lockedChunkData = it->second;

lockedChunkData.pendingAddJobs.emplace_back(
[this, operation, extra, listenerId, callback = std::move(callback),
processJob = std::move(processJob)]() mutable -> uint32_t {
return addJob(operation, std::move(callback), extra, std::move(processJob),
listenerId);
});

return lockedChunkData.lockJobId; // Lock guard is released here
}
lockedChunkLock.unlock(); // Release the lock before adding the job to avoid deadlocks

// Chunk is not locked, add the job immediately
return JobPool::addJob(operation, std::move(callback), extra, std::move(processJob),
listenerId);
}

uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *extra,
ProcessJobCallback processJob, uint32_t listenerId) {
// Check if the listenerId is valid
if (listenerId >= listenerInfos_.size()) {
safs::log_warn("JobPool: addJob: Invalid listenerId {} for operation {}, resetting to 0",
listenerId, static_cast<int>(operation));
safs::log_warn("JobPool: {}: Invalid listenerId {} for operation {}, resetting to 0",
__func__, listenerId, static_cast<int>(operation));
listenerId = 0; // Reset to the first listener
}

Expand All @@ -127,6 +153,29 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e
return jobId;
}

uint32_t JobPool::addLockJob(JobCallback callback, void *extra, uint32_t listenerId) {
// Check if the listenerId is valid
if (listenerId >= listenerInfos_.size()) {
safs::log_warn("JobPool: {}: Invalid listenerId {} for operation LOCK, resetting to 0",
__func__, listenerId);
listenerId = 0; // Reset to the first listener
}

auto &listenerInfo = listenerInfos_[listenerId];
std::unique_lock lock(listenerInfo.jobsMutex);
uint32_t jobId = listenerInfo.nextJobId++;
auto job = std::make_unique<Job>();
job->jobId = jobId;
job->callback = std::move(callback);
// No processJob for lock jobs, as they are just markers for locked chunks
job->extra = extra;
job->state = JobPool::State::Enabled;
job->listenerId = listenerId;
listenerInfo.jobHash[jobId] = std::move(job);
// Not an actual job, but a marker for a locked chunk, so never inserted into the job queue.
return jobId;
}

uint32_t JobPool::getJobCount() const {
TRACETHIS();
return jobsQueue->elements();
Expand Down Expand Up @@ -531,8 +580,9 @@ uint32_t job_delete(JobPool &jobPool, JobPool::JobCallback callback, void *extra
JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
return hddInternalDelete(chunkId, chunkVersion, chunkType);
};
return jobPool.addJob(JobPool::ChunkOperation::Delete, std::move(callback), extra, processJob,
listenerId);
return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
JobPool::ChunkOperation::Delete, std::move(callback), extra,
processJob, listenerId);
}

uint32_t job_create(JobPool &jobPool, JobPool::JobCallback callback, void *extra, uint64_t chunkId,
Expand All @@ -551,8 +601,9 @@ uint32_t job_version(JobPool &jobPool, const JobPool::JobCallback &callback, voi
JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
return hddInternalUpdateVersion(chunkId, chunkVersion, newChunkVersion, chunkType);
};
return jobPool.addJob(JobPool::ChunkOperation::ChangeVersion, callback, extra, processJob,
listenerId);
return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
JobPool::ChunkOperation::ChangeVersion, callback, extra,
processJob, listenerId);
}
return job_invalid(jobPool, callback, extra, listenerId);
}
Expand All @@ -564,8 +615,9 @@ uint32_t job_truncate(JobPool &jobPool, const JobPool::JobCallback &callback, vo
JobPool::ProcessJobCallback processJob = [=]() -> uint8_t {
return hddTruncate(chunkId, chunkVersion, chunkType, newChunkVersion, length);
};
return jobPool.addJob(JobPool::ChunkOperation::Truncate, callback, extra, processJob,
listenerId);
return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
JobPool::ChunkOperation::Truncate, callback, extra,
processJob, listenerId);
}
return job_invalid(jobPool, callback, extra, listenerId);
}
Expand All @@ -579,8 +631,9 @@ uint32_t job_duplicate(JobPool &jobPool, const JobPool::JobCallback &callback, v
return hddDuplicate(chunkId, chunkVersion, newChunkVersion, chunkType, chunkIdCopy,
chunkVersionCopy);
};
return jobPool.addJob(JobPool::ChunkOperation::Duplicate, callback, extra, processJob,
listenerId);
return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
JobPool::ChunkOperation::Duplicate, callback, extra,
processJob, listenerId);
}
return job_invalid(jobPool, callback, extra, listenerId);
}
Expand All @@ -594,8 +647,103 @@ uint32_t job_duptrunc(JobPool &jobPool, const JobPool::JobCallback &callback, vo
return hddDuplicateTruncate(chunkId, chunkVersion, newChunkVersion, chunkType,
chunkIdCopy, chunkVersionCopy, length);
};
return jobPool.addJob(JobPool::ChunkOperation::DuplicateTruncate, callback, extra,
processJob, listenerId);
return jobPool.addJobIfNotLocked(ChunkWithType{chunkId, chunkType},
JobPool::ChunkOperation::DuplicateTruncate, callback,
extra, processJob, listenerId);
}
return job_invalid(jobPool, callback, extra, listenerId);
}

bool JobPool::startChunkLock(const JobPool::JobCallback &callback, void *packet, uint64_t chunkId,
ChunkPartType chunkType, uint32_t listenerId) {
std::unique_lock lock(chunkToJobReplyMapMutex_);
if (chunkToJobReplyMap_.contains(ChunkWithType{chunkId, chunkType})) {
lock.unlock(); // Release the lock before logging to avoid extra contention

safs::log_warn(
"{}: Chunk lock job already exists for chunkId {:016X}, chunkType {}. Treating request "
"as retransmission; not adding a new lock job.",
__func__, chunkId, chunkType.toString());
return false;
}

chunkToJobReplyMap_[ChunkWithType{chunkId, chunkType}] =
LockedChunkData(addLockJob(callback, packet, listenerId), listenerId);
return true;
}

bool JobPool::enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType) {
std::unique_lock lock(chunkToJobReplyMapMutex_);
auto entry = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType});
if (entry == chunkToJobReplyMap_.end()) {
lock.unlock(); // Release the lock before logging to avoid extra contention

// Master was not waiting for this lock, just log and return
safs::log_trace("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.",
__func__, chunkId, chunkType.toString());
return false;
}

entry->second.writeInitReceived = true;
return true;
}

bool JobPool::releaseChunkLockEntry(uint64_t chunkId, ChunkPartType chunkType,
const char *callerName, uint32_t &lockJobId,
uint32_t &listenerId, std::vector<AddJobFunc> &pendingAddJobs) {
std::unique_lock lock(chunkToJobReplyMapMutex_);
auto entry = chunkToJobReplyMap_.find(ChunkWithType{chunkId, chunkType});
if (entry == chunkToJobReplyMap_.end()) {
lock.unlock(); // Release the lock before logging to avoid extra contention

// Master was not waiting for this lock, just log and return
safs::log_warn("{}: No chunk lock job found for chunkId {:016X}, chunkType {}. Ignoring.",
callerName, chunkId, chunkType.toString());
return false;
}

lockJobId = entry->second.lockJobId;
listenerId = entry->second.listenerId;
pendingAddJobs = std::move(entry->second.pendingAddJobs);
chunkToJobReplyMap_.erase(entry);
return true;
}

void JobPool::endChunkLock(uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
uint32_t lockJobId;
uint32_t listenerId;
std::vector<AddJobFunc> pendingAddJobs;

if (!releaseChunkLockEntry(chunkId, chunkType, __func__, lockJobId, listenerId,
pendingAddJobs)) {
return; // No lock job found, just return
}

for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); }

sendStatus(lockJobId, status, listenerId);
}

void JobPool::eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType) {
uint32_t lockJobId;
uint32_t listenerId;
std::vector<AddJobFunc> pendingAddJobs;

if (!releaseChunkLockEntry(chunkId, chunkType, __func__, lockJobId, listenerId,
pendingAddJobs)) {
return; // No lock job found, just return
}

for (const auto &addJobFunc : pendingAddJobs) { addJobFunc(); }

// Remove the lock job and the related packet
auto &listenerInfo = listenerInfos_[listenerId];
std::unique_lock jobsUniqueLock(listenerInfo.jobsMutex);
auto lockJobIterator = listenerInfo.jobHash.find(lockJobId);

if (lockJobIterator != listenerInfo.jobHash.end()) {
auto *outputPacket = reinterpret_cast<OutputPacket *>(lockJobIterator->second->extra);
delete outputPacket;
listenerInfo.jobHash.erase(lockJobIterator);
}
}
102 changes: 102 additions & 0 deletions src/chunkserver/bgjobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/platform.h"

#include "chunkserver-common/chunk_map.h"
#include "chunkserver/io_buffers.h"
#include "common/pcqueue.h"

Expand Down Expand Up @@ -87,6 +88,11 @@ class JobPool {
/// @return The status of the job processing.
using ProcessJobCallback = std::function<uint8_t()>;

/// @brief Add job functions type. It is a lambda that contains the expected parameters for add
/// job call.
/// @return The ID of the added job.
using AddJobFunc = std::function<uint32_t()>;

/// @brief Constructor for JobPool.
///
/// @param name Human readable name for this pool, useful for debugging.
Expand All @@ -112,6 +118,27 @@ class JobPool {
uint32_t addJob(ChunkOperation operation, JobCallback callback, void *extra,
ProcessJobCallback processJob, uint32_t listenerId = 0);

/// @brief Adds a job to the JobPool if the chunk is not locked.
/// If the chunk is locked, the job will be stored and added once the lock is released.
/// @param chunkWithType The chunk and its type associated with the job.
/// @param operation The type of operation to be performed on the chunk.
/// @param callback The callback function to be called upon job completion.
/// @param extra Additional data to be passed to the callback.
/// @param processJob The callback function to process the job.
/// @param listenerId The ID of the listener associated with the job.
/// @return The ID of the added job, or the lock job ID if the chunk is locked.
uint32_t addJobIfNotLocked(ChunkWithType chunkWithType, ChunkOperation operation,
JobCallback callback, void *extra, ProcessJobCallback processJob,
uint32_t listenerId = 0);

/// @brief Adds a lock job to the JobPool for a specific chunk and type.
/// This function is used to create a lock job that will be associated with a locked chunk.
/// @param callback The callback function to be called when the lock is released.
/// @param extra Additional data to be passed to the callback.
/// @param listenerId The ID of the listener associated with the job.
/// @return The ID of the added lock job.
uint32_t addLockJob(JobCallback callback, void *extra, uint32_t listenerId = 0);

/// @brief Gets the number of jobs in the JobPool.
uint32_t getJobCount() const;

Expand Down Expand Up @@ -155,6 +182,61 @@ class JobPool {
void changeCallback(std::list<uint32_t> &jobIds, const JobCallback &callback,
uint32_t listenerId = 0);

/// @brief Starts a chunk lock job for a specific chunk and type.
/// This function is triggered when the master server sends a chunk lock request for a chunk
/// that is not currently locked. It adds a lock job to the JobPool and associates it with the
/// locked chunk. If the chunk is already locked, it returns false and does not add a new job,
/// as the existing lock job will be responsible for handling the lock.
/// @param callback The callback function to be called when the lock is released.
/// @param packet The packet to be sent to the master server with the write end status.
/// @param chunkId The ID of the chunk to be locked.
/// @param chunkType The type of the chunk to be locked.
/// @param listenerId The ID of the listener associated with the job.
/// @return true if the lock job was successfully added, false if the chunk is already locked.
bool startChunkLock(const JobPool::JobCallback &callback, void *packet, uint64_t chunkId,
ChunkPartType chunkType, uint32_t listenerId = 0);

/// @brief Enforces a chunk lock for a specific chunk and type.
/// This function is called when the client sends a write initialization for a chunk that is
/// already locked, to ensure that the lock is properly enforced. From now on, the master
/// requests on the locked chunk will wait for the lock to be released before being processed.
/// @param chunkId The ID of the chunk to enforce the lock on.
/// @param chunkType The type of the chunk to enforce the lock on.
/// @return true if the lock was successfully enforced, false if no lock job was found for the
/// chunk.
bool enforceChunkLock(uint64_t chunkId, ChunkPartType chunkType);

/// @brief Releases a chunk lock entry for a specific chunk and type.
/// This is a helper function that returns the lock job ID, listener ID, and pending jobs
/// associated with a locked chunk if found, and removes the lock entry from the internal map.
/// @param chunkId The ID of the chunk to release the lock on.
/// @param chunkType The type of the chunk to release the lock on.
/// @param callerName The name of the function calling this helper, used for logging.
/// @param lockJobId The ID of the lock job associated with the chunk.
/// @param listenerId The ID of the listener associated with the lock job.
/// @param pendingAddJobs The list of pending jobs associated with the locked chunk.
/// @return true if the lock entry was found and released, false otherwise.
bool releaseChunkLockEntry(uint64_t chunkId, ChunkPartType chunkType, const char *callerName,
uint32_t &lockJobId, uint32_t &listenerId,
std::vector<AddJobFunc> &pendingAddJobs);

/// @brief Ends a chunk lock for a specific chunk and type.
/// This function is called when cleaning up the write operation on the locked chunk, to end the
/// lock and allow any pending jobs to be added and processed. It sends the write end status to
/// the master server and removes the lock job from the JobPool.
/// @param chunkId The ID of the chunk to end the lock on.
/// @param chunkType The type of the chunk to end the lock on.
/// @param status The status to be sent to the master server for the write end.
void endChunkLock(uint64_t chunkId, ChunkPartType chunkType, uint8_t status);

/// @brief Erases a chunk lock for a specific chunk and type.
/// This function is called when the master server sends a chunk unlock request for a chunk that
/// is currently locked. It removes the lock job from the JobPool and allows any pending jobs
/// that were waiting for the lock to be released to be added and processed.
/// @param chunkId The ID of the chunk to erase the lock on.
/// @param chunkType The type of the chunk to erase the lock on.
void eraseChunkLock(uint64_t chunkId, ChunkPartType chunkType);

private:
/// @brief Represents a job in the JobPool.
struct Job {
Expand Down Expand Up @@ -196,6 +278,26 @@ class JobPool {
uint32_t nextJobId; /// Next job ID to be assigned.
};

/// @brief Structure to hold information about a locked chunk.
struct LockedChunkData {
uint32_t lockJobId; /// The ID of the lock job associated with the locked chunk.
uint32_t listenerId; /// The ID of the listener associated with the locked chunk.
/// A vector of functions to add pending jobs to be executed once the lock is released.
std::vector<AddJobFunc> pendingAddJobs;
/// Flag to indicate if the write initialization has been received for the locked chunk.
bool writeInitReceived = false;

LockedChunkData(uint32_t lockJobId, uint32_t listenerId)
: lockJobId(lockJobId), listenerId(listenerId) {}

LockedChunkData() = default;
};

/// Mutex to protect access to the chunkToJobReplyMap_.
std::mutex chunkToJobReplyMapMutex_;
/// Map to associate locked chunks with their corresponding lock job and pending jobs.
std::unordered_map<ChunkWithType, LockedChunkData, KeyOperations, KeyOperations>
chunkToJobReplyMap_;
std::vector<ListenerInfo> listenerInfos_; /// Vector of listener information.
std::string name_; /// Human readable id of the JobPool.
uint8_t workers; /// Number of worker threads in the pool.
Expand Down
Loading