Commit a7dbc81

changefeedccl: gate Kafka v2 message too large error detail behind cluster setting

A recent change added detailed logging for Kafka v2 changefeed messages that exceed the broker's size limit. These logs now include the message key, size, and MVCC timestamp to aid in debugging.

To make this safe for backporting, the behavior is now gated behind the cluster setting changefeed.kafka_v2_error_details.enabled.

On the main branch, this setting defaults to true to preserve the enhanced observability. In release branch backports, it will default to false.

When enabled, the log will include:
- The key of the offending message
- Combined key + value size
- MVCC timestamp

When disabled, the log reverts to the previous, minimal format.

Release note (general change): Kafka v2 changefeed sinks now support a cluster setting that enables detailed error logging for messages exceeding the Kafka broker's size limit.
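The Flush hunk further down stops just before the error is actually built, so here is a minimal, hypothetical Go sketch of the two error shapes the setting toggles between; the helper name, message format, and string timestamp are illustrative assumptions, not the changefeedccl code.

```go
package main

import "fmt"

// buildMessageTooLargeError mirrors the gated behavior described above: with
// details enabled, the error carries the key, the combined key+value size,
// and the MVCC timestamp; with the setting disabled, it stays minimal.
func buildMessageTooLargeError(key, value []byte, mvccTS string, includeDetails bool) error {
	if !includeDetails {
		return fmt.Errorf("kafka message too large")
	}
	return fmt.Errorf("kafka message too large: key=%q size=%d mvcc=%s",
		key, len(key)+len(value), mvccTS)
}

func main() {
	err := buildMessageTooLargeError(
		[]byte("order-42"), make([]byte, 1<<20), "1712345678.000000001,0", true /* includeDetails */)
	fmt.Println(err)
}
```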
1 parent (90ae675), commit a7dbc81

File tree: 4 files changed (+26, -7 lines)


docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
 changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
 changefeed.span_checkpoint.lag_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled application
+changefeed.kafka_v2_error_details.enabled boolean true if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors application
 changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
 changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 <tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lag_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-kafka-v2-error-details-enabled" class="anchored"><code>changefeed.kafka_v2_error_details.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 11 additions & 0 deletions
@@ -369,3 +369,14 @@ var RetryBackoffReset = settings.RegisterDurationSettingWithExplicitUnit(
 	10*time.Minute, /* defaultValue */
 	settings.DurationInRange(1*time.Second, 1*time.Hour),
 )
+
+// KafkaV2ErrorDetailsEnabled enables detailed error messages for Kafka v2 sinks
+// when message_too_large errors occur. This includes the message key, size,
+// and MVCC timestamp in the error.
+var KafkaV2ErrorDetailsEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"changefeed.kafka_v2_error_details.enabled",
+	"if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors",
+	true,
+	settings.WithPublic,
+)
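As a rough usage sketch (assuming the standard pkg/settings API and the package paths shown in this commit; this is not code from the commit itself), the new setting is read with Get and can be flipped in a test with Override:

```go
package changefeedccl_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

func exampleToggleErrorDetails(ctx context.Context) (before, after bool) {
	st := cluster.MakeTestingClusterSettings()
	// Defaults to true on the main branch, per the registration above.
	before = changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&st.SV)
	// A test (or an operator via SET CLUSTER SETTING) can turn the detail off.
	changefeedbase.KafkaV2ErrorDetailsEnabled.Override(ctx, &st.SV, false)
	after = changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&st.SV)
	return before, after
}
```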

pkg/ccl/changefeedccl/sink_kafka_v2.go

Lines changed: 13 additions & 7 deletions
@@ -41,9 +41,10 @@ type kafkaSinkClientV2 struct {
 	client      KafkaClientV2
 	adminClient KafkaAdminClientV2
 
-	knobs          kafkaSinkV2Knobs
-	canTryResizing bool
-	recordResize   func(numRecords int64)
+	knobs               kafkaSinkV2Knobs
+	canTryResizing      bool
+	includeErrorDetails bool
+	recordResize        func(numRecords int64)
 
 	topicsForConnectionCheck []string
 
@@ -124,6 +125,7 @@ func newKafkaSinkClientV2(
 		knobs:                    knobs,
 		batchCfg:                 batchCfg,
 		canTryResizing:           changefeedbase.BatchReductionRetryEnabled.Get(&settings.SV),
+		includeErrorDetails:      changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&settings.SV),
 		recordResize:             recordResize,
 		topicsForConnectionCheck: topicsForConnectionCheck,
 	}
@@ -165,7 +167,7 @@ func (k *kafkaSinkClientV2) Flush(ctx context.Context, payload SinkPayload) (ret
 		}
 		return nil
 	} else {
-		if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) {
+		if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) && k.includeErrorDetails {
 			msg := msgs[0]
 			mvccVal := msg.Context.Value(mvccTSKey{})
 			var ts hlc.Timestamp
@@ -273,7 +275,7 @@ func (k *kafkaSinkClientV2) maybeUpdateTopicPartitions(
 
 // MakeBatchBuffer implements SinkClient.
 func (k *kafkaSinkClientV2) MakeBatchBuffer(topic string) BatchBuffer {
-	return &kafkaBuffer{topic: topic, batchCfg: k.batchCfg}
+	return &kafkaBuffer{topic: topic, batchCfg: k.batchCfg, includeErrorDetails: k.includeErrorDetails}
 }
 
 func (k *kafkaSinkClientV2) shouldTryResizing(err error, msgs []*kgo.Record) bool {
@@ -311,7 +313,8 @@ type kafkaBuffer struct {
 	messages  []*kgo.Record
 	byteCount int
 
-	batchCfg sinkBatchConfig
+	batchCfg            sinkBatchConfig
+	includeErrorDetails bool
 }
 
 type mvccTSKey struct{}
@@ -328,7 +331,10 @@ func (b *kafkaBuffer) Append(ctx context.Context, key []byte, value []byte, attr
 		headers = append(headers, kgo.RecordHeader{Key: k, Value: v})
 	}
 
-	rctx := context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+	var rctx context.Context
+	if b.includeErrorDetails {
+		rctx = context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+	}
 
 	b.messages = append(b.messages, &kgo.Record{Key: key, Value: value, Topic: b.topic, Headers: headers, Context: rctx})
 	b.byteCount += len(value)
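The Append and Flush hunks above rely on stashing the MVCC timestamp on each record's Context and reading it back when an oversized message is reported. Below is a standalone sketch of that round trip using stand-in types (a plain string instead of hlc.Timestamp, no kgo.Record); it illustrates the context.WithValue pattern only, not the actual sink code.

```go
package main

import (
	"context"
	"fmt"
)

// mvccTSKey is an unexported key type, so no other package can collide with
// values stored under it.
type mvccTSKey struct{}

// stashMVCC mirrors the gated Append: only attach the timestamp when the
// cluster setting enables detailed errors.
func stashMVCC(ctx context.Context, ts string, includeErrorDetails bool) context.Context {
	if !includeErrorDetails {
		return ctx
	}
	return context.WithValue(ctx, mvccTSKey{}, ts)
}

// recoverMVCC mirrors the Flush side: pull the timestamp back off the
// record's context, if it was attached.
func recoverMVCC(ctx context.Context) (string, bool) {
	ts, ok := ctx.Value(mvccTSKey{}).(string)
	return ts, ok
}

func main() {
	rctx := stashMVCC(context.Background(), "1712345678.000000001,0", true)
	if ts, ok := recoverMVCC(rctx); ok {
		fmt.Println("mvcc timestamp recovered from record context:", ts)
	}
}
```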
