Skip to content

Commit b98a0ae

Browse files
craig[bot] and dt committed
Merge #150371
150371: kvserver,rangefeed: enable bulk value delivery r=dt a=dt When many values are available at the same time, such as during catch-up scans, transmitting and delivering each as a separate event incurs significant overhead and contention. Instead, we introduce a new event type which is a BulkEvent that consists of many events. During catch-up scans we can buffer up value events and then emit many of them in a single bulk event. Not all callers can handle these new bulk events, so the rangefeed server must be asked to emit them; however, a client using the kvclient rangefeed wrapper can benefit from them even if it does not directly support them, as the client wrapper will automatically enable bulk delivery and then unwrap them for delivery to the caller individually. Release note: none. Epic: none. Co-authored-by: David Taylor <[email protected]>
2 parents 4cd3a9b + 018dc58 commit b98a0ae

21 files changed

+284
-135
lines changed

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
178178
// For now, we just error on SST ingestion, since we currently don't
179179
// expect SST ingestion into spans with active changefeeds.
180180
return errors.Errorf("unexpected SST ingestion: %v", t)
181+
case *kvpb.RangeFeedBulkEvents:
182+
// Should be disabled so this is unreachable.
183+
return errors.Errorf("unexpected bulk delivery: %v", t)
181184

182185
case *kvpb.RangeFeedDeleteRange:
183186
// For now, we just ignore MVCC range tombstones. These are currently

pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
283283

284284
for !s.transport.IsExhausted() {
285285
args := makeRangeFeedRequest(
286-
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs, m.cfg.consumerID)
286+
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs, m.cfg.consumerID, m.cfg.bulkDelivery)
287287
args.Replica = s.transport.NextReplica()
288288
args.StreamID = streamID
289289
s.ReplicaDescriptor = args.Replica

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type rangeFeedConfig struct {
6969
withMatchingOriginIDs []uint32
7070
rangeObserver RangeObserver
7171
consumerID int64
72+
bulkDelivery bool
7273

7374
knobs struct {
7475
// onRangefeedEvent invoked on each rangefeed event.
@@ -118,6 +119,12 @@ func WithFiltering() RangeFeedOption {
118119
})
119120
}
120121

