Skip to content

Commit ffa3d5c

Browse files
craig[bot]dt
andcommitted
Merge #150738
150738: kvserver/rangefeed: enable bulk delivery by default r=dt a=dt This enables bulk delivery during catch up scans by default, so long as it is also requested by the client, with a ~2MB size target. Release note (performance improvement): the efficiency and throughput of catch-up scans used by CDC and PCR is improved in cases where substantial catch-up work is required. Epic: none. While I'm here: since we have a setting we may as well use it to control the buffer's size target rather than just on vs off. Co-authored-by: David Taylor <[email protected]>
2 parents 3c1974c + 4c933b8 commit ffa3d5c

14 files changed

+115
-92
lines changed

pkg/kv/kvserver/rangefeed/bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
104104
const withFiltering = false
105105
streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)}
106106
ok, _, _ := p.Register(ctx, span, hlc.MinTimestamp, nil,
107-
withDiff, withFiltering, false /* withOmitRemote */, false, /* withBulkDelivery */
107+
withDiff, withFiltering, false /* withOmitRemote */, noBulkDelivery,
108108
streams[i])
109109
require.True(b, ok)
110110
}

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func newBufferedRegistration(
7878
withDiff bool,
7979
withFiltering bool,
8080
withOmitRemote bool,
81-
withBulkDelivery bool,
81+
withBulkDelivery int,
8282
bufferSz int,
8383
blockWhenFull bool,
8484
metrics *Metrics,

pkg/kv/kvserver/rangefeed/catchup_scan.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (i *CatchUpIterator) CatchUpScan(
143143
withDiff bool,
144144
withFiltering bool,
145145
withOmitRemote bool,
146-
bulkDelivery bool,
146+
bulkDeliverySize int,
147147
) error {
148148
var a bufalloc.ByteAllocator
149149
// MVCCIterator will encounter historical values for each key in
@@ -157,12 +157,12 @@ func (i *CatchUpIterator) CatchUpScan(
157157
var emitBufSize int
158158
var emitBuf []*kvpb.RangeFeedEvent
159159

160-
if bulkDelivery {
160+
if bulkDeliverySize > 0 {
161161
outputFn = func(event *kvpb.RangeFeedEvent) error {
162162
emitBuf = append(emitBuf, event)
163163
emitBufSize += event.Size()
164164
// If there are ~2MB of buffered events, flush them.
165-
if emitBufSize >= 2<<20 {
165+
if emitBufSize >= bulkDeliverySize {
166166
if err := emitFn(&kvpb.RangeFeedEvent{BulkEvents: &kvpb.RangeFeedBulkEvents{Events: emitBuf}}); err != nil {
167167
return err
168168
}

pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"github.com/stretchr/testify/require"
3232
)
3333

34+
const noBulkDelivery = 0
35+
3436
func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numEvents int) {
3537
eng, _ := setupData(context.Background(), b, emk, opts.dataOpts)
3638
defer eng.Close()
@@ -54,7 +56,7 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numE
5456
err = iter.CatchUpScan(ctx, func(*kvpb.RangeFeedEvent) error {
5557
counter++
5658
return nil
57-
}, opts.withDiff, false /* withFiltering */, false /* withOmitRemote */, false)
59+
}, opts.withDiff, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery)
5860
if err != nil {
5961
b.Fatalf("failed catchUp scan: %+v", err)
6062
}

pkg/kv/kvserver/rangefeed/catchup_scan_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
// bugs in time-bound iterators.
3030
var smallEngineBlocks = metamorphic.ConstantWithTestBool("small-engine-blocks", false)
3131

32+
const noBulkDelivery = 0
33+
3234
// TODO(erikgrinaker): This should be migrated to a data-driven test harness for
3335
// end-to-end rangefeed testing, with more exhaustive test cases. See:
3436
// https://github.com/cockroachdb/cockroach/issues/82715
@@ -160,7 +162,7 @@ func TestCatchupScan(t *testing.T) {
160162
require.NoError(t, iter.CatchUpScan(ctx, func(e *kvpb.RangeFeedEvent) error {
161163
events = append(events, *e.Val)
162164
return nil
163-
}, withDiff, withFiltering, false /* withOmitRemote */, false))
165+
}, withDiff, withFiltering, false /* withOmitRemote */, noBulkDelivery))
164166
if !(withFiltering && omitInRangefeeds) {
165167
require.Equal(t, 7, len(events))
166168
} else {
@@ -238,7 +240,7 @@ func TestCatchupScanOriginID(t *testing.T) {
238240
require.NoError(t, iter.CatchUpScan(ctx, func(e *kvpb.RangeFeedEvent) error {
239241
events = append(events, *e.Val)
240242
return nil
241-
}, false /* withDiff */, false /* withFiltering */, omitRemote, false))
243+
}, false /* withDiff */, false /* withFiltering */, omitRemote, noBulkDelivery))
242244
if omitRemote {
243245
require.Equal(t, 1, len(events))
244246
} else {
@@ -269,7 +271,7 @@ func TestCatchupScanInlineError(t *testing.T) {
269271
require.NoError(t, err)
270272
defer iter.Close()
271273

272-
err = iter.CatchUpScan(ctx, nil, false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, false)
274+
err = iter.CatchUpScan(ctx, nil, false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery)
273275
require.Error(t, err)
274276
require.Contains(t, err.Error(), "unexpected inline value")
275277
}
@@ -317,7 +319,7 @@ func TestCatchupScanSeesOldIntent(t *testing.T) {
317319
require.NoError(t, iter.CatchUpScan(ctx, func(e *kvpb.RangeFeedEvent) error {
318320
keys[string(e.Val.Key)] = struct{}{}
319321
return nil
320-
}, true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, false))
322+
}, true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery))
321323
require.Equal(t, map[string]struct{}{
322324
"b": {},
323325
"e": {},

pkg/kv/kvserver/rangefeed/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ type Processor interface {
193193
withDiff bool,
194194
withFiltering bool,
195195
withOmitRemote bool,
196-
withBulkDelivery bool,
196+
bulkDeliverySize int,
197197
stream Stream,
198198
) (bool, Disconnector, *Filter)
199199

pkg/kv/kvserver/rangefeed/processor_test.go

Lines changed: 76 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ func TestProcessorBasic(t *testing.T) {
6565
r1Stream.ctx,
6666
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
6767
hlc.Timestamp{WallTime: 1},
68-
nil, /* catchUpIter */
69-
false, /* withDiff */
70-
false, /* withFiltering */
71-
false /* withOmitRemote */, false, /* withBulkDelivery */
68+
nil, /* catchUpIter */
69+
false, /* withDiff */
70+
false, /* withFiltering */
71+
false, /* withOmitRemote */
72+
noBulkDelivery,
7273
h.toBufferedStreamIfNeeded(r1Stream),
7374
)
7475
require.True(t, r1OK)
@@ -199,10 +200,11 @@ func TestProcessorBasic(t *testing.T) {
199200
r2Stream.ctx,
200201
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
201202
hlc.Timestamp{WallTime: 1},
202-
nil, /* catchUpIter */
203-
true, /* withDiff */
204-
true, /* withFiltering */
205-
false /* withOmitRemote */, false, /* withBulkDelivery */
203+
nil, /* catchUpIter */
204+
true, /* withDiff */
205+
true, /* withFiltering */
206+
false, /* withOmitRemote */
207+
noBulkDelivery,
206208
h.toBufferedStreamIfNeeded(r2Stream),
207209
)
208210
require.True(t, r2OK)
@@ -311,10 +313,11 @@ func TestProcessorBasic(t *testing.T) {
311313
r3Stream.ctx,
312314
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
313315
hlc.Timestamp{WallTime: 1},
314-
nil, /* catchUpIter */
315-
false, /* withDiff */
316-
false, /* withFiltering */
317-
false /* withOmitRemote */, false, /* withBulkDelivery */
316+
nil, /* catchUpIter */
317+
false, /* withDiff */
318+
false, /* withFiltering */
319+
false, /* withOmitRemote */
320+
noBulkDelivery,
318321
h.toBufferedStreamIfNeeded(r3Stream),
319322
)
320323
require.True(t, r30K)
@@ -332,10 +335,11 @@ func TestProcessorBasic(t *testing.T) {
332335
r4Stream.ctx,
333336
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
334337
hlc.Timestamp{WallTime: 1},
335-
nil, /* catchUpIter */
336-
false, /* withDiff */
337-
false, /* withFiltering */
338-
false /* withOmitRemote */, false, /* withBulkDelivery */
338+
nil, /* catchUpIter */
339+
false, /* withDiff */
340+
false, /* withFiltering */
341+
false, /* withOmitRemote */
342+
noBulkDelivery,
339343
h.toBufferedStreamIfNeeded(r4Stream),
340344
)
341345
require.False(t, r4OK)
@@ -357,10 +361,11 @@ func TestProcessorOmitRemote(t *testing.T) {
357361
r1Stream.ctx,
358362
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
359363
hlc.Timestamp{WallTime: 1},
360-
nil, /* catchUpIter */
361-
false, /* withDiff */
362-
false, /* withFiltering */
363-
false /* withOmitRemote */, false, /* withBulkDelivery */
364+
nil, /* catchUpIter */
365+
false, /* withDiff */
366+
false, /* withFiltering */
367+
false, /* withOmitRemote */
368+
noBulkDelivery,
364369
h.toBufferedStreamIfNeeded(r1Stream),
365370
)
366371
require.True(t, r1OK)
@@ -386,7 +391,7 @@ func TestProcessorOmitRemote(t *testing.T) {
386391
false, /* withDiff */
387392
false, /* withFiltering */
388393
true, /* withOmitRemote */
389-
false, /* withBulkDelivery */
394+
noBulkDelivery,
390395
h.toBufferedStreamIfNeeded(r2Stream),
391396
)
392397
require.True(t, r2OK)
@@ -440,21 +445,23 @@ func TestProcessorSlowConsumer(t *testing.T) {
440445
r1Stream.ctx,
441446
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
442447
hlc.Timestamp{WallTime: 1},
443-
nil, /* catchUpIter */
444-
false, /* withDiff */
445-
false, /* withFiltering */
446-
false /* withOmitRemote */, false, /* withBulkDelivery */
448+
nil, /* catchUpIter */
449+
false, /* withDiff */
450+
false, /* withFiltering */
451+
false, /* withOmitRemote */
452+
noBulkDelivery,
447453
h.toBufferedStreamIfNeeded(r1Stream),
448454
)
449455
r2Stream := newTestStream()
450456
p.Register(
451457
r2Stream.ctx,
452458
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
453459
hlc.Timestamp{WallTime: 1},
454-
nil, /* catchUpIter */
455-
false, /* withDiff */
456-
false, /* withFiltering */
457-
false /* withOmitRemote */, false, /* withBulkDelivery */
460+
nil, /* catchUpIter */
461+
false, /* withDiff */
462+
false, /* withFiltering */
463+
false, /* withOmitRemote */
464+
noBulkDelivery,
458465
h.toBufferedStreamIfNeeded(r2Stream),
459466
)
460467
h.syncEventAndRegistrations()
@@ -546,10 +553,11 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
546553
r1Stream.ctx,
547554
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
548555
hlc.Timestamp{WallTime: 1},
549-
nil, /* catchUpIter */
550-
false, /* withDiff */
551-
false, /* withFiltering */
552-
false /* withOmitRemote */, false, /* withBulkDelivery */
556+
nil, /* catchUpIter */
557+
false, /* withDiff */
558+
false, /* withFiltering */
559+
false, /* withOmitRemote */
560+
noBulkDelivery,
553561
h.toBufferedStreamIfNeeded(r1Stream),
554562
)
555563
h.syncEventAndRegistrations()
@@ -601,10 +609,11 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
601609
r1Stream.ctx,
602610
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
603611
hlc.Timestamp{WallTime: 1},
604-
nil, /* catchUpIter */
605-
false, /* withDiff */
606-
false, /* withFiltering */
607-
false /* withOmitRemote */, false, /* withBulkDelivery */
612+
nil, /* catchUpIter */
613+
false, /* withDiff */
614+
false, /* withFiltering */
615+
false, /* withOmitRemote */
616+
noBulkDelivery,
608617
h.toBufferedStreamIfNeeded(r1Stream),
609618
)
610619
h.syncEventAndRegistrations()
@@ -682,10 +691,11 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
682691
r1Stream.ctx,
683692
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
684693
hlc.Timestamp{WallTime: 1},
685-
nil, /* catchUpIter */
686-
false, /* withDiff */
687-
false, /* withFiltering */
688-
false /* withOmitRemote */, false, /* withBulkDelivery */
694+
nil, /* catchUpIter */
695+
false, /* withDiff */
696+
false, /* withFiltering */
697+
false, /* withOmitRemote */
698+
noBulkDelivery,
689699
h.toBufferedStreamIfNeeded(r1Stream),
690700
)
691701
h.syncEventAndRegistrations()
@@ -993,7 +1003,8 @@ func TestProcessorConcurrentStop(t *testing.T) {
9931003
runtime.Gosched()
9941004
s := newTestStream()
9951005
p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
996-
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, false, /* withBulkDelivery */
1006+
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
1007+
noBulkDelivery,
9971008
h.toBufferedStreamIfNeeded(s))
9981009
}()
9991010
go func() {
@@ -1066,7 +1077,8 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
10661077
s := newTestStream()
10671078
regs[s] = firstIdx
10681079
p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
1069-
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, false, /* withBulkDelivery */
1080+
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
1081+
noBulkDelivery,
10701082
h.toBufferedStreamIfNeeded(s))
10711083
regDone <- struct{}{}
10721084
}
@@ -1123,10 +1135,11 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
11231135
rStream.ctx,
11241136
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
11251137
hlc.Timestamp{WallTime: 1},
1126-
nil, /* catchUpIter */
1127-
false, /* withDiff */
1128-
false, /* withFiltering */
1129-
false /* withOmitRemote */, false, /* withBulkDelivery */
1138+
nil, /* catchUpIter */
1139+
false, /* withDiff */
1140+
false, /* withFiltering */
1141+
false, /* withOmitRemote */
1142+
noBulkDelivery,
11301143
h.toBufferedStreamIfNeeded(rStream),
11311144
)
11321145
h.syncEventAndRegistrations()
@@ -1203,10 +1216,11 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
12031216
rStream.ctx,
12041217
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
12051218
hlc.Timestamp{WallTime: 1},
1206-
nil, /* catchUpIter */
1207-
false, /* withDiff */
1208-
false, /* withFiltering */
1209-
false /* withOmitRemote */, false, /* withBulkDelivery */
1219+
nil, /* catchUpIter */
1220+
false, /* withDiff */
1221+
false, /* withFiltering */
1222+
false, /* withOmitRemote */
1223+
noBulkDelivery,
12101224
h.toBufferedStreamIfNeeded(rStream),
12111225
)
12121226
h.syncEventAndRegistrations()
@@ -1273,10 +1287,11 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
12731287
r1Stream.ctx,
12741288
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
12751289
hlc.Timestamp{WallTime: 1},
1276-
nil, /* catchUpIter */
1277-
false, /* withDiff */
1278-
false, /* withFiltering */
1279-
false /* withOmitRemote */, false, /* withBulkDelivery */
1290+
nil, /* catchUpIter */
1291+
false, /* withDiff */
1292+
false, /* withFiltering */
1293+
false, /* withOmitRemote */
1294+
noBulkDelivery,
12801295
h.toBufferedStreamIfNeeded(r1Stream),
12811296
)
12821297

