@@ -116,10 +116,6 @@ struct TWriteBackCache::TNodeState
116116 // Efficient calculation of TWriteDataEntryParts from CachedEntries
117117 TWriteDataEntryIntervalMap CachedEntryIntervalMap;
118118
119- // Count entries in TWriteBackCache::TImpl::PendingEntries
120- // with status Pending filtered by nodeId
121- size_t PendingEntriesCount = 0 ;
122-
123119 // Prevent from concurrent read and write requests with overlapping ranges
124120 TReadWriteRangeLock RangeLock;
125121
@@ -129,10 +125,10 @@ struct TWriteBackCache::TNodeState
129125 : NodeId(nodeId)
130126 {}
131127
132- bool Empty () const
128+ bool CanBeDeleted () const
133129 {
134- return CachedEntries.empty () && PendingEntriesCount == 0 &&
135- RangeLock. Empty () && !FlushState.Executing ;
130+ return CachedEntries.empty () && RangeLock. Empty () &&
131+ !FlushState.Executing ;
136132 }
137133
138134 bool ShouldFlush () const
@@ -153,16 +149,14 @@ struct TWriteBackCache::TPendingOperations
153149 bool Executing = false ;
154150
155151 // The flag is set when an element is popped from
156- // |TImpl::CachedEntriesPersistentQueue| and free space is increased.
152+ // |TImpl::CachedEntriesPersistentQueue| and free space is increased or
153+ // an element is pushed to empty |TImpl::CachedEntriesPersistentQueue|.
157154 // We should try to push pending entries to the persistent queue.
158155 bool ShouldProcessPendingEntries = false ;
159156
160157 // Pending ReadData requests that have acquired |TNodeState::RangeLock|
161158 TVector<TPendingReadDataRequest> ReadData;
162159
163- // Pending WriteData requests that have acquired |TNodeState::RangeLock|
164- TVector<std::unique_ptr<TWriteDataEntry>> WriteData;
165-
166160 // Flush operations that have been scheduled but not yet started
167161 TVector<TNodeState*> Flush;
168162
@@ -174,7 +168,7 @@ struct TWriteBackCache::TPendingOperations
174168
175169 bool Empty () const
176170 {
177- return ReadData.empty () && WriteData. empty () && Flush.empty () &&
171+ return ReadData.empty () && Flush.empty () &&
178172 WriteDataCompleted.empty () && FlushCompleted.empty ();
179173 }
180174};
@@ -456,7 +450,7 @@ class TWriteBackCache::TImpl final
456450 auto guard = Guard (self->Lock );
457451 auto * nodeState = self->GetNodeState (nodeId);
458452 nodeState->RangeLock .UnlockRead (offset, end);
459- self->DeleteNodeStateIfEmpty (nodeState);
453+ self->DeleteNodeStateIfNeeded (nodeState);
460454 self->ExecutePendingOperations (guard);
461455 }
462456 };
@@ -529,7 +523,7 @@ class TWriteBackCache::TImpl final
529523 auto guard = Guard (self->Lock );
530524 auto * nodeState = self->GetNodeState (nodeId);
531525 nodeState->RangeLock .UnlockWrite (offset, end);
532- self->DeleteNodeStateIfEmpty (nodeState);
526+ self->DeleteNodeStateIfNeeded (nodeState);
533527 self->ExecutePendingOperations (guard);
534528 }
535529 };
@@ -548,7 +542,10 @@ class TWriteBackCache::TImpl final
548542 // TImpl is alive
549543 Y_ABORT_UNLESS (self);
550544 if (self) {
551- self->PendingOperations .WriteData .push_back (
545+ if (self->PendingEntries .empty ()) {
546+ self->PendingOperations .ShouldProcessPendingEntries = true ;
547+ }
548+ self->PendingEntries .push_back (
552549 std::unique_ptr<TWriteDataEntry>(entry));
553550 }
554551 };
@@ -718,9 +715,9 @@ class TWriteBackCache::TImpl final
718715 return ptr.get ();
719716 }
720717
721- void DeleteNodeStateIfEmpty (TNodeState* nodeState)
718+ void DeleteNodeStateIfNeeded (TNodeState* nodeState)
722719 {
723- if (nodeState != nullptr && nodeState->Empty ()) {
720+ if (nodeState != nullptr && nodeState->CanBeDeleted ()) {
724721 auto erased = NodeStates.erase (nodeState->NodeId );
725722 Y_DEBUG_ABORT_UNLESS (erased);
726723 Stats->DecrementNodeCount ();
@@ -751,13 +748,11 @@ class TWriteBackCache::TImpl final
751748 }
752749
753750 TVector<TPendingReadDataRequest> readData;
754- TVector<std::unique_ptr<TWriteDataEntry>> writeData;
755751 TVector<TNodeState*> flush;
756752 TVector<TPromise<NProto::TWriteDataResponse>> writeDataCompleted;
757753 TVector<TPromise<void >> flushCompleted;
758754
759755 swap (readData, PendingOperations.ReadData );
760- swap (writeData, PendingOperations.WriteData );
761756 swap (flush, PendingOperations.Flush );
762757 swap (writeDataCompleted, PendingOperations.WriteDataCompleted );
763758 swap (flushCompleted, PendingOperations.FlushCompleted );
@@ -770,10 +765,6 @@ class TWriteBackCache::TImpl final
770765 StartPendingReadDataRequest (std::move (request));
771766 }
772767
773- for (auto & entry: writeData) {
774- StartPendingWriteDataRequest (std::move (entry));
775- }
776-
777768 for (auto * nodeState: flush) {
778769 StartFlush (nodeState);
779770 }
@@ -817,8 +808,6 @@ class TWriteBackCache::TImpl final
817808 auto * nodeState = GetNodeState (entry->GetNodeId ());
818809 AddCachedEntry (nodeState, std::move (entry));
819810
820- Y_ABORT_UNLESS (nodeState->PendingEntriesCount > 0 );
821- nodeState->PendingEntriesCount --;
822811 PendingEntries.pop_front ();
823812 }
824813
@@ -1024,44 +1013,6 @@ class TWriteBackCache::TImpl final
10241013 state.Promise .SetValue (std::move (response));
10251014 }
10261015
1027- void StartPendingWriteDataRequest (std::unique_ptr<TWriteDataEntry> entry)
1028- {
1029- auto serializedSize = entry->GetSerializedSize ();
1030- auto guard = Guard (Lock);
1031-
1032- auto * nodeState = GetNodeState (entry->GetNodeId ());
1033-
1034- if (nodeState->PendingEntriesCount > 0 )
1035- {
1036- nodeState->PendingEntriesCount ++;
1037- PendingEntries.push_back (std::move (entry));
1038- return ;
1039- }
1040-
1041- char * allocationPtr = nullptr ;
1042- bool allocated = CachedEntriesPersistentQueue.AllocateBack (
1043- serializedSize,
1044- &allocationPtr);
1045-
1046- if (allocated) {
1047- Y_ABORT_UNLESS (allocationPtr != nullptr );
1048-
1049- entry->SerializeAndMoveRequestBuffer (
1050- allocationPtr,
1051- PendingOperations,
1052- this );
1053-
1054- CachedEntriesPersistentQueue.CommitAllocation (allocationPtr);
1055- AddCachedEntry (nodeState, std::move (entry));
1056- } else {
1057- nodeState->PendingEntriesCount ++;
1058- PendingEntries.push_back (std::move (entry));
1059- RequestFlushAll ();
1060- }
1061-
1062- UpdatePersistentQueueStats ();
1063- }
1064-
10651016 // |nodeState| becomes unusable if the function returns false
10661017 void PrepareFlush (TNodeState* nodeState)
10671018 {
@@ -1218,7 +1169,7 @@ class TWriteBackCache::TImpl final
12181169 PendingOperations.Flush .push_back (nodeState);
12191170 } else {
12201171 nodeState->FlushState .Executing = false ;
1221- DeleteNodeStateIfEmpty (nodeState);
1172+ DeleteNodeStateIfNeeded (nodeState);
12221173 }
12231174
12241175 ExecutePendingOperations (guard);
0 commit comments