@@ -44,6 +44,7 @@ type rangefeedMuxer struct {
4444 g ctxgroup.Group
4545
4646 ds * DistSender
47+ metrics * DistSenderRangeFeedMetrics
4748 cfg rangeFeedConfig
4849 registry * rangeFeedRegistry
4950 catchupSem * limit.ConcurrentRequestLimiter
@@ -84,9 +85,13 @@ func muxRangeFeed(
8485 registry : rr ,
8586 ds : ds ,
8687 cfg : cfg ,
88+ metrics : & ds .metrics .DistSenderRangeFeedMetrics ,
8789 catchupSem : catchupSem ,
8890 eventCh : eventCh ,
8991 }
92+ if cfg .knobs .metrics != nil {
93+ m .metrics = cfg .knobs .metrics
94+ }
9095
9196 divideAllSpansOnRangeBoundaries (spans , m .startSingleRangeFeed , ds , & m .g )
9297 return errors .CombineErrors (m .g .Wait (), ctx .Err ())
@@ -158,7 +163,7 @@ type activeMuxRangeFeed struct {
158163 roachpb.ReplicaDescriptor
159164 startAfter hlc.Timestamp
160165
161- // cathchupRes is the catchup scan quota acquired upon the
166+ // catchupRes is the catchup scan quota acquired upon the
162167 // start of rangefeed.
163168 // It is released when this stream receives first non-empty checkpoint
164169 // (meaning: catchup scan completes).
@@ -211,7 +216,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
211216
212217 // Register active mux range feed.
213218 stream := & activeMuxRangeFeed {
214- activeRangeFeed : newActiveRangeFeed (span , startAfter , m .registry , m .ds . metrics .RangefeedRanges ),
219+ activeRangeFeed : newActiveRangeFeed (span , startAfter , m .registry , m .metrics .RangefeedRanges ),
215220 rSpan : rs ,
216221 startAfter : startAfter ,
217222 token : token ,
@@ -241,7 +246,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
241246
242247 {
243248 // Before starting single rangefeed, acquire catchup scan quota.
244- catchupRes , err := acquireCatchupScanQuota (ctx , m .ds , m .catchupSem )
249+ catchupRes , err := acquireCatchupScanQuota (ctx , & m .ds . st . SV , m .catchupSem , m . metrics )
245250 if err != nil {
246251 return err
247252 }
@@ -387,13 +392,19 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
387392 recvErr = nil
388393 }
389394
395+ toRestart := ms .close ()
396+
390397 // make sure that the underlying error is not fatal. If it is, there is no
391398 // reason to restart each rangefeed, so just bail out.
392399 if _ , err := handleRangefeedError (ctx , recvErr ); err != nil {
400+ // Regardless of an error, release any resources (i.e. metrics) still
401+ // being held by active stream.
402+ for _ , s := range toRestart {
403+ s .release ()
404+ }
393405 return err
394406 }
395407
396- toRestart := ms .close ()
397408 if log .V (1 ) {
398409 log .Infof (ctx , "mux to node %d restarted %d streams" , ms .nodeID , len (toRestart ))
399410 }
@@ -429,8 +440,14 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
429440 continue
430441 }
431442
432- if m .cfg .knobs .onMuxRangefeedEvent != nil {
433- m .cfg .knobs .onMuxRangefeedEvent (event )
443+ if m .cfg .knobs .onRangefeedEvent != nil {
444+ skip , err := m .cfg .knobs .onRangefeedEvent (ctx , active .Span , & event .RangeFeedEvent )
445+ if err != nil {
446+ return err
447+ }
448+ if skip {
449+ continue
450+ }
434451 }
435452
436453 switch t := event .GetValue ().(type ) {
@@ -451,7 +468,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
451468 case * kvpb.RangeFeedError :
452469 log .VErrEventf (ctx , 2 , "RangeFeedError: %s" , t .Error .GoError ())
453470 if active .catchupRes != nil {
454- m .ds . metrics .RangefeedErrorCatchup .Inc (1 )
471+ m .metrics .RangefeedErrorCatchup .Inc (1 )
455472 }
456473 ms .deleteStream (event .StreamID )
457474 // Restart rangefeed on another goroutine. Restart might be a bit
@@ -473,7 +490,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
473490 }
474491}
475492
476- // restarActiveRangeFeeds restarts one or more rangefeeds.
493+ // restartActiveRangeFeeds restarts one or more rangefeeds.
477494func (m * rangefeedMuxer ) restartActiveRangeFeeds (
478495 ctx context.Context , reason error , toRestart []* activeMuxRangeFeed ,
479496) error {
@@ -489,13 +506,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeeds(
489506func (m * rangefeedMuxer ) restartActiveRangeFeed (
490507 ctx context.Context , active * activeMuxRangeFeed , reason error ,
491508) error {
492- m .ds .metrics .RangefeedRestartRanges .Inc (1 )
493-
494- if log .V (1 ) {
495- log .Infof (ctx , "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v" ,
496- active .Span , active .StartAfter , active .RangeID , active .ReplicaDescriptor ,
497- timeutil .Since (active .Resolved .GoTime ()), reason )
498- }
509+ m .metrics .RangefeedRestartRanges .Inc (1 )
499510 active .setLastError (reason )
500511
501512 // Release catchup scan reservation if any -- we will acquire another
@@ -518,6 +529,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
518529 return err
519530 }
520531
532+ if log .V (1 ) {
533+ log .Infof (ctx , "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v (errInfo %v)" ,
534+ active .Span , active .StartAfter , active .RangeID , active .ReplicaDescriptor ,
535+ timeutil .Since (active .Resolved .GoTime ()), reason , errInfo )
536+ }
537+
521538 if errInfo .evict {
522539 active .resetRouting (ctx , rangecache.EvictionToken {})
523540 }
@@ -587,13 +604,3 @@ func (c *muxStream) close() []*activeMuxRangeFeed {
587604
588605 return toRestart
589606}
590-
591- // a test only option to modify mux rangefeed event.
592- func withOnMuxEvent (fn func (event * kvpb.MuxRangeFeedEvent )) RangeFeedOption {
593- return optionFunc (func (c * rangeFeedConfig ) {
594- c .knobs .onMuxRangefeedEvent = fn
595- })
596- }
597-
598- // TestingWithOnMuxEvent allow external tests access to the withOnMuxEvent option.
599- var TestingWithOnMuxEvent = withOnMuxEvent
0 commit comments