2424#include " chunkserver/bgjobs.h"
2525#include " chunkserver/chunkserver_entry.h"
2626#include " chunkserver/hdd_readahead.h"
27+ #include " chunkserver/hddspacemgr.h"
2728#include " chunkserver/masterconn.h"
2829#include " chunkserver/network_stats.h"
30+ #include " chunkserver/network_worker_thread.h"
2931#include " protocol/cstocl.h"
3032#include " protocol/cstocs.h"
3133
@@ -379,8 +381,8 @@ void WriteHighLevelOp::delayedCloseCallback(uint8_t status, void * /*entry*/) {
379381 setNoWriteJobBeingProcessed ();
380382 }
381383
382- assert (pendingDelayedJobs_ > 0 );
383- pendingDelayedJobs_ --;
384+ assert (parentPendingWriteJobs () > 0 );
385+ parentPendingWriteJobs () --;
384386 checkAndApplyClosedOnParent ();
385387}
386388
@@ -389,6 +391,10 @@ void WriteHighLevelOp::startNextWriteJob() {
389391 safs::log_warn (" ({}) Called with no write data buffers." , __func__);
390392 return ;
391393 }
394+ if (writeDataBuffers_.front ()->currentBlocks () == 0 ) {
395+ safs::log_warn (" ({}) Called with no blocks in front InputBuffer." , __func__);
396+ return ;
397+ }
392398
393399 if (isWriteJobBeingProcessed ()) {
394400 safs::log_warn (" ({}) Called with write job already in progress." , __func__);
@@ -409,14 +415,19 @@ void WriteHighLevelOp::writeCurrentInputPacket() {
409415}
410416
411417void WriteHighLevelOp::continueWritingIfPossible () {
418+ tryInstantReply ();
419+
412420 if (!writeDataBuffers_.empty ()) {
413421 // there is a write buffer ready to be written, there should not be any
414422 // write jobs being processed.
415423 startNextWriteJob ();
416424 return ;
417425 }
418426
419- if (inputBuffer_ != nullptr && !inputBuffer_->isBeingUpdated ()) { writeCurrentInputPacket (); }
427+ if (inputBuffer_ != nullptr && !inputBuffer_->isBeingUpdated ()) {
428+ assert (inputBuffer_->currentBlocks () > 0 );
429+ writeCurrentInputPacket ();
430+ }
420431}
421432
422433void WriteHighLevelOp::writeFinishedCallback (uint8_t status, void * /* entry*/ ) {
@@ -426,16 +437,49 @@ void WriteHighLevelOp::writeFinishedCallback(uint8_t status, void * /*entry*/) {
426437 }
427438 setNoWriteJobBeingProcessed ();
428439
440+ if (writeDataBuffers_.empty ()) {
441+ safs::log_warn (" ({}) No write data buffers available after write finished for chunkId {:016X}." ,
442+ __func__, chunkId_);
443+
444+ if (inDelayedClose_) {
445+ parentPendingWriteJobs ()--;
446+ checkAndApplyClosedOnParent ();
447+ }
448+
449+ return ;
450+ }
451+
429452 auto statusWithWriteIdToReply = writeDataBuffers_.front ()->getStatuses ();
453+ uint16_t alreadyRepliedBlocks = writeDataBuffers_.front ()->repliedBlocks ;
454+
455+ if (alreadyRepliedBlocks > 0 ) {
456+ hddRemoveAlreadyRepliedInputBuffer (chunkId_, chunkType_, writeDataBuffers_.front ());
457+ }
458+
430459 getWriteInputBufferPool ().put (std::move (writeDataBuffers_.front ()));
431460 writeDataBuffers_.pop_front ();
432461
433462 for (const auto &[status, writeId] : statusWithWriteIdToReply) {
434- updateUsingWriteStatusAndReply (status, writeId);
435- if (status != SAUNAFS_STATUS_OK) { return ; }
463+ if (alreadyRepliedBlocks == 0 ) {
464+ if (!inDelayedClose_) {
465+ updateUsingWriteStatusAndReply (status, writeId);
466+ if (status != SAUNAFS_STATUS_OK) { return ; }
467+ }
468+ } else {
469+ // Already replied
470+ if (untoldStatus_ == SAUNAFS_STATUS_OK && status != SAUNAFS_STATUS_OK) {
471+ untoldStatus_ = status;
472+ }
473+ alreadyRepliedBlocks--;
474+ }
436475 }
437476
438477 continueWritingIfPossible ();
478+
479+ if (inDelayedClose_) {
480+ parentPendingWriteJobs ()--;
481+ checkAndApplyClosedOnParent ();
482+ }
439483}
440484
441485void WriteHighLevelOp::openWriteFinishedCallback (uint8_t status, void * /* entry*/ ) {
@@ -498,9 +542,13 @@ void WriteHighLevelOp::prepareForNewWriteData(bool mustForward, uint8_t *headerB
498542void WriteHighLevelOp::processWriteDataBlock (uint16_t blocknum, uint32_t opOffset, uint32_t opSize,
499543 uint32_t writeId, uint32_t crc) {
500544 inputBuffer_->setupLastWriteOperation (blocknum, opOffset, opSize, writeId, crc);
545+ tryInstantReply ();
501546
502547 // No write jobs in progress or current input buffer is full - write it
503- if (!isWriteJobBeingProcessed () || inputBuffer_->isFull ()) { writeCurrentInputPacket (); }
548+ if (!isWriteJobBeingProcessed () || inputBuffer_->isFull ()) {
549+ assert (inputBuffer_->currentBlocks () > 0 );
550+ writeCurrentInputPacket ();
551+ }
504552}
505553
506554bool WriteHighLevelOp::isLastHeaderSizeValid () const { return inputBuffer_->isHeaderSizeValid (); }
@@ -517,29 +565,90 @@ ssize_t WriteHighLevelOp::writeData(int sock, size_t bytesToWrite) {
517565 return inputBuffer_->writeToSocket (sock, bytesToWrite);
518566}
519567
520- bool WriteHighLevelOp::isCompleted () const {
521- // Conditions:
522- // - no write job being processed
523- // - no partially completed writes (forward case)
524- // - no write data buffers waiting to be enqueued
525- // - no input buffer being filled (all data has been processed)
526- return !isWriteJobBeingProcessed () && partiallyCompletedWrites_.empty () &&
527- writeDataBuffers_.empty () && inputBuffer_ == nullptr ;
568+ bool WriteHighLevelOp::trySeal () {
569+ if (inputBuffer_ != nullptr ) {
570+ if (inputBuffer_->isBeingUpdated ()) { return false ; }
571+ if (inputBuffer_->currentBlocks () > inputBuffer_->repliedBlocks ) {
572+ return false ;
573+ }
574+
575+ for (auto buff : writeDataBuffers_) {
576+ if (buff->currentBlocks () > buff->repliedBlocks ) {
577+ safs::log_warn (
578+ " ({}) Write data buffer has un-replied blocks and input buffer is replied." ,
579+ __func__);
580+ return false ;
581+ }
582+ }
583+
584+ // There is an input buffer, all its data has been processed and replied
585+ // so we can write it out to complete the operation
586+ assert (inputBuffer_->currentBlocks () > 0 );
587+ writeCurrentInputPacket ();
588+ } else {
589+ // inputBuffer_ == nullptr
590+ for (auto buff : writeDataBuffers_) {
591+ if (buff->currentBlocks () > buff->repliedBlocks ) {
592+ // There is a write data buffer with un-replied blocks and no input buffer
593+ return false ;
594+ }
595+ }
596+ }
597+
598+ // All buffers have been replied, check partially completed writes
599+ isSealed_ = partiallyCompletedWrites_.empty ();
600+
601+ return isSealed_;
528602}
529603
530604void WriteHighLevelOp::delayedClose () {
531- workerJobPool ()->disableJob (writeJobId_);
532- workerJobPool ()->changeCallback (
533- writeJobId_,
534- [this ](uint8_t status, void *entry) { this ->delayedCloseCallback (status, entry); },
535- kEmptyExtra );
605+ // Only write init received - disable open write job
606+ if (isOpenWriteJobBeingProcessed ()) {
607+ workerJobPool ()->disableJob (writeJobId_);
608+ workerJobPool ()->changeCallback (
609+ writeJobId_,
610+ [this ](uint8_t status, void *entry) { this ->delayedCloseCallback (status, entry); },
611+ kEmptyExtra );
612+
613+ inDelayedClose_ = true ;
614+ parentPendingWriteJobs ()++;
615+ // When open write job is being processed no writes can be instantly replied, so that's it.
616+ return ;
617+ }
536618
619+ // Some write data jobs received, handle input buffer
537620 if (inputBuffer_ != nullptr ) {
538- // / Drop the input buffer, it won't be used anymore
539- getWriteInputBufferPool ().put (std::move (inputBuffer_));
621+ if (inputBuffer_->repliedBlocks > 0 ) {
622+ // / There are replied blocks in the input buffer, move it to write data buffers
623+ if (inputBuffer_->isBeingUpdated ()) {
624+ // Unfinished update - we need to drop that last block
625+ inputBuffer_->getWriteInfoVector ().pop_back ();
626+ }
627+ writeDataBuffers_.emplace_back (std::move (inputBuffer_));
628+ } else {
629+ // / Drop the input buffer, it won't be used anymore
630+ getWriteInputBufferPool ().put (std::move (inputBuffer_));
631+ }
632+ }
633+ // Input buffer is now null
634+ assert (inputBuffer_ == nullptr );
635+
636+ // Drop write data buffers with no replied blocks
637+ while (writeDataBuffers_.size () > (isWriteJobBeingProcessed () ? 1 : 0 ) &&
638+ writeDataBuffers_.back ()->repliedBlocks == 0 ) {
639+ getWriteInputBufferPool ().put (std::move (writeDataBuffers_.back ()));
640+ writeDataBuffers_.pop_back ();
540641 }
541642
542- pendingDelayedJobs_++;
643+ if (!isWriteJobBeingProcessed () && !writeDataBuffers_.empty ()) {
644+ // No write job in progress, start one for the last write data buffer
645+ startNextWriteJob ();
646+ }
647+
648+ // Pending write jobs will be handled in writeFinishedCallback, they need to write out the
649+ // buffers
650+ inDelayedClose_ = true ;
651+ parentPendingWriteJobs () += writeDataBuffers_.size ();
543652}
544653
545654void WriteHighLevelOp::cleanup () {
@@ -549,36 +658,82 @@ void WriteHighLevelOp::cleanup() {
549658 }
550659
551660 while (!writeDataBuffers_.empty ()) {
661+ if (writeDataBuffers_.front ()->repliedBlocks > 0 ) {
662+ hddRemoveAlreadyRepliedInputBuffer (chunkId_, chunkType_, writeDataBuffers_.front ());
663+ }
552664 getWriteInputBufferPool ().put (std::move (writeDataBuffers_.front ()));
553665 writeDataBuffers_.pop_front ();
554666 }
555667
556668 if (inputBuffer_ != nullptr ) {
557669 // / Drop the input buffer, it won't be used anymore
670+ if (inputBuffer_->repliedBlocks > 0 ) {
671+ hddRemoveAlreadyRepliedInputBuffer (chunkId_, chunkType_, inputBuffer_);
672+ }
558673 getWriteInputBufferPool ().put (std::move (inputBuffer_));
559674 }
560675
561676 if (isChunkOpen_) {
562677 if (isChunkLocked_) {
563678 // We need to wait for the metadata to be synced before releasing the lock, so we use a
564679 // callback to release the lock afterward
565- job_close (*workerJobPool (), jobCloseWriteCallback (chunkId_, chunkType_, SAUNAFS_STATUS_OK ),
680+ job_close (*workerJobPool (), jobCloseWriteCallback (chunkId_, chunkType_, untoldStatus_ ),
566681 chunkId_, chunkType_);
567682 } else {
568683 job_close (*workerJobPool (), kEmptyCallback , chunkId_, chunkType_);
569684 }
570- isChunkOpen_ = false ;
571685 } else if (isChunkLocked_) {
572- masterconn_get_job_pool ()->endChunkLock (chunkId_, chunkType_, SAUNAFS_STATUS_OK );
686+ masterconn_get_job_pool ()->endChunkLock (chunkId_, chunkType_, untoldStatus_ );
573687 }
688+ }
574689
575- isChunkLocked_ = false ;
576- partiallyCompletedWrites_.clear ();
577- chunkId_ = 0 ;
578- chunkVersion_ = 0 ;
579- nextInputBufferBlockCount_ =
580- std::min (kDefaultInitialNextInputBufferBlockCount , maxBlocksPerHddWriteJob_);
581- chunkType_ = slice_traits::standard::ChunkPartType ();
690+ void WriteHighLevelOp::tryInstantReply () {
691+ // No need to try instant reply if:
692+ // - sealed: everything has been replied already.
693+ // - in delayed close: we won't accept new write data and we will reply to pending ones in
694+ // writeFinishedCallback, so no need to try here.
695+ // - open write job being processed: we haven't received of the initial open write job.
696+ // - chunk not locked: we cannot perform instant reply if the chunk is not locked because
697+ // master will not receive the status if there is some failure.
698+ if (isSealed_ || inDelayedClose_ || isOpenWriteJobBeingProcessed () || !isChunkLocked_) {
699+ return ;
700+ }
701+
702+ auto bufferLevelInstantReply = [this ](std::shared_ptr<InputBuffer> &buffer) {
703+ auto &writeInfoVec = buffer->getWriteInfoVector ();
704+
705+ if (writeInfoVec.size () > 0 && buffer->repliedBlocks == 0 ) {
706+ hddInsertAlreadyRepliedInputBuffer (chunkId_, chunkType_, buffer);
707+ }
708+
709+ modifyAvailableWriteBufferingBlocks (
710+ -static_cast <int32_t >(writeInfoVec.size () - buffer->repliedBlocks ));
711+ while (buffer->repliedBlocks < writeInfoVec.size ()) {
712+ const auto &writeInfo = writeInfoVec[buffer->repliedBlocks ];
713+ updateUsingWriteStatusAndReply (SAUNAFS_STATUS_OK, writeInfo.writeId );
714+ buffer->repliedBlocks ++;
715+ }
716+ };
717+
718+ for (auto &buff : writeDataBuffers_) {
719+ if (buff->repliedBlocks == buff->currentBlocks ()) { continue ; }
720+
721+ if (getAvailableWriteBufferingBlocks () + (int32_t )buff->repliedBlocks <
722+ (int32_t )buff->currentBlocks ()) {
723+ return ;
724+ }
725+
726+ bufferLevelInstantReply (buff);
727+ }
728+
729+ if (inputBuffer_ == nullptr || inputBuffer_->isBeingUpdated () ||
730+ inputBuffer_->repliedBlocks == inputBuffer_->currentBlocks () ||
731+ getAvailableWriteBufferingBlocks () + (int32_t )inputBuffer_->repliedBlocks <
732+ (int32_t )inputBuffer_->currentBlocks ()) {
733+ return ;
734+ }
735+
736+ bufferLevelInstantReply (inputBuffer_);
582737}
583738
584739std::function<void (uint8_t status, void *packet)> jobCloseWriteCallback (uint64_t chunkId,
0 commit comments