Commit 264bacc

kvserver: use Pebble snapshot for catchup scans

The snapshot is used to create an iterator, which is recreated at an
interval controlled by the storage.snapshot.recreate_iter_duration
cluster setting (default 20s). These are mostly plumbing changes,
except for catchup_scan.go.

Fixes #133851

Epic: none

Release note (ops change): The cluster setting
storage.snapshot.recreate_iter_duration (default 20s) controls how
frequently a long-lived engine iterator, backed by an engine snapshot,
will be closed and recreated. Currently, it is only used for iterators
used in rangefeed catchup scans.
1 parent 00ea203 commit 264bacc

27 files changed (+397, -249 lines)
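
The commit message describes one core mechanism: a long-lived iterator backed by an engine snapshot is periodically closed and reopened, so the scan keeps a consistent view without pinning resources indefinitely. Below is a minimal sketch of that pattern; the snapshot and iterator interfaces are hypothetical stand-ins for the real Pebble/storage types, and recreateEvery plays the role of the storage.snapshot.recreate_iter_duration setting (default 20s).

// A minimal sketch of the recreation pattern, not CockroachDB's actual code:
// snapshot and iterator are hypothetical stand-ins for the engine types, and
// recreateEvery stands in for storage.snapshot.recreate_iter_duration.
package catchupsketch

import "time"

type iterator interface {
    SeekGE(key []byte)
    Valid() bool
    Key() []byte
    Next()
    Close()
}

type snapshot interface {
    NewIter(lower, upper []byte) iterator
}

// scanWithRecreation walks [start, end) over snap, closing and reopening the
// iterator whenever it has been open longer than recreateEvery. Because each
// new iterator is opened from the same snapshot, the scan still observes one
// consistent view of the data; recreation only releases the resources an
// old, long-lived iterator would otherwise pin.
func scanWithRecreation(
    snap snapshot, start, end []byte, recreateEvery time.Duration, emit func(key []byte),
) {
    iter := snap.NewIter(start, end)
    defer func() { iter.Close() }()
    openedAt := time.Now()

    for iter.SeekGE(start); iter.Valid(); {
        key := append([]byte(nil), iter.Key()...)
        emit(key)
        if time.Since(openedAt) > recreateEvery {
            // Recreate the iterator from the same snapshot and reposition
            // it just past the last emitted key, then resume the scan.
            iter.Close()
            iter = snap.NewIter(start, end)
            openedAt = time.Now()
            iter.SeekGE(append(key, 0))
            continue
        }
        iter.Next()
    }
}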

pkg/backup/backupsink/file_sst_sink.go

Lines changed: 1 addition & 1 deletion
@@ -361,7 +361,7 @@ func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp

     empty := true
     for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
-        if err := s.pacer.Pace(ctx); err != nil {
+        if _, err := s.pacer.Pace(ctx); err != nil {
             return nil, err
         }
         if valid, err := iter.Valid(); !valid || err != nil {
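
The one-line change above repeats across the next several files: pacer.Pace now returns a leading value alongside the error, and call sites that only need the error discard it with the blank identifier. The sketch below illustrates the call pattern only; the pacer type and its time.Duration return value (the time spent waiting) are assumptions, since the diff shows nothing beyond the call sites.

// Hypothetical sketch: pacer stands in for the admission pacer touched by
// this commit; the diffs only show that Pace gained a leading return value,
// and the time.Duration used here (elapsed wait time) is an assumption.
package pacesketch

import (
    "context"
    "time"
)

type pacer struct {
    tick *time.Ticker
}

// Pace blocks until the next tick or context cancellation and reports how
// long the caller waited.
func (p *pacer) Pace(ctx context.Context) (time.Duration, error) {
    start := time.Now()
    select {
    case <-p.tick.C:
        return time.Since(start), nil
    case <-ctx.Done():
        return time.Since(start), ctx.Err()
    }
}

// consume shows the updated call pattern from the diffs: callers that only
// care about the error discard the new first return value with `_`.
func consume(ctx context.Context, p *pacer, keys [][]byte, handle func([]byte) error) error {
    for _, k := range keys {
        if _, err := p.Pace(ctx); err != nil {
            return err
        }
        if err := handle(k); err != nil {
            return err
        }
    }
    return nil
}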

pkg/backup/compaction_processor.go

Lines changed: 1 addition & 1 deletion
@@ -422,7 +422,7 @@ func compactSpanEntry(
     scratch = append(scratch, prefix...)
     iter := sstIter.iter
     for iter.SeekGE(trimmedStart); ; iter.NextKey() {
-        if err := pacer.Pace(ctx); err != nil {
+        if _, err := pacer.Pace(ctx); err != nil {
             return err
         }
         var key storage.MVCCKey

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 1 addition & 1 deletion
@@ -476,7 +476,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
     // TODO(yevgeniy): rework this function: this function should simply
     // return an error, and not rely on "handleError".
     // It's hard to reason about this functions correctness otherwise.
-    _ = s.pacer.Pace(ctx)
+    _, _ = s.pacer.Pace(ctx)

     switch r := req.(type) {
     case *rowEvent:

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 1 addition & 1 deletion
@@ -343,7 +343,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
     // Request CPU time to use for event consumption, block if this time is
     // unavailable. If there is unused CPU time left from the last call to
     // Pace, then use that time instead of blocking.
-    if err := c.pacer.Pace(ctx); err != nil {
+    if _, err := c.pacer.Pace(ctx); err != nil {
         return err
     }
pkg/kv/bulk/sst_batcher.go

Lines changed: 1 addition & 1 deletion
@@ -444,7 +444,7 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val

 // Keys must be added in order.
 func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
     // Pace based on admission control before adding the key.
-    if err := b.pacer.Pace(ctx); err != nil {
+    if _, err := b.pacer.Pace(ctx); err != nil {
         return err
     }

pkg/kv/kvclient/rangefeed/rangefeed_external_test.go

Lines changed: 5 additions & 0 deletions
@@ -841,6 +841,11 @@ func TestWithOnDeleteRange(t *testing.T) {
     // should be visible, because catchup scans do emit tombstones. The range
     // tombstone should be ordered after the initial point, but before the foo
     // catchup point, and the previous values should respect the range tombstones.
+    //
+    // NB: When RaceEnabled=true, the range key will be emitted multiple
+    // times, since CatchUpSnapshot.CatchUpScan recreates the iterator at
+    // every step. These duplications are harmless and are de-duped by
+    // testEvents when printing.
     require.NoError(t, db.Put(ctx, makeKey(srv.Codec(), "covered"), "covered"))
     require.NoError(t, db.Put(ctx, makeKey(srv.Codec(), "foo"), "covered"))
     require.NoError(t, db.DelRangeUsingTombstone(ctx, makeKey(srv.Codec(), "a"), makeKey(srv.Codec(), "z")))
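
The new test comment relies on duplicates being collapsed before output; the testEvents helper itself is not part of this diff, so the following is only a sketch, assuming de-duplication keyed on each event's rendered form.

// Hypothetical sketch of de-duplicating replayed rangefeed events before
// printing; the real testEvents helper is not shown in this diff.
package dedupsketch

import "fmt"

// printDeduped prints each distinct event once, keyed on its rendered form,
// so a catch-up scan that re-emits a range key after recreating its iterator
// still produces stable test output.
func printDeduped(events []fmt.Stringer) {
    seen := make(map[string]bool)
    for _, ev := range events {
        s := ev.String()
        if seen[s] {
            continue
        }
        seen[s] = true
        fmt.Println(s)
    }
}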

pkg/kv/kvserver/rangefeed/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ go_library(
         "//pkg/storage",
         "//pkg/storage/enginepb",
         "//pkg/storage/fs",
+        "//pkg/util",
         "//pkg/util/admission",
         "//pkg/util/bufalloc",
         "//pkg/util/buildutil",

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 18 additions & 19 deletions
@@ -56,11 +56,11 @@ type bufferedRegistration struct {
     outputLoopCancelFn func()
     disconnected bool

-    // catchUpIter is created by replica under raftMu lock when registration is
+    // catchUpSnap is created by replica under raftMu lock when registration is
     // created. It is detached by output loop for processing and closed. If
-    // output loop was not started and catchUpIter is non-nil at the time that
+    // output loop was not started and catchUpSnap is non-nil at the time that
     // disconnect is called, it is closed by disconnect.
-    catchUpIter *CatchUpIterator
+    catchUpSnap *CatchUpSnapshot
 }

 // Number of events that have been written to the buffer but
@@ -74,7 +74,7 @@ func newBufferedRegistration(
     streamCtx context.Context,
     span roachpb.Span,
     startTS hlc.Timestamp,
-    catchUpIter *CatchUpIterator,
+    catchUpSnap *CatchUpSnapshot,
     withDiff bool,
     withFiltering bool,
     withOmitRemote bool,
@@ -100,7 +100,7 @@ func newBufferedRegistration(
         buf: make(chan *sharedEvent, bufferSz),
         blockWhenFull: blockWhenFull,
     }
-    br.mu.catchUpIter = catchUpIter
+    br.mu.catchUpSnap = catchUpSnap
     return br
 }

@@ -172,9 +172,9 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
     br.mu.Lock()
     defer br.mu.Unlock()
     if !br.mu.disconnected {
-        if br.mu.catchUpIter != nil {
-            br.mu.catchUpIter.Close()
-            br.mu.catchUpIter = nil
+        if br.mu.catchUpSnap != nil {
+            br.mu.catchUpSnap.Close()
+            br.mu.catchUpSnap = nil
         }
         if br.mu.outputLoopCancelFn != nil {
             br.mu.outputLoopCancelFn()
@@ -303,20 +303,19 @@ func (br *bufferedRegistration) drainAllocations(ctx context.Context) {
 // This uses the iterator provided when the registration was originally created;
 // after the scan completes, the iterator will be closed.
 //
-// If the registration does not have a catchUpIteratorConstructor, this method
-// is a no-op.
+// If the registration does not have a catchUpSnap, this method is a no-op.
 func (br *bufferedRegistration) maybeRunCatchUpScan(ctx context.Context) error {
-    catchUpIter := br.detachCatchUpIter()
-    if catchUpIter == nil {
+    catchUpSnap := br.detachCatchUpSnap()
+    if catchUpSnap == nil {
         return nil
     }
     start := crtime.NowMono()
     defer func() {
-        catchUpIter.Close()
+        catchUpSnap.Close()
         br.metrics.RangeFeedCatchUpScanNanos.Inc(start.Elapsed().Nanoseconds())
     }()

-    return catchUpIter.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
+    return catchUpSnap.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
 }
@@ -341,13 +340,13 @@ func (br *bufferedRegistration) waitForCaughtUp(ctx context.Context) error {
     return errors.Errorf("bufferedRegistration %v failed to empty in time", br.Range())
 }

-// detachCatchUpIter detaches the catchUpIter that was previously attached.
-func (br *bufferedRegistration) detachCatchUpIter() *CatchUpIterator {
+// detachCatchUpSnap detaches the catchUpSnap that was previously attached.
+func (br *bufferedRegistration) detachCatchUpSnap() *CatchUpSnapshot {
     br.mu.Lock()
     defer br.mu.Unlock()
-    catchUpIter := br.mu.catchUpIter
-    br.mu.catchUpIter = nil
-    return catchUpIter
+    catchUpSnap := br.mu.catchUpSnap
+    br.mu.catchUpSnap = nil
+    return catchUpSnap
 }

 var overflowLogEvery = log.Every(5 * time.Second)
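
Beyond the rename, the detach pattern in the last two hunks is what keeps the snapshot's lifecycle safe: the snapshot lives under the registration's mutex, and whichever goroutine detaches it first (the output loop or Disconnect) becomes solely responsible for closing it, so Close runs exactly once even when the two race. A condensed sketch of that ownership hand-off, with a stand-in resource type in place of *CatchUpSnapshot:

// Sketch of the mutex-guarded detach pattern from buffered_registration.go;
// resource is a hypothetical stand-in for *CatchUpSnapshot.
package detachsketch

import "sync"

type resource struct{ closed bool }

func (r *resource) Close() { r.closed = true }

type registration struct {
    mu struct {
        sync.Mutex
        snap *resource
    }
}

// detach atomically takes ownership of the snapshot, leaving nil behind, so
// at most one caller ever receives (and must close) it.
func (r *registration) detach() *resource {
    r.mu.Lock()
    defer r.mu.Unlock()
    snap := r.mu.snap
    r.mu.snap = nil
    return snap
}

// disconnect closes the snapshot only if the output loop never detached it.
func (r *registration) disconnect() {
    if snap := r.detach(); snap != nil {
        snap.Close()
    }
}

// runCatchUpScan is the output-loop side: detach, then close after the scan,
// without holding the mutex during the (potentially long) scan itself.
func (r *registration) runCatchUpScan(scan func(*resource) error) error {
    snap := r.detach()
    if snap == nil {
        return nil
    }
    defer snap.Close()
    return scan(snap)
}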

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 1 addition & 1 deletion
@@ -244,7 +244,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {

     // Add our stream to the stream manager.
     sm.RegisteringStream(streamID1)
-    registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
+    registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpSnap */
         false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
         sm.NewStream(streamID1, 1 /*rangeID*/))
     require.True(t, registered)
