Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 187 additions & 32 deletions src/chunkserver/chunk_high_level_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
#include "chunkserver/bgjobs.h"
#include "chunkserver/chunkserver_entry.h"
#include "chunkserver/hdd_readahead.h"
#include "chunkserver/hddspacemgr.h"
#include "chunkserver/masterconn.h"
#include "chunkserver/network_stats.h"
#include "chunkserver/network_worker_thread.h"
#include "protocol/cstocl.h"
#include "protocol/cstocs.h"

Expand Down Expand Up @@ -379,8 +381,8 @@ void WriteHighLevelOp::delayedCloseCallback(uint8_t status, void * /*entry*/) {
setNoWriteJobBeingProcessed();
}

assert(pendingDelayedJobs_ > 0);
pendingDelayedJobs_--;
assert(parentPendingWriteJobs() > 0);
parentPendingWriteJobs()--;
checkAndApplyClosedOnParent();
}

Expand All @@ -389,6 +391,10 @@ void WriteHighLevelOp::startNextWriteJob() {
safs::log_warn("({}) Called with no write data buffers.", __func__);
return;
}
if (writeDataBuffers_.front()->currentBlocks() == 0) {
safs::log_warn("({}) Called with no blocks in front InputBuffer.", __func__);
return;
}

if (isWriteJobBeingProcessed()) {
safs::log_warn("({}) Called with write job already in progress.", __func__);
Expand All @@ -409,14 +415,19 @@ void WriteHighLevelOp::writeCurrentInputPacket() {
}

/// Resumes the write pipeline after a state change.
///
/// Order of work:
///  1. tryInstantReply() gets a chance to acknowledge blocks that are buffered but not yet
///     replied.
///  2. If any buffer is queued in writeDataBuffers_, the next write job is started for it and
///     nothing else is done.
///  3. Otherwise, if an input buffer exists and is not in the middle of receiving a block, it is
///     written out via writeCurrentInputPacket().
void WriteHighLevelOp::continueWritingIfPossible() {
    tryInstantReply();

    if (!writeDataBuffers_.empty()) {
        // there is a write buffer ready to be written, there should not be any
        // write jobs being processed.
        startNextWriteJob();
        return;
    }

    // The input packet must be written at most once per call: only the guarded form below is
    // kept (the older unguarded one-line variant of this same `if` has been dropped).
    if (inputBuffer_ != nullptr && !inputBuffer_->isBeingUpdated()) {
        // An input buffer that reaches this point is expected to hold at least one block.
        assert(inputBuffer_->currentBlocks() > 0);
        writeCurrentInputPacket();
    }
}

void WriteHighLevelOp::writeFinishedCallback(uint8_t status, void * /*entry*/) {
Expand All @@ -426,16 +437,49 @@ void WriteHighLevelOp::writeFinishedCallback(uint8_t status, void * /*entry*/) {
}
setNoWriteJobBeingProcessed();

if (writeDataBuffers_.empty()) {
safs::log_warn("({}) No write data buffers available after write finished for chunkId {:016X}.",
__func__, chunkId_);

if (inDelayedClose_) {
parentPendingWriteJobs()--;
checkAndApplyClosedOnParent();
}

return;
}

auto statusWithWriteIdToReply = writeDataBuffers_.front()->getStatuses();
uint16_t alreadyRepliedBlocks = writeDataBuffers_.front()->repliedBlocks;

if (alreadyRepliedBlocks > 0) {
hddRemoveAlreadyRepliedInputBuffer(chunkId_, chunkType_, writeDataBuffers_.front());
}

getWriteInputBufferPool().put(std::move(writeDataBuffers_.front()));
writeDataBuffers_.pop_front();

for (const auto &[status, writeId] : statusWithWriteIdToReply) {
updateUsingWriteStatusAndReply(status, writeId);
if (status != SAUNAFS_STATUS_OK) { return; }
if (alreadyRepliedBlocks == 0) {
if (!inDelayedClose_) {
updateUsingWriteStatusAndReply(status, writeId);
if (status != SAUNAFS_STATUS_OK) { return; }
}
} else {
// Already replied
if (untoldStatus_ == SAUNAFS_STATUS_OK && status != SAUNAFS_STATUS_OK) {
untoldStatus_ = status;
}
alreadyRepliedBlocks--;
}
}

continueWritingIfPossible();

if (inDelayedClose_) {
parentPendingWriteJobs()--;
checkAndApplyClosedOnParent();
}
}

void WriteHighLevelOp::openWriteFinishedCallback(uint8_t status, void * /*entry*/) {
Expand Down Expand Up @@ -498,9 +542,13 @@ void WriteHighLevelOp::prepareForNewWriteData(bool mustForward, uint8_t *headerB
/// Registers a fully received data block in the current input buffer and decides whether the
/// buffer should be written out now.
///
/// All parameters are forwarded unchanged to InputBuffer::setupLastWriteOperation().
///
/// @param blocknum  Block number of the write operation.
/// @param opOffset  Offset of the write operation.
/// @param opSize    Size of the write operation.
/// @param writeId   Client-side identifier of the write operation.
/// @param crc       CRC supplied with the data.
void WriteHighLevelOp::processWriteDataBlock(uint16_t blocknum, uint32_t opOffset, uint32_t opSize,
                                             uint32_t writeId, uint32_t crc) {
    inputBuffer_->setupLastWriteOperation(blocknum, opOffset, opSize, writeId, crc);

    // Give the just-registered block a chance to be acknowledged before it hits the disk.
    tryInstantReply();

    // No write jobs in progress or current input buffer is full - write it.
    // The flush decision is taken exactly once; evaluating it a second time after
    // writeCurrentInputPacket() would act on a buffer that has already been handed off.
    if (!isWriteJobBeingProcessed() || inputBuffer_->isFull()) {
        assert(inputBuffer_->currentBlocks() > 0);
        writeCurrentInputPacket();
    }
}

/// Reports whether the current input buffer considers its header size valid; simply delegates to
/// InputBuffer::isHeaderSizeValid().
/// NOTE(review): inputBuffer_ is dereferenced without a null check - presumably callers only
/// invoke this while an input buffer exists; confirm against the call sites.
bool WriteHighLevelOp::isLastHeaderSizeValid() const { return inputBuffer_->isHeaderSizeValid(); }
Expand All @@ -517,29 +565,90 @@ ssize_t WriteHighLevelOp::writeData(int sock, size_t bytesToWrite) {
return inputBuffer_->writeToSocket(sock, bytesToWrite);
}

bool WriteHighLevelOp::isCompleted() const {
// Conditions:
// - no write job being processed
// - no partially completed writes (forward case)
// - no write data buffers waiting to be enqueued
// - no input buffer being filled (all data has been processed)
return !isWriteJobBeingProcessed() && partiallyCompletedWrites_.empty() &&
writeDataBuffers_.empty() && inputBuffer_ == nullptr;
bool WriteHighLevelOp::trySeal() {
if (inputBuffer_ != nullptr) {
if (inputBuffer_->isBeingUpdated()) { return false; }
if (inputBuffer_->currentBlocks() > inputBuffer_->repliedBlocks) {
return false;
}

for (auto buff : writeDataBuffers_) {
if (buff->currentBlocks() > buff->repliedBlocks) {
safs::log_warn(
"({}) Write data buffer has un-replied blocks and input buffer is replied.",
__func__);
return false;
}
}

// There is an input buffer, all its data has been processed and replied
// so we can write it out to complete the operation
assert(inputBuffer_->currentBlocks() > 0);
writeCurrentInputPacket();
} else {
// inputBuffer_ == nullptr
for (auto buff : writeDataBuffers_) {
if (buff->currentBlocks() > buff->repliedBlocks) {
// There is a write data buffer with un-replied blocks and no input buffer
return false;
}
}
}

// All buffers have been replied, check partially completed writes
isSealed_ = partiallyCompletedWrites_.empty();

return isSealed_;
}

/// Puts the write operation into delayed-close mode and registers the work that still has to
/// finish on the parent's pending-write-jobs counter.
///
/// Two cases:
///  - The open-write job is still running: the job is disabled, its callback is redirected to
///    delayedCloseCallback(), and exactly one pending job is registered on the parent.
///  - Otherwise: the input buffer is either queued (when some of its blocks were already
///    replied) or returned to the pool; trailing queued buffers with no replied blocks are
///    returned to the pool; a write job is started if none is running and buffers remain; and
///    one pending job per remaining queued buffer is registered on the parent.
void WriteHighLevelOp::delayedClose() {
    // Only write init received - disable open write job
    if (isOpenWriteJobBeingProcessed()) {
        workerJobPool()->disableJob(writeJobId_);
        workerJobPool()->changeCallback(
            writeJobId_,
            [this](uint8_t status, void *entry) { this->delayedCloseCallback(status, entry); },
            kEmptyExtra);

        inDelayedClose_ = true;
        parentPendingWriteJobs()++;
        // When open write job is being processed no writes can be instantly replied, so that's it.
        return;
    }

    // Some write data jobs received, handle input buffer.
    // The buffer is inspected BEFORE it is moved anywhere: moving it to the pool first would
    // leave inputBuffer_ empty and make the repliedBlocks check dereference a null pointer.
    if (inputBuffer_ != nullptr) {
        if (inputBuffer_->repliedBlocks > 0) {
            // There are replied blocks in the input buffer, move it to write data buffers
            if (inputBuffer_->isBeingUpdated()) {
                // Unfinished update - we need to drop that last block
                inputBuffer_->getWriteInfoVector().pop_back();
            }
            writeDataBuffers_.emplace_back(std::move(inputBuffer_));
        } else {
            // Drop the input buffer, it won't be used anymore
            getWriteInputBufferPool().put(std::move(inputBuffer_));
        }
    }
    // Input buffer is now null
    assert(inputBuffer_ == nullptr);

    // Drop write data buffers with no replied blocks. While a write job is running one buffer
    // is always left in the queue.
    while (writeDataBuffers_.size() > (isWriteJobBeingProcessed() ? 1 : 0) &&
           writeDataBuffers_.back()->repliedBlocks == 0) {
        getWriteInputBufferPool().put(std::move(writeDataBuffers_.back()));
        writeDataBuffers_.pop_back();
    }

    if (!isWriteJobBeingProcessed() && !writeDataBuffers_.empty()) {
        // No write job in progress, start one for the last write data buffer
        startNextWriteJob();
    }

    // Pending write jobs will be handled in writeFinishedCallback, they need to write out the
    // buffers. Only the parent's counter is used for this accounting.
    inDelayedClose_ = true;
    parentPendingWriteJobs() += writeDataBuffers_.size();
}

void WriteHighLevelOp::cleanup() {
Expand All @@ -549,36 +658,82 @@ void WriteHighLevelOp::cleanup() {
}

while (!writeDataBuffers_.empty()) {
if (writeDataBuffers_.front()->repliedBlocks > 0) {
hddRemoveAlreadyRepliedInputBuffer(chunkId_, chunkType_, writeDataBuffers_.front());
}
getWriteInputBufferPool().put(std::move(writeDataBuffers_.front()));
writeDataBuffers_.pop_front();
}

if (inputBuffer_ != nullptr) {
/// Drop the input buffer, it won't be used anymore
if (inputBuffer_->repliedBlocks > 0) {
hddRemoveAlreadyRepliedInputBuffer(chunkId_, chunkType_, inputBuffer_);
}
getWriteInputBufferPool().put(std::move(inputBuffer_));
}

if (isChunkOpen_) {
if (isChunkLocked_) {
// We need to wait for the metadata to be synced before releasing the lock, so we use a
// callback to release the lock afterward
job_close(*workerJobPool(), jobCloseWriteCallback(chunkId_, chunkType_, SAUNAFS_STATUS_OK),
job_close(*workerJobPool(), jobCloseWriteCallback(chunkId_, chunkType_, untoldStatus_),
chunkId_, chunkType_);
} else {
job_close(*workerJobPool(), kEmptyCallback, chunkId_, chunkType_);
}
isChunkOpen_ = false;
} else if (isChunkLocked_) {
masterconn_get_job_pool()->endChunkLock(chunkId_, chunkType_, SAUNAFS_STATUS_OK);
masterconn_get_job_pool()->endChunkLock(chunkId_, chunkType_, untoldStatus_);
}
}

isChunkLocked_ = false;
partiallyCompletedWrites_.clear();
chunkId_ = 0;
chunkVersion_ = 0;
nextInputBufferBlockCount_ =
std::min(kDefaultInitialNextInputBufferBlockCount, maxBlocksPerHddWriteJob_);
chunkType_ = slice_traits::standard::ChunkPartType();
/// Acknowledges buffered blocks with SAUNAFS_STATUS_OK before they are written, as long as the
/// write-buffering budget (getAvailableWriteBufferingBlocks()) covers them.
///
/// Buffers are handled in order: every buffer queued in writeDataBuffers_ first, then the input
/// buffer. Processing stops at the first buffer whose un-replied blocks do not fit in the
/// remaining budget. A buffer is acknowledged as a whole: it is registered through
/// hddInsertAlreadyRepliedInputBuffer() on its first instant reply, the budget is reduced by the
/// number of newly replied blocks, and one reply is issued per block.
void WriteHighLevelOp::tryInstantReply() {
    // No need to try instant reply if:
    // - sealed: everything has been replied already.
    // - in delayed close: we won't accept new write data and we will reply to pending ones in
    //   writeFinishedCallback, so no need to try here.
    // - open write job being processed: we haven't received the result of the initial open
    //   write job yet.
    // - chunk not locked: we cannot perform instant reply if the chunk is not locked because
    //   master will not receive the status if there is some failure.
    if (isSealed_ || inDelayedClose_ || isOpenWriteJobBeingProcessed() || !isChunkLocked_) {
        return;
    }

    // Replies OK for every block of `buffer` that has not been replied yet.
    auto bufferLevelInstantReply = [this](std::shared_ptr<InputBuffer> &buffer) {
        auto &writeInfoVec = buffer->getWriteInfoVector();

        // First instant reply on this buffer: register it as "already replied".
        if (writeInfoVec.size() > 0 && buffer->repliedBlocks == 0) {
            hddInsertAlreadyRepliedInputBuffer(chunkId_, chunkType_, buffer);
        }

        // Charge the buffering budget for the blocks about to be replied.
        modifyAvailableWriteBufferingBlocks(
            -static_cast<int32_t>(writeInfoVec.size() - buffer->repliedBlocks));
        while (buffer->repliedBlocks < writeInfoVec.size()) {
            const auto &writeInfo = writeInfoVec[buffer->repliedBlocks];
            updateUsingWriteStatusAndReply(SAUNAFS_STATUS_OK, writeInfo.writeId);
            buffer->repliedBlocks++;
        }
    };

    for (auto &buff : writeDataBuffers_) {
        // Fully replied already - nothing to do for this buffer.
        if (buff->repliedBlocks == buff->currentBlocks()) { continue; }

        // Not enough budget for the remaining blocks of this buffer: stop here so that later
        // buffers are never replied ahead of earlier ones.
        if (getAvailableWriteBufferingBlocks() + static_cast<int32_t>(buff->repliedBlocks) <
            static_cast<int32_t>(buff->currentBlocks())) {
            return;
        }

        bufferLevelInstantReply(buff);
    }

    // The input buffer is eligible only if it exists, is not mid-update, still has un-replied
    // blocks, and those blocks fit in the remaining budget.
    if (inputBuffer_ == nullptr || inputBuffer_->isBeingUpdated() ||
        inputBuffer_->repliedBlocks == inputBuffer_->currentBlocks() ||
        getAvailableWriteBufferingBlocks() + static_cast<int32_t>(inputBuffer_->repliedBlocks) <
            static_cast<int32_t>(inputBuffer_->currentBlocks())) {
        return;
    }

    bufferLevelInstantReply(inputBuffer_);
}

std::function<void(uint8_t status, void *packet)> jobCloseWriteCallback(uint64_t chunkId,
Expand Down
19 changes: 16 additions & 3 deletions src/chunkserver/chunk_high_level_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class HighLevelOp {
/// Job pool of the network worker handling the parent ChunkserverEntry.
ClientJobPool *workerJobPool() const { return parent_->workerJobPool; }

/// Mutable reference to the parent ChunkserverEntry's pendingWriteJobs counter.
/// Write operations increment and decrement the counter directly through this reference.
uint32_t &parentPendingWriteJobs() { return parent_->pendingWriteJobs; }

/// Checks and applies closing on the parent ChunkserverEntry.
void checkAndApplyClosedOnParent() const { parent_->checkAndApplyClosed(); }

Expand Down Expand Up @@ -264,7 +266,11 @@ class WriteHighLevelOp : public HighLevelOp {
ssize_t writeData(int sock, size_t bytesToWrite);

/// Tries to seal the write operation.
/// Sealing fails while the input buffer or any queued write data buffer still holds blocks
/// that have not been replied, or while there are partially completed writes.
/// @return the resulting sealed state (same value isSealed() reports afterwards on success).
bool trySeal();

/// Returns whether the write operation has been sealed.
bool isSealed() const { return isSealed_; }

/// Checks if the write operation is completed: it is sealed and no write data buffers remain
/// queued.
bool isCompleted() const { return writeDataBuffers_.empty() && isSealed_; }

/// Performs a delayed close of the write operation.
/// Moves input buffer to its pool if not null.
Expand All @@ -273,6 +279,7 @@ class WriteHighLevelOp : public HighLevelOp {
/// Closes chunk if open.
void cleanup();

/// Replies SAUNAFS_STATUS_OK for buffered blocks that have not been replied yet, provided the
/// available write-buffering blocks cover them. Does nothing when the operation is sealed, in
/// delayed close, still processing the open-write job, or the chunk is not locked.
void tryInstantReply();
protected:
/// Checks if there is an open write job being processed.
bool isOpenWriteJobBeingProcessed() const;
Expand Down Expand Up @@ -313,11 +320,17 @@ class WriteHighLevelOp : public HighLevelOp {
/// Size in blocks of the next input buffer.
uint16_t nextInputBufferBlockCount_;

/// Indicates if the operation is in delayed close:
/// - no new replies issued
/// - decrease pending write jobs in parent on delayed close completion
bool inDelayedClose_ = false;
/// Indicates if the chunk is locked for writing. If true, master will be waiting for the lock
/// to be released when the write operation finishes.
bool isChunkLocked_ = false;
bool isSealed_ = false;  ///< Indicates if the write operation has ended.
/// First non-OK write status seen for blocks that had already been replied; it is passed on
/// when the chunk is closed / the chunk lock is released at cleanup.
uint8_t untoldStatus_ = SAUNAFS_STATUS_OK;
uint32_t writeJobId_ = 0;       ///< ID of the current write job being processed
uint32_t writeJobWriteId_ = 0;  ///< Specific write operation from client
std::shared_ptr<InputBuffer> inputBuffer_ = nullptr; ///< Buffer for the current write job
/// writeJobWriteId's which:
/// - have been completed by our worker, but need ack from the next
Expand Down
Loading
Loading