@@ -2682,7 +2682,7 @@ func (rs *replicaState) scheduledRaftMuLocked(
26822682 // knowledge will become known in the next
26832683 // rangeController.HandleRaftEventRaftMuLocked, which will happen at the
26842684 // next tick. We accept a latency hiccup in this case for now.
2685- rss .mu . sendQueue . forceFlushStopIndex = 0
2685+ rss .setForceFlushStopIndexRaftMuAndStreamLocked ( 0 )
26862686 }
26872687 forceFlushNeedsToPause := forceFlushActiveAndPaused ()
26882688 watchForTokens :=
@@ -2768,7 +2768,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
27682768 rss .startForceFlushRaftMuAndStreamLocked (ctx , directive .forceFlushStopIndex )
27692769 } else {
27702770 if rss .mu .sendQueue .forceFlushStopIndex != directive .forceFlushStopIndex {
2771- rss .mu . sendQueue . forceFlushStopIndex = directive .forceFlushStopIndex
2771+ rss .setForceFlushStopIndexRaftMuAndStreamLocked ( directive .forceFlushStopIndex )
27722772 }
27732773 if wasExceedingInflightBytesThreshold &&
27742774 ! rss .reachedInflightBytesThresholdRaftMuAndStreamLocked () {
@@ -2780,8 +2780,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
27802780 if rss .mu .sendQueue .forceFlushStopIndex .active () {
27812781 // Must have a send-queue, so sendingEntries should stay empty (these
27822782 // will be queued).
2783- rss .mu .sendQueue .forceFlushStopIndex = 0
2784- rss .parent .parent .opts .RangeControllerMetrics .SendQueue .ForceFlushedScheduledCount .Dec (1 )
2783+ rss .setForceFlushStopIndexRaftMuAndStreamLocked (0 )
27852784 rss .startAttemptingToEmptySendQueueViaWatcherStreamLocked (ctx )
27862785 if directive .hasSendTokens {
27872786 panic (errors .AssertionFailedf ("hasSendTokens true despite send-queue" ))
@@ -2998,8 +2997,7 @@ func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(
29982997) {
29992998 rss .parent .parent .opts .ReplicaMutexAsserter .RaftMuAssertHeld ()
30002999 rss .mu .AssertHeld ()
3001- rss .parent .parent .opts .RangeControllerMetrics .SendQueue .ForceFlushedScheduledCount .Inc (1 )
3002- rss .mu .sendQueue .forceFlushStopIndex = forceFlushStopIndex
3000+ rss .setForceFlushStopIndexRaftMuAndStreamLocked (forceFlushStopIndex )
30033001 if ! rss .reachedInflightBytesThresholdRaftMuAndStreamLocked () {
30043002 rss .parent .parent .scheduleReplica (rss .parent .replicaID )
30053003 }
@@ -3136,8 +3134,7 @@ func (rss *replicaSendStream) stopAttemptingToEmptySendQueueRaftMuAndStreamLocke
31363134 rss .parent .parent .opts .ReplicaMutexAsserter .RaftMuAssertHeld ()
31373135 rss .mu .AssertHeld ()
31383136 if rss .mu .sendQueue .forceFlushStopIndex .active () {
3139- rss .mu .sendQueue .forceFlushStopIndex = 0
3140- rss .parent .parent .opts .RangeControllerMetrics .SendQueue .ForceFlushedScheduledCount .Dec (1 )
3137+ rss .setForceFlushStopIndexRaftMuAndStreamLocked (0 )
31413138 }
31423139 rss .stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked (ctx , disconnect )
31433140}
@@ -3167,6 +3164,21 @@ func (rss *replicaSendStream) stopAttemptingToEmptySendQueueViaWatcherRaftMuAndS
31673164 }
31683165}
31693166
3167+ func (rss * replicaSendStream ) setForceFlushStopIndexRaftMuAndStreamLocked (
3168+ index forceFlushStopIndex ,
3169+ ) {
3170+ rss .parent .parent .opts .ReplicaMutexAsserter .RaftMuAssertHeld ()
3171+ rss .mu .AssertHeld ()
3172+ nextIsActive := index .active ()
3173+ prevIsActive := rss .mu .sendQueue .forceFlushStopIndex .active ()
3174+ if ! prevIsActive && nextIsActive {
3175+ rss .parent .parent .opts .RangeControllerMetrics .SendQueue .ForceFlushedScheduledCount .Inc (1 )
3176+ } else if prevIsActive && ! nextIsActive {
3177+ rss .parent .parent .opts .RangeControllerMetrics .SendQueue .ForceFlushedScheduledCount .Dec (1 )
3178+ }
3179+ rss .mu .sendQueue .forceFlushStopIndex = index
3180+ }
3181+
31703182// Requires that send-queue is non-empty. Note that it is possible that all
31713183// the entries in the send-queue are not subject to replication admission
31723184// control, and we will still wait for non-zero tokens. This is considered
0 commit comments