2424#include " chunkserver/bgjobs.h"
2525#include " chunkserver/chunkserver_entry.h"
2626#include " chunkserver/hdd_readahead.h"
27+ #include " chunkserver/hddspacemgr.h"
2728#include " chunkserver/masterconn.h"
2829#include " chunkserver/network_stats.h"
30+ #include " chunkserver/network_worker_thread.h"
2931#include " protocol/cstocl.h"
3032#include " protocol/cstocs.h"
3133
@@ -375,8 +377,8 @@ void WriteHighLevelOp::delayedCloseCallback(uint8_t status, void * /*entry*/) {
375377 setNoWriteJobBeingProcessed ();
376378 }
377379
378- assert (pendingDelayedJobs_ > 0 );
379- pendingDelayedJobs_ --;
380+ assert (parentPendingWriteJobs () > 0 );
381+ parentPendingWriteJobs () --;
380382 checkAndApplyClosedOnParent ();
381383}
382384
@@ -385,6 +387,10 @@ void WriteHighLevelOp::startNextWriteJob() {
385387 safs::log_warn (" ({}) Called with no write data buffers." , __func__);
386388 return ;
387389 }
390+ if (writeDataBuffers_.front ()->currentBlocks () == 0 ) {
391+ safs::log_warn (" ({}) Called with no blocks in front InputBuffer." , __func__);
392+ return ;
393+ }
388394
389395 if (isWriteJobBeingProcessed ()) {
390396 safs::log_warn (" ({}) Called with write job already in progress." , __func__);
@@ -405,14 +411,19 @@ void WriteHighLevelOp::writeCurrentInputPacket() {
405411}
406412
// Resumes the write pipeline after an event that may have unblocked it.
// Order of work: attempt instant replies, then start the next queued write buffer if one
// exists; only when the queue is empty is the current input buffer considered for writing.
407413void WriteHighLevelOp::continueWritingIfPossible () {
// Give pending blocks a chance to be acknowledged before any write job is scheduled.
414+ tryInstantReply ();
415+
408416 if (!writeDataBuffers_.empty ()) {
409417 // there is a write buffer ready to be written, there should not be any
410418 // write jobs being processed.
411419 startNextWriteJob ();
412420 return ;
413421 }
414422
// No queued buffers: write the input buffer out, but only if it exists and is not in the
// middle of being updated.
415- if (inputBuffer_ != nullptr && !inputBuffer_->isBeingUpdated ()) { writeCurrentInputPacket (); }
423+ if (inputBuffer_ != nullptr && !inputBuffer_->isBeingUpdated ()) {
// The input buffer is expected to hold at least one block at this point.
424+ assert (inputBuffer_->currentBlocks () > 0 );
425+ writeCurrentInputPacket ();
426+ }
416427}
417428
418429void WriteHighLevelOp::writeFinishedCallback (uint8_t status, void * /* entry*/ ) {
@@ -422,16 +433,49 @@ void WriteHighLevelOp::writeFinishedCallback(uint8_t status, void * /*entry*/) {
422433 }
423434 setNoWriteJobBeingProcessed ();
424435
436+ if (writeDataBuffers_.empty ()) {
437+ safs::log_warn (" ({}) No write data buffers available after write finished for chunkId {:016X}." ,
438+ __func__, chunkId_);
439+
440+ if (inDelayedClose_) {
441+ parentPendingWriteJobs ()--;
442+ checkAndApplyClosedOnParent ();
443+ }
444+
445+ return ;
446+ }
447+
425448 auto statusWithWriteIdToReply = writeDataBuffers_.front ()->getStatuses ();
449+ uint16_t alreadyRepliedBlocks = writeDataBuffers_.front ()->repliedBlocks ;
450+
451+ if (alreadyRepliedBlocks > 0 ) {
452+ hddRemoveAlreadyRepliedInputBuffer (chunkId_, chunkType_, writeDataBuffers_.front ());
453+ }
454+
426455 getWriteInputBufferPool ().put (std::move (writeDataBuffers_.front ()));
427456 writeDataBuffers_.pop_front ();
428457
429458 for (const auto &[status, writeId] : statusWithWriteIdToReply) {
430- updateUsingWriteStatusAndReply (status, writeId);
431- if (status != SAUNAFS_STATUS_OK) { return ; }
459+ if (alreadyRepliedBlocks == 0 ) {
460+ if (!inDelayedClose_) {
461+ updateUsingWriteStatusAndReply (status, writeId);
462+ if (status != SAUNAFS_STATUS_OK) { return ; }
463+ }
464+ } else {
465+ // Already replied
466+ if (untoldStatus_ == SAUNAFS_STATUS_OK && status != SAUNAFS_STATUS_OK) {
467+ untoldStatus_ = status;
468+ }
469+ alreadyRepliedBlocks--;
470+ }
432471 }
433472
434473 continueWritingIfPossible ();
474+
475+ if (inDelayedClose_) {
476+ parentPendingWriteJobs ()--;
477+ checkAndApplyClosedOnParent ();
478+ }
435479}
436480
437481void WriteHighLevelOp::openWriteFinishedCallback (uint8_t status, void * /* entry*/ ) {
@@ -491,9 +535,13 @@ void WriteHighLevelOp::prepareForNewWriteData(bool mustForward, uint8_t *headerB
// Registers a freshly received write-data block on the current input buffer and decides
// whether the buffer should be written out right away.
// All five arguments are forwarded unchanged to InputBuffer::setupLastWriteOperation().
491535void WriteHighLevelOp::processWriteDataBlock (uint16_t blocknum, uint32_t opOffset, uint32_t opSize,
492536 uint32_t writeId, uint32_t crc) {
493537 inputBuffer_->setupLastWriteOperation (blocknum, opOffset, opSize, writeId, crc);
// With the block recorded, try to acknowledge it before the write itself is scheduled.
538+ tryInstantReply ();
494539
495540 // No write jobs in progress or current input buffer is full - write it
496- if (!isWriteJobBeingProcessed () || inputBuffer_->isFull ()) { writeCurrentInputPacket (); }
541+ if (!isWriteJobBeingProcessed () || inputBuffer_->isFull ()) {
// A block was just set up above, so the buffer is expected to be non-empty here.
542+ assert (inputBuffer_->currentBlocks () > 0 );
543+ writeCurrentInputPacket ();
544+ }
497545}
498546
499547bool WriteHighLevelOp::isLastHeaderSizeValid () const { return inputBuffer_->isHeaderSizeValid (); }
@@ -510,29 +558,90 @@ ssize_t WriteHighLevelOp::writeData(int sock, size_t bytesToWrite) {
510558 return inputBuffer_->writeToSocket (sock, bytesToWrite);
511559}
512560
513- bool WriteHighLevelOp::isCompleted () const {
514- // Conditions:
515- // - no write job being processed
516- // - no partially completed writes (forward case)
517- // - no write data buffers waiting to be enqueued
518- // - no input buffer being filled (all data has been processed)
519- return !isWriteJobBeingProcessed () && partiallyCompletedWrites_.empty () &&
520- writeDataBuffers_.empty () && inputBuffer_ == nullptr ;
561+ bool WriteHighLevelOp::trySeal () {
562+ if (inputBuffer_ != nullptr ) {
563+ if (inputBuffer_->isBeingUpdated ()) { return false ; }
564+ if (inputBuffer_->currentBlocks () > inputBuffer_->repliedBlocks ) {
565+ return false ;
566+ }
567+
568+ for (auto buff : writeDataBuffers_) {
569+ if (buff->currentBlocks () > buff->repliedBlocks ) {
570+ safs::log_warn (
571+ " ({}) Write data buffer has un-replied blocks and input buffer is replied." ,
572+ __func__);
573+ return false ;
574+ }
575+ }
576+
577+ // There is an input buffer, all its data has been processed and replied
578+ // so we can write it out to complete the operation
579+ assert (inputBuffer_->currentBlocks () > 0 );
580+ writeCurrentInputPacket ();
581+ } else {
582+ // inputBuffer_ == nullptr
583+ for (auto buff : writeDataBuffers_) {
584+ if (buff->currentBlocks () > buff->repliedBlocks ) {
585+ // There is a write data buffer with un-replied blocks and no input buffer
586+ return false ;
587+ }
588+ }
589+ }
590+
591+ // All buffers have been replied, check partially completed writes
592+ isSealed_ = partiallyCompletedWrites_.empty ();
593+
594+ return isSealed_;
521595}
522596
// Puts this write operation into delayed-close mode (sets inDelayedClose_).
// Two paths:
//  - The open-write job is still being processed: that job is disabled, its callback is
//    redirected to delayedCloseCallback(), and the parent's pending-write-jobs counter is
//    incremented by one.
//  - Otherwise: the input buffer is either queued (if it has replied blocks) or returned to
//    the pool, trailing queued buffers without replied blocks are returned to the pool, a
//    write job is started if none is running, and the parent's counter grows by the number
//    of buffers left in writeDataBuffers_.
523597void WriteHighLevelOp::delayedClose () {
524- workerJobPool ()->disableJob (writeJobId_);
525- workerJobPool ()->changeCallback (
526- writeJobId_,
527- [this ](uint8_t status, void *entry) { this ->delayedCloseCallback (status, entry); },
528- kEmptyExtra );
598+ // Only write init received - disable open write job
599+ if (isOpenWriteJobBeingProcessed ()) {
600+ workerJobPool ()->disableJob (writeJobId_);
601+ workerJobPool ()->changeCallback (
602+ writeJobId_,
603+ [this ](uint8_t status, void *entry) { this ->delayedCloseCallback (status, entry); },
604+ kEmptyExtra );
605+
// Exactly one pending job is registered here; delayedCloseCallback() is the callback
// installed above to handle it.
606+ inDelayedClose_ = true ;
607+ parentPendingWriteJobs ()++;
608+ // When open write job is being processed no writes can be instantly replied, so that's it.
609+ return ;
610+ }
529611
612+ // Some write data jobs received, handle input buffer
530613 if (inputBuffer_ != nullptr ) {
531- // / Drop the input buffer, it won't be used anymore
532- getWriteInputBufferPool ().put (std::move (inputBuffer_));
614+ if (inputBuffer_->repliedBlocks > 0 ) {
615+ // / There are replied blocks in the input buffer, move it to write data buffers
616+ if (inputBuffer_->isBeingUpdated ()) {
617+ // Unfinished update - we need to drop that last block
618+ inputBuffer_->getWriteInfoVector ().pop_back ();
619+ }
620+ writeDataBuffers_.emplace_back (std::move (inputBuffer_));
621+ } else {
622+ // / Drop the input buffer, it won't be used anymore
623+ getWriteInputBufferPool ().put (std::move (inputBuffer_));
624+ }
625+ }
626+ // Input buffer is now null
627+ assert (inputBuffer_ == nullptr );
628+
629+ // Drop write data buffers with no replied blocks
// Buffers are removed from the back only; while a write job is being processed one buffer
// is always left in the queue.
630+ while (writeDataBuffers_.size () > (isWriteJobBeingProcessed () ? 1 : 0 ) &&
631+ writeDataBuffers_.back ()->repliedBlocks == 0 ) {
632+ getWriteInputBufferPool ().put (std::move (writeDataBuffers_.back ()));
633+ writeDataBuffers_.pop_back ();
533634 }
534635
535- pendingDelayedJobs_++;
636+ if (!isWriteJobBeingProcessed () && !writeDataBuffers_.empty ()) {
637+ // No write job in progress, start one for the last write data buffer
// NOTE(review): startNextWriteJob() inspects writeDataBuffers_.front(); "last" above
// presumably means "remaining" rather than the back of the queue - confirm.
638+ startNextWriteJob ();
639+ }
640+
641+ // Pending write jobs will be handled in writeFinishedCallback, they need to write out the
642+ // buffers
// One pending job is registered per buffer still queued.
643+ inDelayedClose_ = true ;
644+ parentPendingWriteJobs () += writeDataBuffers_.size ();
645+}
537646
538647void WriteHighLevelOp::cleanup () {
@@ -542,34 +651,82 @@ void WriteHighLevelOp::cleanup() {
542651 }
543652
544653 while (!writeDataBuffers_.empty ()) {
654+ if (writeDataBuffers_.front ()->repliedBlocks > 0 ) {
655+ hddRemoveAlreadyRepliedInputBuffer (chunkId_, chunkType_, writeDataBuffers_.front ());
656+ }
545657 getWriteInputBufferPool ().put (std::move (writeDataBuffers_.front ()));
546658 writeDataBuffers_.pop_front ();
547659 }
548660
549661 if (inputBuffer_ != nullptr ) {
550662 // / Drop the input buffer, it won't be used anymore
663+ if (inputBuffer_->repliedBlocks > 0 ) {
664+ hddRemoveAlreadyRepliedInputBuffer (chunkId_, chunkType_, inputBuffer_);
665+ }
551666 getWriteInputBufferPool ().put (std::move (inputBuffer_));
552667 }
553668
554669 if (isChunkOpen_) {
555670 if (isChunkLocked_) {
556671 // We need to wait for the metadata to be synced before releasing the lock, so we use a
557672 // callback to release the lock afterward
558- job_close (*workerJobPool (), jobCloseWriteCallback (chunkId_, chunkType_, SAUNAFS_STATUS_OK ),
673+ job_close (*workerJobPool (), jobCloseWriteCallback (chunkId_, chunkType_, untoldStatus_ ),
559674 chunkId_, chunkType_);
560675 } else {
561676 job_close (*workerJobPool (), kEmptyCallback , chunkId_, chunkType_);
562677 }
563- isChunkOpen_ = false ;
564678 } else if (isChunkLocked_) {
565- masterconn_get_job_pool ()->endChunkLock (chunkId_, chunkType_, SAUNAFS_STATUS_OK );
679+ masterconn_get_job_pool ()->endChunkLock (chunkId_, chunkType_, untoldStatus_ );
566680 }
681+ }
567682
568- isChunkLocked_ = false ;
569- partiallyCompletedWrites_.clear ();
570- chunkId_ = 0 ;
571- chunkVersion_ = 0 ;
572- chunkType_ = slice_traits::standard::ChunkPartType ();
683+ void WriteHighLevelOp::tryInstantReply () {
684+ // No need to try instant reply if:
685+ // - sealed: everything has been replied already.
686+ // - in delayed close: we won't accept new write data and we will reply to pending ones in
687+ // writeFinishedCallback, so no need to try here.
688+ // - open write job being processed: we haven't received of the initial open write job.
689+ // - chunk not locked: we cannot perform instant reply if the chunk is not locked because
690+ // master will not receive the status if there is some failure.
691+ if (isSealed_ || inDelayedClose_ || isOpenWriteJobBeingProcessed () || !isChunkLocked_) {
692+ return ;
693+ }
694+
695+ auto bufferLevelInstantReply = [this ](std::shared_ptr<InputBuffer> &buffer) {
696+ auto &writeInfoVec = buffer->getWriteInfoVector ();
697+
698+ if (writeInfoVec.size () > 0 && buffer->repliedBlocks == 0 ) {
699+ hddInsertAlreadyRepliedInputBuffer (chunkId_, chunkType_, buffer);
700+ }
701+
702+ modifyAvailableWriteBufferingBlocks (
703+ -static_cast <int32_t >(writeInfoVec.size () - buffer->repliedBlocks ));
704+ while (buffer->repliedBlocks < writeInfoVec.size ()) {
705+ const auto &writeInfo = writeInfoVec[buffer->repliedBlocks ];
706+ updateUsingWriteStatusAndReply (SAUNAFS_STATUS_OK, writeInfo.writeId );
707+ buffer->repliedBlocks ++;
708+ }
709+ };
710+
711+ for (auto &buff : writeDataBuffers_) {
712+ if (buff->repliedBlocks == buff->currentBlocks ()) { continue ; }
713+
714+ if (getAvailableWriteBufferingBlocks () + (int32_t )buff->repliedBlocks <
715+ (int32_t )buff->currentBlocks ()) {
716+ return ;
717+ }
718+
719+ bufferLevelInstantReply (buff);
720+ }
721+
722+ if (inputBuffer_ == nullptr || inputBuffer_->isBeingUpdated () ||
723+ inputBuffer_->repliedBlocks == inputBuffer_->currentBlocks () ||
724+ getAvailableWriteBufferingBlocks () + (int32_t )inputBuffer_->repliedBlocks <
725+ (int32_t )inputBuffer_->currentBlocks ()) {
726+ return ;
727+ }
728+
729+ bufferLevelInstantReply (inputBuffer_);
573730}
574731
575732std::function<void (uint8_t status, void *packet)> jobCloseWriteCallback (uint64_t chunkId,
0 commit comments