Skip to content

Commit de7c778

Browse files
committed
changefeedccl: make bulk delivery of rangefeed events optional
This is a temporary opt-out until we can properly test the performance impact of bulk delivery. Epic: none Release note (general change): The changefeed bulk delivery setting was made optional.
1 parent 29de882 commit de7c778

File tree

5 files changed

+25
-4
lines changed

5 files changed

+25
-4
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
597597
WithDiff: filters.WithDiff,
598598
WithFiltering: filters.WithFiltering,
599599
WithFrontierQuantize: changefeedbase.Quantize.Get(&cfg.Settings.SV),
600+
WithBulkDelivery: changefeedbase.BulkDelivery.Get(&cfg.Settings.SV),
600601
NeedsInitialScan: needsInitialScan,
601602
SchemaChangeEvents: schemaChange.EventClass,
602603
SchemaChangePolicy: schemaChange.Policy,

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,14 @@ var PerTableProtectedTimestamps = settings.RegisterBoolSetting(
224224
"if false, uses a single protected timestamp record for all tables",
225225
metamorphic.ConstantWithTestBool("changefeed.protect_timestamp.per_table.enabled", false))
226226

227+
// BulkDelivery enables bulk delivery of rangefeed events, which can improve performance during catchup scans.
228+
var BulkDelivery = settings.RegisterBoolSetting(
229+
settings.ApplicationLevel,
230+
"changefeed.bulk_delivery.enabled",
231+
"if true, rangefeed events are delivered in bulk during catchup scans; "+
232+
"if false, rangefeed events are delivered individually",
233+
metamorphic.ConstantWithTestBool("changefeed.bulk_delivery.enabled", true))
234+
227235
// MaxProtectedTimestampAge controls the frequency of protected timestamp record updates
228236
var MaxProtectedTimestampAge = settings.RegisterDurationSetting(
229237
settings.ApplicationLevel,

pkg/ccl/changefeedccl/kvfeed/kv_feed.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ type Config struct {
9494
// enables filtering out any transactional writes with that flag set to true.
9595
WithFiltering bool
9696

97+
// WithBulkDelivery is propagated via the RangefeedRequest to the rangefeed
98+
// server, where if true, the server will deliver rangefeed events in bulk
99+
// during catchup scans.
100+
WithBulkDelivery bool
101+
97102
// WithFrontierQuantize specifies the resolved timestamp quantization
98103
// granularity. If non-zero, resolved timestamps from rangefeed checkpoint
99104
// events will be rounded down to the nearest multiple of the quantization
@@ -135,7 +140,7 @@ func Run(ctx context.Context, cfg Config) error {
135140
f := newKVFeed(
136141
cfg.Writer, cfg.Spans,
137142
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
138-
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
143+
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering, cfg.WithBulkDelivery,
139144
cfg.WithFrontierQuantize,
140145
cfg.ConsumerID,
141146
cfg.InitialHighWater, cfg.InitialSpanTimePairs, cfg.EndTime,
@@ -260,6 +265,7 @@ type kvFeed struct {
260265
withDiff bool
261266
withFiltering bool
262267
withInitialBackfill bool
268+
withBulkDelivery bool
263269
consumerID int64
264270
initialHighWater hlc.Timestamp
265271
initialSpanTimePairs []kvcoord.SpanTimePair
@@ -289,7 +295,7 @@ func newKVFeed(
289295
spans []roachpb.Span,
290296
schemaChangeEvents changefeedbase.SchemaChangeEventClass,
291297
schemaChangePolicy changefeedbase.SchemaChangePolicy,
292-
withInitialBackfill, withDiff, withFiltering bool,
298+
withInitialBackfill, withDiff, withFiltering, withBulkDelivery bool,
293299
withFrontierQuantize time.Duration,
294300
consumerID int64,
295301
initialHighWater hlc.Timestamp,
@@ -312,6 +318,7 @@ func newKVFeed(
312318
withDiff: withDiff,
313319
withFiltering: withFiltering,
314320
withFrontierQuantize: withFrontierQuantize,
321+
withBulkDelivery: withBulkDelivery,
315322
consumerID: consumerID,
316323
initialHighWater: initialHighWater,
317324
endTime: endTime,
@@ -615,6 +622,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
615622
WithDiff: f.withDiff,
616623
WithFiltering: f.withFiltering,
617624
WithFrontierQuantize: f.withFrontierQuantize,
625+
WithBulkDelivery: f.withBulkDelivery,
618626
ConsumerID: f.consumerID,
619627
Knobs: f.knobs,
620628
Timers: f.timers,

pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func TestKVFeed(t *testing.T) {
148148
st := timers.New(time.Minute).GetOrCreateScopedTimers("")
149149
f := newKVFeed(buf, tc.spans,
150150
tc.schemaChangeEvents, tc.schemaChangePolicy,
151-
tc.needsInitialScan, tc.withDiff, true /* withFiltering */, tc.withFrontierQuantize,
151+
tc.needsInitialScan, tc.withDiff, true /* withFiltering */, changefeedbase.BulkDelivery.Get(&settings.SV), tc.withFrontierQuantize,
152152
0, /* consumerID */
153153
tc.initialHighWater, tc.initialSpanTimePairs, tc.endTime,
154154
codec,

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type rangeFeedConfig struct {
3434
WithDiff bool
3535
WithFiltering bool
3636
WithFrontierQuantize time.Duration
37+
WithBulkDelivery bool
3738
ConsumerID int64
3839
RangeObserver kvcoord.RangeObserver
3940
Knobs TestingKnobs
@@ -97,7 +98,10 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
9798
// Bulk delivery is an optimization that decreases rangefeed overhead during
9899
// catchup scans. It results in the emission of BulkEvents instead of
99100
// individual events where possible.
100-
rfOpts := []kvcoord.RangeFeedOption{kvcoord.WithBulkDelivery()}
101+
var rfOpts []kvcoord.RangeFeedOption
102+
if cfg.WithBulkDelivery {
103+
rfOpts = append(rfOpts, kvcoord.WithBulkDelivery())
104+
}
101105
if cfg.WithDiff {
102106
rfOpts = append(rfOpts, kvcoord.WithDiff())
103107
}

0 commit comments

Comments
 (0)