Skip to content

Commit 5a4758c

Browse files
craig[bot]sumeerbhola
andcommitted
Merge #154412
154412: kvserver: use Pebble snapshot for catchup scans r=sumeerbhola a=sumeerbhola The snapshot is used to create an iterator, which is recreated based on the storage.snapshot.recreate_iter_duration cluster setting, which defaults to 20s. This is mostly plumbing changes, except for catchup_scan.go. Fixes #133851 Epic: none Release note (ops change): The cluster setting storage.snapshot.recreate_iter_duration (default 20s) controls how frequently a long-lived engine iterator, backed by an engine snapshot, will be closed and recreated. Currently, it is only used for iterators used in rangefeed catchup scans. Co-authored-by: sumeerbhola <[email protected]>
2 parents fdcfaed + 6e59022 commit 5a4758c

26 files changed

+389
-252
lines changed

pkg/backup/backupsink/file_sst_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp
361361

362362
empty := true
363363
for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
364-
if err := s.pacer.Pace(ctx); err != nil {
364+
if _, err := s.pacer.Pace(ctx); err != nil {
365365
return nil, err
366366
}
367367
if valid, err := iter.Valid(); !valid || err != nil {

pkg/backup/compaction_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ func compactSpanEntry(
422422
scratch = append(scratch, prefix...)
423423
iter := sstIter.iter
424424
for iter.SeekGE(trimmedStart); ; iter.NextKey() {
425-
if err := pacer.Pace(ctx); err != nil {
425+
if _, err := pacer.Pace(ctx); err != nil {
426426
return err
427427
}
428428
var key storage.MVCCKey

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
476476
// TODO(yevgeniy): rework this function: this function should simply
477477
// return an error, and not rely on "handleError".
478478
// It's hard to reason about this functions correctness otherwise.
479-
_ = s.pacer.Pace(ctx)
479+
_, _ = s.pacer.Pace(ctx)
480480

481481
switch r := req.(type) {
482482
case *rowEvent:

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
343343
// Request CPU time to use for event consumption, block if this time is
344344
// unavailable. If there is unused CPU time left from the last call to
345345
// Pace, then use that time instead of blocking.
346-
if err := c.pacer.Pace(ctx); err != nil {
346+
if _, err := c.pacer.Pace(ctx); err != nil {
347347
return err
348348
}
349349

pkg/kv/bulk/sst_batcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val
445445
// Keys must be added in order.
446446
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
447447
// Pace based on admission control before adding the key.
448-
if err := b.pacer.Pace(ctx); err != nil {
448+
if _, err := b.pacer.Pace(ctx); err != nil {
449449
return err
450450
}
451451

pkg/kv/kvserver/rangefeed/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ go_library(
3939
"//pkg/storage",
4040
"//pkg/storage/enginepb",
4141
"//pkg/storage/fs",
42+
"//pkg/util",
4243
"//pkg/util/admission",
4344
"//pkg/util/bufalloc",
4445
"//pkg/util/buildutil",

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ type bufferedRegistration struct {
5656
outputLoopCancelFn func()
5757
disconnected bool
5858

59-
// catchUpIter is created by replcia under raftMu lock when registration is
60-
// created. It is detached by output loop for processing and closed.
61-
// If output loop was not started and catchUpIter is non-nil at the time
62-
// that disconnect is called, it is closed by disconnect.
63-
catchUpIter *CatchUpIterator
59+
// catchUpSnap is created by a replica under the raftMu lock when a
60+
// registration is created. It is detached by the output loop for
61+
// processing and closed. If the output loop was not started and
62+
// catchUpSnap is non-nil at the time that disconnect is called, it is
63+
// closed by disconnect.
64+
catchUpSnap *CatchUpSnapshot
6465
}
6566

6667
// Number of events that have been written to the buffer but
@@ -74,7 +75,7 @@ func newBufferedRegistration(
7475
streamCtx context.Context,
7576
span roachpb.Span,
7677
startTS hlc.Timestamp,
77-
catchUpIter *CatchUpIterator,
78+
catchUpSnap *CatchUpSnapshot,
7879
withDiff bool,
7980
withFiltering bool,
8081
withOmitRemote bool,
@@ -101,7 +102,7 @@ func newBufferedRegistration(
101102
buf: make(chan *sharedEvent, bufferSz),
102103
blockWhenFull: blockWhenFull,
103104
}
104-
br.mu.catchUpIter = catchUpIter
105+
br.mu.catchUpSnap = catchUpSnap
105106
return br
106107
}
107108

@@ -166,9 +167,9 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
166167
br.mu.Lock()
167168
defer br.mu.Unlock()
168169
if !br.mu.disconnected {
169-
if br.mu.catchUpIter != nil {
170-
br.mu.catchUpIter.Close()
171-
br.mu.catchUpIter = nil
170+
if br.mu.catchUpSnap != nil {
171+
br.mu.catchUpSnap.Close()
172+
br.mu.catchUpSnap = nil
172173
}
173174
if br.mu.outputLoopCancelFn != nil {
174175
br.mu.outputLoopCancelFn()
@@ -297,20 +298,19 @@ func (br *bufferedRegistration) drainAllocations(ctx context.Context) {
297298
// This uses the iterator provided when the registration was originally created;
298299
// after the scan completes, the iterator will be closed.
299300
//
300-
// If the registration does not have a catchUpIteratorConstructor, this method
301-
// is a no-op.
301+
// If the registration does not have a catchUpSnap, this method is a no-op.
302302
func (br *bufferedRegistration) maybeRunCatchUpScan(ctx context.Context) error {
303-
catchUpIter := br.detachCatchUpIter()
304-
if catchUpIter == nil {
303+
catchUpSnap := br.detachCatchUpSnap()
304+
if catchUpSnap == nil {
305305
return nil
306306
}
307307
start := crtime.NowMono()
308308
defer func() {
309-
catchUpIter.Close()
309+
catchUpSnap.Close()
310310
br.metrics.RangeFeedCatchUpScanNanos.Inc(start.Elapsed().Nanoseconds())
311311
}()
312312

313-
return catchUpIter.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
313+
return catchUpSnap.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
314314
}
315315

316316
// Wait for this registration to completely process its internal
@@ -335,13 +335,13 @@ func (br *bufferedRegistration) waitForCaughtUp(ctx context.Context) error {
335335
return errors.Errorf("bufferedRegistration %v failed to empty in time", br.Range())
336336
}
337337

338-
// detachCatchUpIter detaches the catchUpIter that was previously attached.
339-
func (br *bufferedRegistration) detachCatchUpIter() *CatchUpIterator {
338+
// detachCatchUpSnap detaches the catchUpSnap that was previously attached.
339+
func (br *bufferedRegistration) detachCatchUpSnap() *CatchUpSnapshot {
340340
br.mu.Lock()
341341
defer br.mu.Unlock()
342-
catchUpIter := br.mu.catchUpIter
343-
br.mu.catchUpIter = nil
344-
return catchUpIter
342+
catchUpSnap := br.mu.catchUpSnap
343+
br.mu.catchUpSnap = nil
344+
return catchUpSnap
345345
}
346346

347347
var overflowLogEvery = log.Every(5 * time.Second)

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func TestBufferedSenderOnStreamShutdown(t *testing.T) {
258258
}
259259

260260
// Add our stream to the stream manager.
261-
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
261+
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpSnap */
262262
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
263263
sm.NewStream(streamID, 1 /*rangeID*/))
264264
require.True(t, registered)

0 commit comments

Comments
 (0)