122+
func WithBulkDelivery() RangeFeedOption {
123+
return optionFunc(func(c *rangeFeedConfig) {
124+
c.bulkDelivery = true
125+
})
126+
}
127+
121128
// WithMatchingOriginIDs opts the rangefeed into emitting events originally written by
122129
// clusters with the assoicated origin IDs during logical data replication.
123130
func WithMatchingOriginIDs(originIDs ...uint32) RangeFeedOption {
@@ -633,6 +640,7 @@ func makeRangeFeedRequest(
633640
withFiltering bool,
634641
withMatchingOriginIDs []uint32,
635642
consumerID int64,
643+
withBulkDelivery bool,
636644
) kvpb.RangeFeedRequest {
637645
admissionPri := admissionpb.BulkNormalPri
638646
if isSystemRange {
@@ -648,6 +656,7 @@ func makeRangeFeedRequest(
648656
WithDiff: withDiff,
649657
WithFiltering: withFiltering,
650658
WithMatchingOriginIDs: withMatchingOriginIDs,
659+
WithBulkDelivery: withBulkDelivery,
651660
AdmissionHeader: kvpb.AdmissionHeader{
652661
// NB: AdmissionHeader is used only at the start of the range feed
653662
// stream since the initial catch-up scan is expensive.

pkg/kv/kvclient/rangefeed/rangefeed.go

Lines changed: 95 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,12 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF
332332
eventCh := make(chan kvcoord.RangeFeedMessage)
333333

334334
var rangefeedOpts []kvcoord.RangeFeedOption
335+
// We can unconditionally enable bulk-delivery from the server at least as far
336+
// as this client; if an onValues handler is configured we can also bulk-process
337+
// values, but even if it isn't we know how to unwrap a bulk delivery and pass
338+
// each event to the caller's individual event handlers.
339+
rangefeedOpts = append(rangefeedOpts, kvcoord.WithBulkDelivery())
340+
335341
if f.scanConfig.overSystemTable {
336342
rangefeedOpts = append(rangefeedOpts, kvcoord.WithSystemTablePriority())
337343
}
@@ -427,54 +433,98 @@ func (f *RangeFeed) processEvents(
427433
for {
428434
select {
429435
case ev := <-eventCh:
430-
switch {
431-
case ev.Val != nil:
432-
f.onValue(ctx, ev.Val)
433-
case ev.Checkpoint != nil:
434-
ts := ev.Checkpoint.ResolvedTS
435-
if f.frontierQuantize != 0 {
436-
ts.Logical = 0
437-
ts.WallTime -= ts.WallTime % int64(f.frontierQuantize)
438-
}
439-
advanced, err := frontier.Forward(ev.Checkpoint.Span, ts)
440-
if err != nil {
441-
return err
442-
}
443-
if f.onCheckpoint != nil {
444-
f.onCheckpoint(ctx, ev.Checkpoint)
445-
}
446-
if advanced && f.onFrontierAdvance != nil {
447-
f.onFrontierAdvance(ctx, frontier.Frontier())
448-
}
449-
if f.frontierVisitor != nil {
450-
f.frontierVisitor(ctx, advanced, frontier)
451-
}
452-
case ev.SST != nil:
453-
if f.onSSTable == nil {
454-
return errors.AssertionFailedf(
455-
"received unexpected rangefeed SST event with no OnSSTable handler")
456-
}
457-
f.onSSTable(ctx, ev.SST, ev.RegisteredSpan)
458-
case ev.DeleteRange != nil:
459-
if f.onDeleteRange == nil {
460-
if f.knobs != nil && f.knobs.IgnoreOnDeleteRangeError {
461-
continue
462-
}
463-
return errors.AssertionFailedf(
464-
"received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s", ev)
465-
}
466-
f.onDeleteRange(ctx, ev.DeleteRange)
467-
case ev.Metadata != nil:
468-
if f.onMetadata == nil {
469-
return errors.AssertionFailedf("received unexpected metadata event with no OnMetadata handler")
470-
}
471-
f.onMetadata(ctx, ev.Metadata)
472-
case ev.Error != nil:
473-
// Intentionally do nothing, we'll get an error returned from the
474-
// call to RangeFeed.
436+
if err := f.processEvent(ctx, frontier, ev.RangeFeedEvent, ev.RegisteredSpan); err != nil {
437+
return err
475438
}
476439
case <-ctx.Done():
477440
return ctx.Err()
478441
}
479442
}
480443
}
444+
445+
func (f *RangeFeed) processEvent(
446+
ctx context.Context, frontier span.Frontier, ev *kvpb.RangeFeedEvent, registeredSpan roachpb.Span,
447+
) error {
448+
switch {
449+
case ev.Val != nil:
450+
f.onValue(ctx, ev.Val)
451+
case ev.Checkpoint != nil:
452+
ts := ev.Checkpoint.ResolvedTS
453+
if f.frontierQuantize != 0 {
454+
ts.Logical = 0
455+
ts.WallTime -= ts.WallTime % int64(f.frontierQuantize)
456+
}
457+
advanced, err := frontier.Forward(ev.Checkpoint.Span, ts)
458+
if err != nil {
459+
return err
460+
}
461+
if f.onCheckpoint != nil {
462+
f.onCheckpoint(ctx, ev.Checkpoint)
463+
}
464+
if advanced && f.onFrontierAdvance != nil {
465+
f.onFrontierAdvance(ctx, frontier.Frontier())
466+
}
467+
if f.frontierVisitor != nil {
468+
f.frontierVisitor(ctx, advanced, frontier)
469+
}
470+
case ev.SST != nil:
471+
if f.onSSTable == nil {
472+
return errors.AssertionFailedf(
473+
"received unexpected rangefeed SST event with no OnSSTable handler")
474+
}
475+
f.onSSTable(ctx, ev.SST, registeredSpan)
476+
case ev.DeleteRange != nil:
477+
if f.onDeleteRange == nil {
478+
if f.knobs != nil && f.knobs.IgnoreOnDeleteRangeError {
479+
return nil
480+
}
481+
return errors.AssertionFailedf(
482+
"received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s", ev)
483+
}
484+
f.onDeleteRange(ctx, ev.DeleteRange)
485+
case ev.Metadata != nil:
486+
if f.onMetadata == nil {
487+
return errors.AssertionFailedf("received unexpected metadata event with no OnMetadata handler")
488+
}
489+
f.onMetadata(ctx, ev.Metadata)
490+
case ev.Error != nil:
491+
// Intentionally do nothing, we'll get an error returned from the
492+
// call to RangeFeed.
493+
case ev.BulkEvents != nil:
494+
if f.onValues != nil {
495+
// We can optimistically assume the bulk event consists of all value
496+
// events, and allocate a buffer for them to be passed to onValues. In the
497+
// rare case we hit a non-value event (it would have to be a range key as
498+
// only a catch-up scan currently produces bulk events), we will throw out
499+
// this buffer and any events we might have copied to it so far and just
500+
// fallback to to processing each event, but this should be so uncommon it
501+
// is not worth worrying about the potential wasted work.
502+
allValues := true
503+
buf := make([]kv.KeyValue, len(ev.BulkEvents.Events))
504+
for i := range ev.BulkEvents.Events {
505+
if ev.BulkEvents.Events[i].Val != nil {
506+
buf[i] = kv.KeyValue{
507+
Key: ev.BulkEvents.Events[i].Val.Key,
508+
Value: &ev.BulkEvents.Events[i].Val.Value,
509+
}
510+
} else {
511+
allValues = false
512+
break
513+
}
514+
}
515+
if allValues {
516+
f.onValues(ctx, buf)
517+
return nil
518+
}
519+
}
520+
// Either the bulk event contains non-value events or a onValues handler is
521+
// not configured, so process each event individually.
522+
for _, e := range ev.BulkEvents.Events {
523+
if err := f.processEvent(ctx, frontier, e, registeredSpan); err != nil {
524+
return err
525+
}
526+
}
527+
528+
}
529+
return nil
530+
}

pkg/kv/kvpb/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2160,6 +2160,9 @@ func (e *RangeFeedEvent) ShallowCopy() *RangeFeedEvent {
21602160
case *RangeFeedError:
21612161
cpyErr := *t
21622162
cpy.MustSetValue(&cpyErr)
2163+
case *RangeFeedBulkEvents:
2164+
cpyVals := *t
2165+
cpy.MustSetValue(&cpyVals)
21632166
default:
21642167
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t))
21652168
}

pkg/kv/kvpb/api.proto

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3280,7 +3280,11 @@ message RangeFeedRequest {
32803280
// ConsumerID is set by the caller to identify itself.
32813281
int64 consumer_id = 9 [(gogoproto.customname) = "ConsumerID"];
32823282

3283-
// NextID = 10;
3283+
// WithBulkDelivery requests the stream include "bulk" events which wrap many
3284+
// events in a single event to reduce overhead, e.g. during scans.
3285+
bool with_bulk_delivery = 10;
3286+
3287+
// NextID = 11;
32843288
}
32853289

32863290
// RangeFeedValue is a variant of RangeFeedEvent that represents an update to
@@ -3296,6 +3300,14 @@ message RangeFeedValue {
32963300
Value prev_value = 3 [(gogoproto.nullable) = false];
32973301
}
32983302

3303+
// RangeFeedBulkEvents is a variant of RangeFeedEvent that represents a
3304+
// collection of many RangeFeedEvent delivered as a single event rather than
3305+
// individually for reduced delivery overhead when many events are available,
3306+
// such as during catch-up scans.
3307+
message RangeFeedBulkEvents {
3308+
repeated RangeFeedEvent events = 1;
3309+
}
3310+
32993311
// RangeFeedCheckpoint is a variant of RangeFeedEvent that represents the
33003312
// promise that no more RangeFeedValue events with keys in the specified span
33013313
// with timestamps less than or equal to the specified resolved timestamp will
@@ -3365,6 +3377,7 @@ message RangeFeedEvent {
33653377
RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"];
33663378
RangeFeedDeleteRange delete_range = 5;
33673379
RangeFeedMetadata metadata = 6;
3380+
RangeFeedBulkEvents bulk_events = 7;
33683381
}
33693382

33703383
// MuxRangeFeedEvent is a response generated by MuxRangeFeed RPC. It tags

pkg/kv/kvserver/rangefeed/bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
104104
const withFiltering = false
105105
streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)}
106106
ok, _, _ := p.Register(ctx, span, hlc.MinTimestamp, nil,
107-
withDiff, withFiltering, false, /* withOmitRemote */
107+
withDiff, withFiltering, false /* withOmitRemote */, false, /* withBulkDelivery */
108108
streams[i])
109109
require.True(b, ok)
110110
}

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func newBufferedRegistration(
7878
withDiff bool,
7979
withFiltering bool,
8080
withOmitRemote bool,
81+
withBulkDelivery bool,
8182
bufferSz int,
8283
blockWhenFull bool,
8384
metrics *Metrics,
@@ -93,6 +94,7 @@ func newBufferedRegistration(
9394
withFiltering: withFiltering,
9495
withOmitRemote: withOmitRemote,
9596
removeRegFromProcessor: removeRegFromProcessor,
97+
bulkDelivery: withBulkDelivery,
9698
},
9799
metrics: metrics,
98100
stream: stream,
@@ -308,7 +310,7 @@ func (br *bufferedRegistration) maybeRunCatchUpScan(ctx context.Context) error {
308310
br.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
309311
}()
310312

311-
return catchUpIter.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote)
313+
return catchUpIter.CatchUpScan(ctx, br.stream.SendUnbuffered, br.withDiff, br.withFiltering, br.withOmitRemote, br.bulkDelivery)
312314
}
313315

