Conversation
Summary of Changes

Hello @dmga44, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances write performance and consistency by introducing write buffering and chunkserver-side chunk locking. These changes allow chunkservers to process write operations more efficiently, provide faster acknowledgments to clients, and enable the master to maintain a more accurate view of the chunk parts being written. The refactoring of the write-handling logic and the updates to the communication protocols lay the groundwork for more robust and performant distributed storage operations.
Code Review
This pull request introduces a significant performance enhancement through chunkserver-side write buffering and instant replies, underpinned by a new chunk locking mechanism. The changes are extensive, touching the master, chunkserver, and protocol layers. The core idea is to allow chunkservers to acknowledge writes to the client before the data is persisted to disk, which is made safe by locking the chunk on the master to prevent conflicting operations. This required substantial modifications to WriteHighLevelOp, ChunkserverEntry, and JobPool. The read path has also been updated to correctly serve data from these in-memory write buffers. A major and very welcome part of this PR is the significant refactoring of chunk_multi_modify in chunks.cc into smaller, more understandable functions, which greatly improves code clarity and maintainability. The changes appear well-thought-out, especially regarding consistency and error handling in this new asynchronous model. I have one suggestion for a minor improvement regarding type safety in job functions.
src/chunkserver/bgjobs.cc
Outdated
auto lockJobIterator = listenerInfo.jobHash.find(lockJobId);
if (lockJobIterator != listenerInfo.jobHash.end()) {
	auto *outputPacket = reinterpret_cast<OutputPacket *>(lockJobIterator->second->extra);
The use of reinterpret_cast here is potentially unsafe. It relies on the caller of addLockJob always providing a void* that is a valid OutputPacket*. While this holds true for the current call sites in master_connection.cc, it creates a contract that is not enforced by the type system. Future changes could inadvertently violate this assumption, leading to undefined behavior. Consider using a safer type-erasure mechanism like std::any or a std::function to wrap the packet deletion, which would make the ownership and type contract explicit and safer.
References
- Prefer passing individual parameters to low-level job functions (e.g., job_*) instead of passing a pointer to a high-level operation object. This helps maintain type safety and explicit contracts, reducing the risk of undefined behavior.
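The type-erasure alternative the review suggests could look roughly like this. It is a hypothetical sketch, not the PR's code: `Job`, `makeLockJob`, and the counter-based `OutputPacket` stand-in are invented here to show the shape of the idea — the concrete type is captured in a closure at the one place that knows it, so no `reinterpret_cast` is ever needed at the use site.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Stand-in for the real OutputPacket; the counter just lets us observe
// destruction in the example below.
struct OutputPacket {
    explicit OutputPacket(int *counter) : destroyedCounter(counter) {}
    ~OutputPacket() { ++*destroyedCounter; }
    int *destroyedCounter;
};

struct Job {
    // Replaces `void *extra`: the cleanup logic owns the typed pointer,
    // so no caller ever sees (or casts) a void*.
    std::function<void()> releaseExtra;
};

// The only place that knows the concrete type builds the closure here;
// violating the type contract becomes a compile-time error instead of UB.
Job makeLockJob(std::unique_ptr<OutputPacket> packet) {
    auto owned = std::shared_ptr<OutputPacket>(std::move(packet));
    return Job{[owned]() mutable { owned.reset(); }};
}
```

`std::any` would work similarly, but a `std::function` cleanup callback also removes the deletion code from the job-completion path entirely, which matches the review's ownership concern.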
// Chunk operations

/// @brief Performs the chunk creation operation, which consists of creating a new chunk with
/// version 1, associating it with the given goal and sending create chunk messages to the provided
/// chunkservers. The parts in the chunk are marked as being written (it is expected that the client
/// starts writing) if the corresponding chunkserver supports locking and the create chunk message
/// was sent with locking.
/// @param createdChunk A reference to a pointer where the created chunk will be stored.
/// @param goal The goal that will be associated with the created chunk.
/// @param serversWithChunkTypes The list of chunkservers to create the chunk on.
void chunk_create_operation(
    Chunk *&createdChunk, uint8_t goal,
    std::vector<std::pair<matocsserventry *, ChunkPartType>> &serversWithChunkTypes) {
	createdChunk = chunk_new(ChunksMetadata::getAndIncrementNextChunkId(), 1);
	createdChunk->interrupted = 0;
	createdChunk->operation = Chunk::CREATE;
	chunk_add_file_int(createdChunk, goal);

	for (const auto &server_with_type : serversWithChunkTypes) {
		createdChunk->parts.push_back(ChunkPart(matocsserv_get_csdb(server_with_type.first)->csid,
		                                        ChunkPart::BUSY, createdChunk->version,
		                                        server_with_type.second));
		bool sentChunkLock = false;
		matocsserv_send_createchunk(server_with_type.first, createdChunk->chunkid,
		                            server_with_type.second, createdChunk->version,
		                            gUseChunkserverSideChunkLock, sentChunkLock);

		if (sentChunkLock) { createdChunk->parts.back().mark_being_written(); }
		// If the chunk lock was not sent, it means that the chunkserver does not support locking,
		// so the part is not marked as being written.
	}

	createdChunk->updateStats();
}
/// @brief Performs the chunk version increase operation, which consists of increasing the chunk
/// version and sending setchunkversion messages to all valid parts in the chunk.
/// @param chunk A pointer to the chunk whose version will be increased.
/// @param needsLocking A boolean indicating whether locking is needed, i.e. it is expected that the
/// client will start writing right after the version increase.
void chunk_increase_version_operation(Chunk *chunk, bool needsLocking) {
	assert(chunk->isWritable());
	for (auto &part : chunk->parts) {
		if (part.is_valid()) {
			if (!part.is_busy()) { part.mark_busy(); }

			part.version = chunk->version + 1;
			// If part is already being written then we don't need to ask the chunkserver to lock
			// it again, and we can just increase the version.
			bool partNeedsLocking =
			    !part.is_being_written() && needsLocking && gUseChunkserverSideChunkLock;
			bool sentChunkLock = false;
			matocsserv_send_setchunkversion(part.server(), chunk->chunkid, chunk->version + 1,
			                                chunk->version, part.type, partNeedsLocking,
			                                sentChunkLock);

			if (partNeedsLocking && sentChunkLock) { part.mark_being_written(); }
		}
	}

	chunk->interrupted = 0;
	chunk->operation = Chunk::SET_VERSION;
	chunk->version++;
}
/// @brief Performs the chunk lock operation, which consists of sending chunk lock messages to all
/// valid parts in the chunk and marking the parts as being written if the chunk lock message was
/// sent with locking.
/// @param chunk A pointer to the chunk to lock.
void chunk_lock_operation(Chunk *chunk) {
	bool mustWaitForReply = false;
	assert(chunk->isWritable());
	if (gUseChunkserverSideChunkLock) {
		for (auto &part : chunk->parts) {
			if (part.is_valid()) {
				if (part.is_busy()) { continue; }
				// No busy parts from now on

				bool sentChunkLock = false;
				matocsserv_send_chunklock(part.server(), chunk->chunkid, part.type,
				                          !part.is_being_written(), sentChunkLock);
				if (sentChunkLock) {
					part.mark_being_written();
					mustWaitForReply = true;
					part.mark_busy();
				}
			}
		}
	}

	chunk->interrupted = 0;
	if (mustWaitForReply) {
		// We'll need to wait for some replies
		chunk->operation = Chunk::LOCK;
	} else {
		// No need to wait, parts have been set to be written
		chunk->operation = Chunk::NONE;
	}
}
/// @brief Performs the chunk duplication operation, which consists of creating a new chunk with
/// version 1, associating it with the given goal, sending duplicate chunk messages to the
/// corresponding chunkservers and marking the parts in the new chunk as being written if the
/// corresponding duplicate chunk message was sent with locking. It is expected that the client will
/// start writing to the new chunk right after the duplication.
/// @param originalChunk A pointer to the original chunk to duplicate.
/// @param goal The goal associated with the new chunk.
/// @param newChunk A reference to a pointer where the new chunk will be stored.
void chunk_duplicate_operation(Chunk *originalChunk, uint8_t goal, Chunk *&newChunk) {
	assert(originalChunk->isWritable());
	newChunk = chunk_new(ChunksMetadata::getAndIncrementNextChunkId(), 1);
	newChunk->interrupted = 0;
	newChunk->operation = Chunk::DUPLICATE;
	chunk_delete_file_int(originalChunk, goal);
	chunk_add_file_int(newChunk, goal);

	for (const auto &oldPart : originalChunk->parts) {
		if (oldPart.is_valid()) {
			newChunk->parts.push_back(
			    ChunkPart(oldPart.csid, ChunkPart::BUSY, newChunk->version, oldPart.type));

			bool sentChunkLock = false;
			matocsserv_send_duplicatechunk(oldPart.server(), newChunk->chunkid, newChunk->version,
			                               oldPart.type, originalChunk->chunkid,
			                               originalChunk->version, gUseChunkserverSideChunkLock,
			                               sentChunkLock);

			if (sentChunkLock) { newChunk->parts.back().mark_being_written(); }
		}
	}

	newChunk->updateStats();
}
/// @brief Performs the chunk truncate operation, which consists of increasing the chunk version and
/// sending truncate chunk messages to all valid parts in the chunk. It is not expected that the
/// client will start writing right after the truncation.
/// @param chunk A pointer to the chunk to truncate.
/// @param length The new length of the chunk.
void chunk_truncate_operation(Chunk *chunk, uint32_t length) {
	assert(chunk->isWritable());
	for (auto &part : chunk->parts) {
		if (part.is_valid()) {
			if (!part.is_busy()) { part.mark_busy(); }
			part.version = chunk->version + 1;
			uint32_t chunkTypeLength =
			    slice_traits::chunkLengthToChunkPartLength(part.type, length);
			matocsserv_send_truncatechunk(part.server(), chunk->chunkid, part.type, chunkTypeLength,
			                              chunk->version + 1, chunk->version);
		}
	}

	chunk->interrupted = 0;
	chunk->operation = Chunk::TRUNCATE;
	chunk->version++;
}
/// @brief Performs the chunk duplicate and truncate operation, which consists of creating a new
/// chunk with version 1, associating it with the given goal, and sending duplicate and truncate
/// chunk messages to the corresponding chunkservers. It is not expected that the client will start
/// writing to the new chunk right after the duplication and truncation.
/// @param originalChunk A pointer to the original chunk to duplicate and truncate.
/// @param goal The goal associated with the new chunk.
/// @param newChunk A reference to a pointer where the new chunk will be stored.
/// @param length The new length of the chunk.
void chunk_duplicate_and_truncate_operation(Chunk *originalChunk, uint8_t goal, Chunk *&newChunk,
                                            uint32_t length) {
	assert(originalChunk->isWritable());
	newChunk = chunk_new(ChunksMetadata::getAndIncrementNextChunkId(), 1);
	newChunk->interrupted = 0;
	newChunk->operation = Chunk::DUPTRUNC;
	chunk_delete_file_int(originalChunk, goal);
	chunk_add_file_int(newChunk, goal);

	for (const auto &oldPart : originalChunk->parts) {
		if (oldPart.is_valid()) {
			newChunk->parts.push_back(
			    ChunkPart(oldPart.csid, ChunkPart::BUSY, newChunk->version, oldPart.type));
			uint32_t chunkTypeLength =
			    slice_traits::chunkLengthToChunkPartLength(oldPart.type, length);
			matocsserv_send_duptruncchunk(oldPart.server(), newChunk->chunkid, newChunk->version,
			                              oldPart.type, originalChunk->chunkid,
			                              originalChunk->version, chunkTypeLength);
		}
	}

	newChunk->updateStats();
}
/// @brief Handles the chunk creation case of the chunk_multi_modify operation, which consists of
/// checking if the chunk can be created with the given goal and proceeding if so.
/// @param quotaExceeded Whether the quota has been exceeded.
/// @param goal The goal for the chunk creation.
/// @param operation Pointer to the operation code.
/// @param newChunkId Pointer to the new chunk ID.
/// @param minServerVersion The minimum server version required.
/// @param createdChunk A reference to a pointer where the created chunk will be stored.
/// @return The status code of the operation.
uint8_t chunk_create(bool quotaExceeded, uint8_t goal, uint8_t *operation, uint64_t *newChunkId,
                     uint32_t minServerVersion, Chunk *&createdChunk) {
	// First check if quota is exceeded
	if (quotaExceeded) { return SAUNAFS_ERROR_QUOTA; }

	// Next check availability of chunkservers for the given goal
	uint16_t minServerCount = 0;
	auto serversWithChunkTypes =
	    matocsserv_getservers_for_new_chunk(goal, minServerCount, minServerVersion);
	if (serversWithChunkTypes.empty()) {
		uint16_t usableChunkservers, totalChunkservers;
		double minUsage, maxUsage;
		matocsserv_usagedifference(&minUsage, &maxUsage, &usableChunkservers, &totalChunkservers);

		if (usableChunkservers >= minServerCount &&
		    eventloop_time() > starttime + kStartupGracePeriodSeconds) {
			// if there are enough chunkservers and it's at least one minute after start then it
			// means that there is no space left
			return SAUNAFS_ERROR_NOSPACE;
		}

		return SAUNAFS_ERROR_NOCHUNKSERVERS;
	}

	// Check if the chunk would be safe to write with the current redundancy level
	ChunkCopiesCalculator calculator(gFSOperations->getGoalDefinition(goal));
	for (const auto &serverWithType : serversWithChunkTypes) {
		calculator.addPart(serverWithType.second, MediaLabel::kWildcard);
	}
	calculator.evalRedundancyLevel();
	if (!calculator.isSafeEnoughToWrite(gRedundancyLevel)) { return SAUNAFS_ERROR_NOCHUNKSERVERS; }

	// All checks passed, we can create the chunk
	chunk_create_operation(createdChunk, goal, serversWithChunkTypes);
	*operation = Chunk::CREATE;
	*newChunkId = createdChunk->chunkid;
	return SAUNAFS_STATUS_OK;
}
/// @brief Handles the chunk modification case of the chunk_multi_modify operation, which consists
/// of checking if the chunk can be modified with the given parameters and proceeding if so.
/// @param currentChunkId The ID of the chunk to modify.
/// @param lockId Pointer to the lock ID for the chunk modification.
/// @param goal The goal for the chunk modification.
/// @param quotaExceeded Whether the quota has been exceeded.
/// @param operation Pointer to the operation code.
/// @param targetChunkId Pointer to the target chunk ID after modification.
/// @param targetChunk Reference to a pointer where the target chunk after modification will be
/// stored.
/// @return The status code of the operation.
uint8_t chunk_modify(uint64_t currentChunkId, uint32_t *lockId, uint8_t goal, bool quotaExceeded,
                     uint8_t *operation, uint64_t *targetChunkId, Chunk *&targetChunk) {
	// First find the chunk
	Chunk *currentChunk = chunk_find(currentChunkId);
	if (currentChunk == NULL) { return SAUNAFS_ERROR_NOCHUNK; }

	// Next check if the chunk is locked and if the lock ID matches
	if (*lockId != 0 && *lockId != currentChunk->lockid) {
		if (currentChunk->lockid == 0 || currentChunk->lockedto == 0) {
			// Lock was removed by some chunk operation or by a different client
			return SAUNAFS_ERROR_NOTLOCKED;
		}

		// Case *lockId != currentChunk->lockid
		return SAUNAFS_ERROR_WRONGLOCKID;
	}
	if (*lockId == 0 && currentChunk->isLocked()) {
		*targetChunkId = currentChunkId;
		return SAUNAFS_ERROR_LOCKED;
	}

	// Check if the chunk is writable
	if (!currentChunk->isWritable()) { return SAUNAFS_ERROR_CHUNKLOST; }

	// Check if the chunk would be safe to write with the desired redundancy level
	ChunkCopiesCalculator calculator(currentChunk->getGoal());
	for (auto &part : currentChunk->parts) { calculator.addPart(part.type, MediaLabel::kWildcard); }
	calculator.evalRedundancyLevel();
	if (!calculator.isSafeEnoughToWrite(gRedundancyLevel)) { return SAUNAFS_ERROR_NOCHUNKSERVERS; }

	if (currentChunk->fileCount() == 1) {
		// Only one reference case
		*targetChunkId = currentChunkId;
		targetChunk = currentChunk;
		if (targetChunk->operation != Chunk::NONE) { return SAUNAFS_ERROR_CHUNKBUSY; }

		if (targetChunk->needVersionIncrease) {
			// We are expected to start writing to the chunk, but it has lost some copies and we
			// haven't increased its version yet, so we need to increase the version before allowing
			// the write operation to proceed.
			chunk_increase_version_operation(targetChunk, true);
		} else {
			*lockId = 2 + rnd_ranged<uint32_t>(0xFFFFFFF0); // some random number greater than 1
			chunk_lock_operation(targetChunk);
		}
	} else {
		if (currentChunk->fileCount() == 0) { // it's serious structure error
			safs::log_warn("serious structure inconsistency: (chunkid:{:016X})", currentChunkId);
			return SAUNAFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
		}
		// More than one reference case
		if (quotaExceeded) { return SAUNAFS_ERROR_QUOTA; }

		chunk_duplicate_operation(currentChunk, goal, targetChunk);
		*targetChunkId = targetChunk->chunkid;
	}
	*operation = targetChunk->operation;

	return SAUNAFS_STATUS_OK;
}
/// @brief Handles the chunk_multi_modify operation, which consists of performing either chunk
/// creation or modification. Called when writing on the chunk is needed.
///
/// Since the chunk is going to be modified, the chunk is locked and a lock ID is
/// assigned if the chunk is not already locked. After any of the operations, the chunk is expected
/// to be written to by the client, so if enabled, the chunkserver-side locking is used to lock the
/// chunk: the parts in the chunk are marked as being written and the corresponding chunk lock
/// messages are sent to the chunkservers.
///
/// @param currentChunkId The current chunk ID in the file layout; 0 means no chunk at the current
/// index.
/// @param lockid Pointer to the lock ID, used to transmit the assigned lock ID to the caller and to
/// check the lock ID in case of modification. The lock ID is assigned in case of creation, or in
/// case of modification if the chunk is not already locked. If the chunk is already locked, the
/// lock ID is used to verify the lock ownership.
/// @param goal The goal for the chunk creation or modification.
/// @param quotaExceeded Whether the quota has been exceeded, used to check if the operation can be
/// performed.
/// @param operation Pointer to the operation code, used to transmit the performed operation code to
/// the caller.
/// @param targetChunkId Pointer to the target chunk ID after modification, used to transmit the
/// target chunk ID to the caller.
/// @param minServerVersion The minimum server version required for chunk creation, used to check if
/// the operation can be performed in case of creation.
/// @return The status of the operation.
uint8_t chunk_multi_modify(uint64_t currentChunkId, uint32_t *lockid, uint8_t goal,
                           bool quotaExceeded, uint8_t *operation, uint64_t *targetChunkId,
                           uint32_t minServerVersion = 0) {
	Chunk *targetChunk = nullptr;
	if (currentChunkId == 0) {
		// New chunk case
		auto status = chunk_create(quotaExceeded, goal, operation, targetChunkId, minServerVersion,
		                           targetChunk);
		if (status != SAUNAFS_STATUS_OK) { return status; }
	} else {
		// Existing chunk case
		auto status = chunk_modify(currentChunkId, lockid, goal, quotaExceeded, operation,
		                           targetChunkId, targetChunk);
		if (status != SAUNAFS_STATUS_OK) { return status; }
	}

	// Set the lock if needed
	targetChunk->lockedto = eventloop_time() + LOCKTIMEOUT;
	if (*lockid == 0) {
		*lockid = 1 + rnd_ranged<uint32_t>(0xFFFFFFF0); // some random number greater than 0
	}
	targetChunk->lockid = *lockid;

	chunk_update_checksum(targetChunk);
	emit_chunk_changed(targetChunk);
	return SAUNAFS_STATUS_OK;
}
/// @brief Handles the chunk_multi_truncate operation, which consists of performing either chunk
/// truncation or duplication and truncation.
///
/// Since the chunk is going to be modified, the chunk is locked and a lock ID is
/// assigned if the chunk is not already locked.
///
/// @param currentChunkId The current chunk ID in the file layout. Should be non-zero, since
/// truncation of a non-existing chunk doesn't make sense.
/// @param lockid The lock ID, used to check if the chunk is locked and to assign a new lock ID if
/// needed.
/// @param length The length to truncate the chunk to.
/// @param goal The goal for the chunk truncation.
/// @param denyTruncatingParityParts Whether truncating parity parts is denied.
/// @param quotaExceeded Whether the quota has been exceeded, used to check if the operation can be
/// performed.
/// @param targetChunkId Pointer to the target chunk ID after truncation, used to transmit the
/// target chunk ID to the caller.
/// @return The status of the operation.
uint8_t chunk_multi_truncate(uint64_t currentChunkId, uint32_t lockid, uint32_t length,
                             uint8_t goal, bool denyTruncatingParityParts, bool quotaExceeded,
                             uint64_t *targetChunkId) {
	Chunk *currentChunk, *targetChunk = NULL;

	// First find the chunk
	currentChunk = chunk_find(currentChunkId);
	if (currentChunk == NULL) {
		safs::log_err("chunk_multi_truncate: could not find chunkid {}", currentChunkId);
		return SAUNAFS_ERROR_NOCHUNK;
	}

	// Chunk must be writable to be truncated
	if (!currentChunk->isWritable()) { return SAUNAFS_ERROR_CHUNKLOST; }

	// Check if the chunk is locked and if the lockid matches
	if (currentChunk->isLocked() && (lockid == 0 || lockid != currentChunk->lockid)) {
		return SAUNAFS_ERROR_LOCKED;
	}

	// Deny truncating parity parts if initiating a truncate operation while reducing the file size
	if (denyTruncatingParityParts) {
		for (const auto &part : currentChunk->parts) {
			if (slice_traits::isParityPart(part.type)) { return SAUNAFS_ERROR_NOTPOSSIBLE; }
		}
	}

	if (currentChunk->fileCount() == 1) {
		// Only one reference case - we can truncate the chunk without duplication
		*targetChunkId = currentChunkId;
		targetChunk = currentChunk;
		if (targetChunk->operation != Chunk::NONE) { return SAUNAFS_ERROR_CHUNKBUSY; }

		chunk_truncate_operation(targetChunk, length);
	} else {
		if (currentChunk->fileCount() == 0) { // it's serious structure error
			safs_pretty_syslog(LOG_WARNING,
			                   "serious structure inconsistency: (chunkid:%016" PRIX64 ")",
			                   currentChunkId);
			return SAUNAFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
		}
		// More than one reference case - need to duplicate and truncate
		if (quotaExceeded) { return SAUNAFS_ERROR_QUOTA; }

		chunk_duplicate_and_truncate_operation(currentChunk, goal, targetChunk, length);
		*targetChunkId = targetChunk->chunkid;
	}

	targetChunk->lockedto = eventloop_time() + LOCKTIMEOUT;
	targetChunk->lockid = lockid;

	chunk_update_checksum(targetChunk);
	emit_chunk_changed(targetChunk);
	return SAUNAFS_STATUS_OK;
}
#endif // ! METARESTORE
The refactoring of the monolithic chunk_multi_modify and chunk_multi_truncate functions into smaller, single-purpose helper functions (chunk_create_operation, chunk_lock_operation, chunk_modify, etc.) is an excellent improvement. It makes the complex logic for chunk creation and modification significantly easier to read, understand, and maintain. Great work on this.
Force-pushed 3471071 to 209e321
Force-pushed 209e321 to ee8ae12
Signed-off-by: Dave <dave@leil.io>
Force-pushed ee8ae12 to 554350f
WIP
Signed-off-by: Dave <dave@leil.io>