@@ -56,11 +56,12 @@ type bufferedRegistration struct {
56
56
outputLoopCancelFn func ()
57
57
disconnected bool
58
58
59
- // catchUpIter is created by replcia under raftMu lock when registration is
60
- // created. It is detached by output loop for processing and closed.
61
- // If output loop was not started and catchUpIter is non-nil at the time
62
- // that disconnect is called, it is closed by disconnect.
63
- catchUpIter * CatchUpIterator
59
+ // catchUpSnap is created by a replica under the raftMu lock when a
60
+ // registration is created. It is detached by the output loop for
61
+ // processing and closed. If the output loop was not started and
62
+ // catchUpSnap is non-nil at the time that disconnect is called, it is
63
+ // closed by disconnect.
64
+ catchUpSnap * CatchUpSnapshot
64
65
}
65
66
66
67
// Number of events that have been written to the buffer but
@@ -74,7 +75,7 @@ func newBufferedRegistration(
74
75
streamCtx context.Context ,
75
76
span roachpb.Span ,
76
77
startTS hlc.Timestamp ,
77
- catchUpIter * CatchUpIterator ,
78
+ catchUpSnap * CatchUpSnapshot ,
78
79
withDiff bool ,
79
80
withFiltering bool ,
80
81
withOmitRemote bool ,
@@ -101,7 +102,7 @@ func newBufferedRegistration(
101
102
buf : make (chan * sharedEvent , bufferSz ),
102
103
blockWhenFull : blockWhenFull ,
103
104
}
104
- br .mu .catchUpIter = catchUpIter
105
+ br .mu .catchUpSnap = catchUpSnap
105
106
return br
106
107
}
107
108
@@ -166,9 +167,9 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
166
167
br .mu .Lock ()
167
168
defer br .mu .Unlock ()
168
169
if ! br .mu .disconnected {
169
- if br .mu .catchUpIter != nil {
170
- br .mu .catchUpIter .Close ()
171
- br .mu .catchUpIter = nil
170
+ if br .mu .catchUpSnap != nil {
171
+ br .mu .catchUpSnap .Close ()
172
+ br .mu .catchUpSnap = nil
172
173
}
173
174
if br .mu .outputLoopCancelFn != nil {
174
175
br .mu .outputLoopCancelFn ()
@@ -297,20 +298,19 @@ func (br *bufferedRegistration) drainAllocations(ctx context.Context) {
297
298
// This uses the iterator provided when the registration was originally created;
298
299
// after the scan completes, the iterator will be closed.
299
300
//
300
- // If the registration does not have a catchUpIteratorConstructor, this method
301
- // is a no-op.
301
+ // If the registration does not have a catchUpSnap, this method is a no-op.
302
302
func (br * bufferedRegistration ) maybeRunCatchUpScan (ctx context.Context ) error {
303
- catchUpIter := br .detachCatchUpIter ()
304
- if catchUpIter == nil {
303
+ catchUpSnap := br .detachCatchUpSnap ()
304
+ if catchUpSnap == nil {
305
305
return nil
306
306
}
307
307
start := crtime .NowMono ()
308
308
defer func () {
309
- catchUpIter .Close ()
309
+ catchUpSnap .Close ()
310
310
br .metrics .RangeFeedCatchUpScanNanos .Inc (start .Elapsed ().Nanoseconds ())
311
311
}()
312
312
313
- return catchUpIter .CatchUpScan (ctx , br .stream .SendUnbuffered , br .withDiff , br .withFiltering , br .withOmitRemote , br .bulkDelivery )
313
+ return catchUpSnap .CatchUpScan (ctx , br .stream .SendUnbuffered , br .withDiff , br .withFiltering , br .withOmitRemote , br .bulkDelivery )
314
314
}
315
315
316
316
// Wait for this registration to completely process its internal
@@ -335,13 +335,13 @@ func (br *bufferedRegistration) waitForCaughtUp(ctx context.Context) error {
335
335
return errors .Errorf ("bufferedRegistration %v failed to empty in time" , br .Range ())
336
336
}
337
337
338
- // detachCatchUpIter detaches the catchUpIter that was previously attached.
339
- func (br * bufferedRegistration ) detachCatchUpIter () * CatchUpIterator {
338
+ // detachCatchUpSnap detaches the catchUpSnap that was previously attached.
339
+ func (br * bufferedRegistration ) detachCatchUpSnap () * CatchUpSnapshot {
340
340
br .mu .Lock ()
341
341
defer br .mu .Unlock ()
342
- catchUpIter := br .mu .catchUpIter
343
- br .mu .catchUpIter = nil
344
- return catchUpIter
342
+ catchUpSnap := br .mu .catchUpSnap
343
+ br .mu .catchUpSnap = nil
344
+ return catchUpSnap
345
345
}
346
346
347
347
var overflowLogEvery = log .Every (5 * time .Second )
0 commit comments