
Commit 06deee5

Merge pull request #150162 from elizaMkraule/backport25.3-147543-148753
release-25.3: changefeedccl: gate Kafka v2 message too large error detail behind cluster setting
2 parents: e654291 + ef8e5d3

File tree

5 files changed: +33 -11 lines

docs/generated/settings/settings-for-tenants.txt
docs/generated/settings/settings.html
pkg/ccl/changefeedccl/changefeedbase/settings.go
pkg/ccl/changefeedccl/sink_kafka_v2.go
pkg/ccl/changefeedccl/sink_kafka_v2_test.go

docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
 changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
 changefeed.span_checkpoint.lag_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled application
+changefeed.kafka_v2_error_details.enabled boolean true if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors application
 changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
 changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 <tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lag_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-kafka-v2-error-details-enabled" class="anchored"><code>changefeed.kafka_v2_error_details.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 11 additions & 0 deletions
@@ -349,3 +349,14 @@ var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
     time.Duration(metamorphic.ConstantWithTestRange("changefeed.resolved_timestamp.granularity", 1, 0, 10))*time.Second,
     settings.DurationWithMinimum(0),
 )
+
+// KafkaV2ErrorDetailsEnabled enables detailed error messages for Kafka v2 sinks
+// when message_too_large errors occur. This includes the message key, size,
+// and MVCC timestamp in the error.
+var KafkaV2ErrorDetailsEnabled = settings.RegisterBoolSetting(
+    settings.ApplicationLevel,
+    "changefeed.kafka_v2_error_details.enabled",
+    "if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors",
+    true,
+    settings.WithPublic,
+)
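
Because the setting is registered as public and application-level, operators can turn the extra detail off cluster-wide with `SET CLUSTER SETTING changefeed.kafka_v2_error_details.enabled = false;`. In code, the gate is read once per sink and overridden per settings handle in tests, as the two diffs below show. A minimal Go sketch of both calls, assuming a context `ctx` and a `*cluster.Settings` handle `st`:

    // Read the gate once (mirrors newKafkaSinkClientV2 in the next diff).
    includeErrorDetails := changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&st.SV)
    _ = includeErrorDetails

    // Force the gate for a test (mirrors the sink_kafka_v2_test.go diff below).
    changefeedbase.KafkaV2ErrorDetailsEnabled.Override(ctx, &st.SV, false)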

pkg/ccl/changefeedccl/sink_kafka_v2.go

Lines changed: 13 additions & 7 deletions
@@ -41,9 +41,10 @@ type kafkaSinkClientV2 struct {
     client      KafkaClientV2
     adminClient KafkaAdminClientV2
 
-    knobs          kafkaSinkV2Knobs
-    canTryResizing bool
-    recordResize   func(numRecords int64)
+    knobs               kafkaSinkV2Knobs
+    canTryResizing      bool
+    includeErrorDetails bool
+    recordResize        func(numRecords int64)
 
     topicsForConnectionCheck []string
 
@@ -124,6 +125,7 @@ func newKafkaSinkClientV2(
         knobs:                    knobs,
         batchCfg:                 batchCfg,
         canTryResizing:           changefeedbase.BatchReductionRetryEnabled.Get(&settings.SV),
+        includeErrorDetails:      changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&settings.SV),
         recordResize:             recordResize,
         topicsForConnectionCheck: topicsForConnectionCheck,
     }
@@ -165,7 +167,7 @@ func (k *kafkaSinkClientV2) Flush(ctx context.Context, payload SinkPayload) (ret
         }
         return nil
     } else {
-        if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) {
+        if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) && k.includeErrorDetails {
             msg := msgs[0]
             mvccVal := msg.Context.Value(mvccTSKey{})
             var ts hlc.Timestamp
@@ -273,7 +275,7 @@ func (k *kafkaSinkClientV2) maybeUpdateTopicPartitions(
 
 // MakeBatchBuffer implements SinkClient.
 func (k *kafkaSinkClientV2) MakeBatchBuffer(topic string) BatchBuffer {
-    return &kafkaBuffer{topic: topic, batchCfg: k.batchCfg}
+    return &kafkaBuffer{topic: topic, batchCfg: k.batchCfg, includeErrorDetails: k.includeErrorDetails}
 }
 
 func (k *kafkaSinkClientV2) shouldTryResizing(err error, msgs []*kgo.Record) bool {
@@ -311,7 +313,8 @@ type kafkaBuffer struct {
     messages  []*kgo.Record
     byteCount int
 
-    batchCfg sinkBatchConfig
+    batchCfg            sinkBatchConfig
+    includeErrorDetails bool
 }
 
 type mvccTSKey struct{}
@@ -328,7 +331,10 @@ func (b *kafkaBuffer) Append(ctx context.Context, key []byte, value []byte, attr
         headers = append(headers, kgo.RecordHeader{Key: k, Value: v})
     }
 
-    rctx := context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+    var rctx context.Context
+    if b.includeErrorDetails {
+        rctx = context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+    }
 
     b.messages = append(b.messages, &kgo.Record{Key: key, Value: value, Topic: b.topic, Headers: headers, Context: rctx})
     b.byteCount += len(value)
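
The Flush hunk above is cut off right after `var ts hlc.Timestamp`, so the enriched error itself is not visible. In outline, that path reads the MVCC timestamp back out of the record's context (where `kafkaBuffer.Append` stashed it when the gate is on) and wraps the `kerr.MessageTooLarge` error with the message key, size, and timestamp. A sketch of that shape under those assumptions; the helper name and exact message wording here are illustrative, not the committed code:

    // Illustrative sketch in the same package as the sink; assumes the package's
    // existing imports (cockroachdb/errors, franz-go's kgo, util/hlc).
    func detailedMessageTooLargeError(err error, msg *kgo.Record) error {
        var ts hlc.Timestamp
        if msg.Context != nil {
            // Append stores the MVCC timestamp in the record context only when
            // changefeed.kafka_v2_error_details.enabled is true.
            if v, ok := msg.Context.Value(mvccTSKey{}).(hlc.Timestamp); ok {
                ts = v
            }
        }
        return errors.Wrapf(err, "message too large: key=%s size=%d mvcc=%s",
            msg.Key, len(msg.Key)+len(msg.Value), ts)
    }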

pkg/ccl/changefeedccl/sink_kafka_v2_test.go

Lines changed: 7 additions & 4 deletions
@@ -114,11 +114,14 @@ func TestKafkaSinkClientV2_Resize(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
 
-    setup := func(t *testing.T, canResize bool) (*kafkaSinkV2Fx, SinkPayload, []any) {
+    setup := func(t *testing.T, canResize bool, errorDetails bool) (*kafkaSinkV2Fx, SinkPayload, []any) {
         fx := newKafkaSinkV2Fx(t, withSettings(func(settings *cluster.Settings) {
             if canResize {
                 changefeedbase.BatchReductionRetryEnabled.Override(context.Background(), &settings.SV, true)
             }
+            if errorDetails {
+                changefeedbase.KafkaV2ErrorDetailsEnabled.Override(context.Background(), &settings.SV, true)
+            }
         }))
         defer fx.close()
 
@@ -144,14 +147,14 @@ func TestKafkaSinkClientV2_Resize(t *testing.T) {
     }
 
     t.Run("resize disabled", func(t *testing.T) {
-        fx, payload, payloadAnys := setup(t, false)
+        fx, payload, payloadAnys := setup(t, false, false)
         pr := kgo.ProduceResults{kgo.ProduceResult{Err: fmt.Errorf("..: %w", kerr.MessageTooLarge)}}
         fx.kc.EXPECT().ProduceSync(fx.ctx, payloadAnys...).Times(1).Return(pr)
         require.Error(t, fx.sink.Flush(fx.ctx, payload))
     })
 
     t.Run("resize enabled and it keeps failing", func(t *testing.T) {
-        fx, payload, payloadAnys := setup(t, true)
+        fx, payload, payloadAnys := setup(t, true, true)
 
         pr := kgo.ProduceResults{kgo.ProduceResult{Err: fmt.Errorf("..: %w", kerr.MessageTooLarge)}}
         // it should keep splitting it in two until it hits size=1
@@ -176,7 +179,7 @@ func TestKafkaSinkClientV2_Resize(t *testing.T) {
     })
 
     t.Run("resize enabled and it gets everything", func(t *testing.T) {
-        fx, payload, payloadAnys := setup(t, true)
+        fx, payload, payloadAnys := setup(t, true, true)
 
         prErr := kgo.ProduceResults{kgo.ProduceResult{Err: fmt.Errorf("..: %w", kerr.MessageTooLarge)}}
         prOk := kgo.ProduceResults{}