314316
// Wait for this registration to completely process its internal

pkg/kv/kvserver/rangefeed/catchup_scan.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ func (i *CatchUpIterator) Close() {
119119
// TODO(ssd): Clarify memory ownership. Currently, the memory backing
120120
// the RangeFeedEvents isn't modified by the caller after this
121121
// returns. However, we may revisit this in #69596.
122+
// TODO(dt): Does this really need to be a pointer to a struct containing all
123+
// pointers? Can we pass by value instead?
122124
type outputEventFn func(e *kvpb.RangeFeedEvent) error
123125

124126
// CatchUpScan iterates over all changes in the configured key/time span, and
@@ -137,10 +139,11 @@ type outputEventFn func(e *kvpb.RangeFeedEvent) error
137139
// to SimpleMVCCIterator to replace the context.
138140
func (i *CatchUpIterator) CatchUpScan(
139141
ctx context.Context,
140-
outputFn outputEventFn,
142+
emitFn outputEventFn,
141143
withDiff bool,
142144
withFiltering bool,
143145
withOmitRemote bool,
146+
bulkDelivery bool,
144147
) error {
145148
var a bufalloc.ByteAllocator
146149
// MVCCIterator will encounter historical values for each key in
@@ -150,6 +153,26 @@ func (i *CatchUpIterator) CatchUpScan(
150153
// as we fill in previous values.
151154
reorderBuf := make([]kvpb.RangeFeedEvent, 0, 5)
152155

156+
outputFn := emitFn
157+
var emitBufSize int
158+
var emitBuf []*kvpb.RangeFeedEvent
159+
160+
if bulkDelivery {
161+
outputFn = func(event *kvpb.RangeFeedEvent) error {
162+
emitBuf = append(emitBuf, event)
163+
emitBufSize += event.Size()
164+
// If there are ~2MB of buffered events, flush them.
165+
if emitBufSize >= 2<<20 {
166+
if err := emitFn(&kvpb.RangeFeedEvent{BulkEvents: &kvpb.RangeFeedBulkEvents{Events: emitBuf}}); err != nil {
167+
return err
168+
}
169+
emitBuf = make([]*kvpb.RangeFeedEvent, 0, len(emitBuf))
170+
emitBufSize = 0
171+
}
172+
return nil
173+
}
174+
}
175+
153176
outputEvents := func() error {
154177
for i := len(reorderBuf) - 1; i >= 0; i-- {
155178
e := reorderBuf[i]
@@ -383,5 +406,16 @@ func (i *CatchUpIterator) CatchUpScan(
383406
}
384407

385408
// Output events for the last key encountered.
386-
return outputEvents()
409+
if err := outputEvents(); err != nil {
410+
return err
411+
}
412+
// If bulk delivery has buffered anything for emission, flush it.
413+
if len(emitBuf) > 0 {
414+
// Flush any remaining buffered events.
415+
if err := emitFn(&kvpb.RangeFeedEvent{BulkEvents: &kvpb.RangeFeedBulkEvents{Events: emitBuf}}); err != nil {
416+
return err
417+
}
418+
}
419+
420+
return nil
387421
}

pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numE
5454
err = iter.CatchUpScan(ctx, func(*kvpb.RangeFeedEvent) error {
5555
counter++
5656
return nil
57-
}, opts.withDiff, false /* withFiltering */, false /* withOmitRemote */)
57+
}, opts.withDiff, false /* withFiltering */, false /* withOmitRemote */, false)
5858
if err != nil {
5959
b.Fatalf("failed catchUp scan: %+v", err)
6060
}

0 commit comments

Comments
 (0)