Skip to content

Commit 669be1a

Browse files
committed
kvserver: Revert "kvserver: use Pebble snapshot for catchup scans"
This reverts commit 6e59022. Due to test failures #154566, #154575. Epic: none Release note: None
1 parent 325aac0 commit 669be1a

26 files changed

+252
-389
lines changed

pkg/backup/backupsink/file_sst_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp
361361

362362
empty := true
363363
for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
364-
if _, err := s.pacer.Pace(ctx); err != nil {
364+
if err := s.pacer.Pace(ctx); err != nil {
365365
return nil, err
366366
}
367367
if valid, err := iter.Valid(); !valid || err != nil {

pkg/backup/compaction_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ func compactSpanEntry(
422422
scratch = append(scratch, prefix...)
423423
iter := sstIter.iter
424424
for iter.SeekGE(trimmedStart); ; iter.NextKey() {
425-
if _, err := pacer.Pace(ctx); err != nil {
425+
if err := pacer.Pace(ctx); err != nil {
426426
return err
427427
}
428428
var key storage.MVCCKey

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
476476
// TODO(yevgeniy): rework this function: this function should simply
477477
// return an error, and not rely on "handleError".
478478
// It's hard to reason about this functions correctness otherwise.
479-
_, _ = s.pacer.Pace(ctx)
479+
_ = s.pacer.Pace(ctx)
480480

481481
switch r := req.(type) {
482482
case *rowEvent:

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
343343
// Request CPU time to use for event consumption, block if this time is
344344
// unavailable. If there is unused CPU time left from the last call to
345345
// Pace, then use that time instead of blocking.
346-
if _, err := c.pacer.Pace(ctx); err != nil {
346+
if err := c.pacer.Pace(ctx); err != nil {
347347
return err
348348
}
349349

pkg/kv/bulk/sst_batcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ func (b *SSTBatcher) AddMVCCKeyLDR(ctx context.Context, key storage.MVCCKey, val
445445
// Keys must be added in order.
446446
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
447447
// Pace based on admission control before adding the key.
448-
if _, err := b.pacer.Pace(ctx); err != nil {
448+
if err := b.pacer.Pace(ctx); err != nil {
449449
return err
450450
}
451451

pkg/kv/kvserver/rangefeed/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ go_library(
3939
"//pkg/storage",
4040
"//pkg/storage/enginepb",
4141
"//pkg/storage/fs",
42-
"//pkg/util",
4342
"//pkg/util/admission",
4443
"//pkg/util/bufalloc",
4544
"//pkg/util/buildutil",

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ type bufferedRegistration struct {
5656
outputLoopCancelFn func()
5757
disconnected bool
5858

59-
// catchUpSnap is created by a replica under the raftMu lock when a
60-
// registration is created. It is detached by the output loop for
61-
// processing and closed. If the output loop was not started and
62-
// catchUpSnap is non-nil at the time that disconnect is called, it is
63-
// closed by disconnect.
64-
catchUpSnap *CatchUpSnapshot
59+
// catchUpIter is created by replcia under raftMu lock when registration is
60+
// created. It is detached by output loop for processing and closed.
61+
// If output loop was not started and catchUpIter is non-nil at the time
62+
// that disconnect is called, it is closed by disconnect.
63+
catchUpIter *CatchUpIterator
6564
}
6665

6766
// Number of events that have been written to the buffer but
@@ -75,7 +74,7 @@ func newBufferedRegistration(
7574
streamCtx context.Context,
7675
span roachpb.Span,
7776
startTS hlc.Timestamp,
78-
catchUpSnap *CatchUpSnapshot,
77+
catchUpIter *CatchUpIterator,
7978
withDiff bool,
8079
withFiltering bool,
8180
withOmitRemote bool,
@@ -102,7 +101,7 @@ func newBufferedRegistration(
102101
buf: make(chan *sharedEvent, bufferSz),
103102
blockWhenFull: blockWhenFull,
104103
}
105-
br.mu.catchUpSnap = catchUpSnap
104+
br.mu.catchUpIter = catchUpIter
106105
return br
107106
}
108107

@@ -167,9 +166,9 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
167166
br.mu.Lock()
168167
defer br.mu.Unlock()
169168
if !br.mu.disconnected {
170-
if br.mu.catchUpSnap != nil {
171-
br.mu.catchUpSnap.Close()
172-
br.mu.catchUpSnap = nil
169+
if br.mu.catchUpIter != nil {
170+
br.mu.catchUpIter.Close()
171+
br.mu.catchUpIter = nil
173172
}
174173
if br.mu.outputLoopCancelFn != nil {
175174
br.mu.outputLoopCancelFn()
@@ -298,19 +297,20 @@ func (br *bufferedRegistration) drainAllocations(ctx context.Context) {
298297
// This uses the iterator provided when the registration was originally created;
299298
// after the scan completes, the iterator will be closed.
300299
//
301-
// If the registration does not have a catchUpSnap, this method is a no-op.
300+
// If the registration does not have a catchUpIteratorConstructor, this method
301+
// is a no-op.
302302
func (br *bufferedRegistration) maybeRunCatchUpScan(ctx context.Context) error {
303-
catchUpSnap := br.detachCatchUpSnap()
304-
if catchUpSnap == nil {
303+
catchUpIter := br.detachCatchUpIter()
304+
if catchUpIter == nil {
305305
return nil
306306
}
307307
start := crtime.NowMono()
308308
defer func() {
309-
catchUpSnap.Close()
309+
catchUpIter.Close()
310310
br.metrics.RangeFeedCatchUpScanNanos.Inc(start.Elapsed().Nanoseconds())
311311
}()
312312

313-
return catchUpSnap.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
313+
return catchUpIter.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
314314
}
315315

316316
// Wait for this registration to completely process its internal
@@ -335,13 +335,13 @@ func (br *bufferedRegistration) waitForCaughtUp(ctx context.Context) error {
335335
return errors.Errorf("bufferedRegistration %v failed to empty in time", br.Range())
336336
}
337337

338-
// detachCatchUpSnap detaches the catchUpSnap that was previously attached.
339-
func (br *bufferedRegistration) detachCatchUpSnap() *CatchUpSnapshot {
338+
// detachCatchUpIter detaches the catchUpIter that was previously attached.
339+
func (br *bufferedRegistration) detachCatchUpIter() *CatchUpIterator {
340340
br.mu.Lock()
341341
defer br.mu.Unlock()
342-
catchUpSnap := br.mu.catchUpSnap
343-
br.mu.catchUpSnap = nil
344-
return catchUpSnap
342+
catchUpIter := br.mu.catchUpIter
343+
br.mu.catchUpIter = nil
344+
return catchUpIter
345345
}
346346

347347
var overflowLogEvery = log.Every(5 * time.Second)

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func TestBufferedSenderOnStreamShutdown(t *testing.T) {
258258
}
259259

260260
// Add our stream to the stream manager.
261-
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpSnap */
261+
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
262262
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
263263
sm.NewStream(streamID, 1 /*rangeID*/))
264264
require.True(t, registered)

0 commit comments

Comments
 (0)