@@ -1286,10 +1301,10 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
12861301
r2Stream.ctx,
12871302
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
12881303
hlc.Timestamp{WallTime: 1},
1289-
nil, /* catchUpIter */
1290-
false, /* withDiff */
1291-
false, /* withFiltering */
1292-
false /* withOmitRemote */, false, /* withBulkDelivery */
1304+
nil, /* catchUpIter */
1305+
false, /* withDiff */
1306+
false, /* withFiltering */
1307+
false /* withOmitRemote */, noBulkDelivery,
12931308
h.toBufferedStreamIfNeeded(r2Stream),
12941309
)
12951310
h.syncEventAndRegistrations()
@@ -1458,7 +1473,8 @@ func TestProcessorBackpressure(t *testing.T) {
14581473
// Add a registration.
14591474
stream := newTestStream()
14601475
ok, _, _ := p.Register(stream.ctx, span, hlc.MinTimestamp, nil, /* catchUpIter */
1461-
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, false, /* withBulkDelivery */
1476+
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
1477+
noBulkDelivery,
14621478
h.toBufferedStreamIfNeeded(stream))
14631479
require.True(t, ok)
14641480

pkg/kv/kvserver/rangefeed/registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type baseRegistration struct {
8686
withDiff bool
8787
withFiltering bool
8888
withOmitRemote bool
89-
bulkDelivery bool
89+
bulkDelivery int
9090
// removeRegFromProcessor is called to remove the registration from its
9191
// processor. This is provided by the creator of the registration and called
9292
// during disconnect(). Since it is called during disconnect it must be

pkg/kv/kvserver/rangefeed/registry_helper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ type testRegistrationConfig struct {
353353
withDiff bool
354354
withFiltering bool
355355
withOmitRemote bool
356-
withBulkDelivery bool
356+
withBulkDelivery int
357357
withRegistrationTestTypes registrationType
358358
metrics *Metrics
359359
}

0 commit comments

Comments
 (0)