@@ -56,12 +56,11 @@ type bufferedRegistration struct {
5656 outputLoopCancelFn func ()
5757 disconnected bool
5858
59- // catchUpSnap is created by a replica under the raftMu lock when a
60- // registration is created. It is detached by the output loop for
61- // processing and closed. If the output loop was not started and
62- // catchUpSnap is non-nil at the time that disconnect is called, it is
63- // closed by disconnect.
64- catchUpSnap * CatchUpSnapshot
59+ // catchUpIter is created by replcia under raftMu lock when registration is
60+ // created. It is detached by output loop for processing and closed.
61+ // If output loop was not started and catchUpIter is non-nil at the time
62+ // that disconnect is called, it is closed by disconnect.
63+ catchUpIter * CatchUpIterator
6564 }
6665
6766 // Number of events that have been written to the buffer but
@@ -75,7 +74,7 @@ func newBufferedRegistration(
7574 streamCtx context.Context ,
7675 span roachpb.Span ,
7776 startTS hlc.Timestamp ,
78- catchUpSnap * CatchUpSnapshot ,
77+ catchUpIter * CatchUpIterator ,
7978 withDiff bool ,
8079 withFiltering bool ,
8180 withOmitRemote bool ,
@@ -102,7 +101,7 @@ func newBufferedRegistration(
102101 buf : make (chan * sharedEvent , bufferSz ),
103102 blockWhenFull : blockWhenFull ,
104103 }
105- br .mu .catchUpSnap = catchUpSnap
104+ br .mu .catchUpIter = catchUpIter
106105 return br
107106}
108107
@@ -167,9 +166,9 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
167166 br .mu .Lock ()
168167 defer br .mu .Unlock ()
169168 if ! br .mu .disconnected {
170- if br .mu .catchUpSnap != nil {
171- br .mu .catchUpSnap .Close ()
172- br .mu .catchUpSnap = nil
169+ if br .mu .catchUpIter != nil {
170+ br .mu .catchUpIter .Close ()
171+ br .mu .catchUpIter = nil
173172 }
174173 if br .mu .outputLoopCancelFn != nil {
175174 br .mu .outputLoopCancelFn ()
@@ -298,19 +297,20 @@ func (br *bufferedRegistration) drainAllocations(ctx context.Context) {
298297// This uses the iterator provided when the registration was originally created;
299298// after the scan completes, the iterator will be closed.
300299//
301- // If the registration does not have a catchUpSnap, this method is a no-op.
300+ // If the registration does not have a catchUpIteratorConstructor, this method
301+ // is a no-op.
302302func (br * bufferedRegistration ) maybeRunCatchUpScan (ctx context.Context ) error {
303- catchUpSnap := br .detachCatchUpSnap ()
304- if catchUpSnap == nil {
303+ catchUpIter := br .detachCatchUpIter ()
304+ if catchUpIter == nil {
305305 return nil
306306 }
307307 start := crtime .NowMono ()
308308 defer func () {
309- catchUpSnap .Close ()
309+ catchUpIter .Close ()
310310 br .metrics .RangeFeedCatchUpScanNanos .Inc (start .Elapsed ().Nanoseconds ())
311311 }()
312312
313- return catchUpSnap .CatchUpScan (ctx , br .stream .SendUnbuffered , br .withDiff , br .withFiltering , br .withOmitRemote , br .bulkDelivery )
313+ return catchUpIter .CatchUpScan (ctx , br .stream .SendUnbuffered , br .withDiff , br .withFiltering , br .withOmitRemote , br .bulkDelivery )
314314}
315315
316316// Wait for this registration to completely process its internal
@@ -335,13 +335,13 @@ func (br *bufferedRegistration) waitForCaughtUp(ctx context.Context) error {
335335 return errors .Errorf ("bufferedRegistration %v failed to empty in time" , br .Range ())
336336}
337337
338- // detachCatchUpSnap detaches the catchUpSnap that was previously attached.
339- func (br * bufferedRegistration ) detachCatchUpSnap () * CatchUpSnapshot {
338+ // detachCatchUpIter detaches the catchUpIter that was previously attached.
339+ func (br * bufferedRegistration ) detachCatchUpIter () * CatchUpIterator {
340340 br .mu .Lock ()
341341 defer br .mu .Unlock ()
342- catchUpSnap := br .mu .catchUpSnap
343- br .mu .catchUpSnap = nil
344- return catchUpSnap
342+ catchUpIter := br .mu .catchUpIter
343+ br .mu .catchUpIter = nil
344+ return catchUpIter
345345}
346346
347347var overflowLogEvery = log .Every (5 * time .Second )
0 commit comments