Commit 06e2329

changefeedccl: include key, size, and mvcc timestamp in Kafka v2 message too large errors

Kafka v1 error messages for oversized messages included the message key and size, which made it easier to identify problematic data. Kafka v2 was missing this information. This change restores that missing context. It modifies the Append and Flush method signatures to pass context, allowing the MVCC timestamp to be attached to each Kafka message. The error message for MESSAGE_TOO_LARGE errors is also updated to include:

- The message key
- The combined size of the key and value
- The MVCC timestamp

This helps users identify what data could not be delivered.

Fixes #144994
Epic: CRDB-49646

Release note (general change): Kafka v2 changefeed sinks now include the message key, size, and MVCC timestamp in message too large error logs.
1 parent 4eb8e22 commit 06e2329
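
The mechanism, in isolation: the MVCC timestamp rides on a per-record context attached at Append time and is recovered when the oversized message is reported at Flush time. Below is a minimal standalone sketch of that round trip; the mvccKey type and all values are hypothetical stand-ins for illustration, not the CockroachDB code itself.

package main

import (
	"context"
	"fmt"
)

// mvccKey mirrors the unexported context-key pattern the change uses: an
// empty struct type cannot collide with context values set elsewhere.
type mvccKey struct{}

func main() {
	// At Append time: stash a (hypothetical) MVCC timestamp, rendered here
	// as a string stand-in for hlc.Timestamp, on a per-record context.
	rctx := context.WithValue(context.Background(), mvccKey{}, "1712345678.000000000,0")

	// At Flush time: when the broker rejects the record as too large,
	// recover the timestamp and fold it into the reported error.
	key, value := []byte("user42"), make([]byte, 1<<20)
	if ts, ok := rctx.Value(mvccKey{}).(string); ok {
		fmt.Printf("Kafka message too large: key=%s size=%d mvcc=%s\n",
			key, len(key)+len(value), ts)
	}
}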

File tree: 5 files changed, +49 −13 lines

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 6 additions & 4 deletions
@@ -45,7 +45,7 @@ type SinkClient interface {
 // BatchBuffer is an interface to aggregate KVs into a payload that can be sent
 // to the sink.
 type BatchBuffer interface {
-	Append(key []byte, value []byte, attributes attributes)
+	Append(ctx context.Context, key []byte, value []byte, attributes attributes)
 	ShouldFlush() bool

 	// Once all data has been Append'ed, Close can be called to return a finalized
@@ -103,6 +103,7 @@ type flushReq struct {
 type attributes struct {
 	tableName string
 	headers   map[string][]byte
+	mvcc      hlc.Timestamp
 }

 type rowEvent struct {
@@ -303,14 +304,15 @@ func hashToInt(h hash.Hash32, buf []byte) int {
 }

 // Append adds the contents of a kvEvent to the batch, merging its alloc pool.
-func (sb *sinkBatch) Append(e *rowEvent) {
+func (sb *sinkBatch) Append(ctx context.Context, e *rowEvent) {
 	if sb.isEmpty() {
 		sb.bufferTime = timeutil.Now()
 	}

-	sb.buffer.Append(e.key, e.val, attributes{
+	sb.buffer.Append(ctx, e.key, e.val, attributes{
 		tableName: e.topicDescriptor.GetTableName(),
 		headers:   e.headers,
+		mvcc:      e.mvcc,
 	})

 	sb.keys.Add(hashToInt(sb.hasher, e.key))
@@ -502,7 +504,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 			topicBatches[topic] = batchBuffer
 		}

-		batchBuffer.Append(r)
+		batchBuffer.Append(ctx, r)
 		if s.knobs.OnAppend != nil {
 			s.knobs.OnAppend(r)
 		}

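The interface change above is mechanical but load-bearing: every BatchBuffer implementation now receives the batching worker's context on each Append, which is what lets the Kafka buffer (next file) attach per-message metadata. As a rough standalone sketch of the new contract, with a hypothetical countingBuffer and a pared-down attributes struct (illustration only, not part of this change):

package main

import (
	"context"
	"fmt"
)

// Pared-down stand-in for the attributes struct above (illustration only).
type attributes struct {
	mvcc int64 // stand-in for hlc.Timestamp
}

// countingBuffer is a hypothetical implementer of the widened interface:
// Append takes a context it may use for per-message metadata, or ignore.
type countingBuffer struct {
	n, bytes int
}

func (b *countingBuffer) Append(ctx context.Context, key, value []byte, attrs attributes) {
	b.n++
	b.bytes += len(key) + len(value)
}

func (b *countingBuffer) ShouldFlush() bool { return b.bytes >= 1<<20 }

func main() {
	b := &countingBuffer{}
	b.Append(context.Background(), []byte("k1"), []byte("v1"), attributes{mvcc: 1})
	fmt.Println(b.n, b.bytes, b.ShouldFlush())
}
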
pkg/ccl/changefeedccl/sink_kafka_v2.go

Lines changed: 19 additions & 2 deletions
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/cidr"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -164,6 +165,18 @@ func (k *kafkaSinkClientV2) Flush(ctx context.Context, payload SinkPayload) (ret
 		}
 		return nil
 	} else {
+		if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) {
+			msg := msgs[0]
+			mvccVal := msg.Context.Value(mvccTSKey{})
+			var ts hlc.Timestamp
+			if mvccVal != nil {
+				ts = mvccVal.(hlc.Timestamp)
+			}
+			err = errors.Wrapf(err,
+				"Kafka message too large: key=%s size=%d mvcc=%s",
+				string(msg.Key), len(msg.Key)+len(msg.Value), ts,
+			)
+		}
 		return err
 	}
 }
@@ -301,7 +314,9 @@ type kafkaBuffer struct {
 	batchCfg sinkBatchConfig
 }

-func (b *kafkaBuffer) Append(key []byte, value []byte, attrs attributes) {
+type mvccTSKey struct{}
+
+func (b *kafkaBuffer) Append(ctx context.Context, key []byte, value []byte, attrs attributes) {
 	// HACK: kafka sink v1 encodes nil keys as sarama.ByteEncoder(key) which is != nil, and unit tests rely on this.
 	// So do something equivalent.
 	if key == nil {
@@ -313,7 +328,9 @@ func (b *kafkaBuffer) Append(key []byte, value []byte, attrs attributes) {
 		headers = append(headers, kgo.RecordHeader{Key: k, Value: v})
 	}

-	b.messages = append(b.messages, &kgo.Record{Key: key, Value: value, Topic: b.topic, Headers: headers})
+	rctx := context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+
+	b.messages = append(b.messages, &kgo.Record{Key: key, Value: value, Topic: b.topic, Headers: headers, Context: rctx})
 	b.byteCount += len(value)
 }

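Two details worth noting in this file, read directly off the diff: the timestamp survives from Append to Flush because it rides on the record itself, via the Context field of kgo.Record, and the failed record, context included, is what Flush inspects; mvccTSKey is an unexported empty struct, the standard Go idiom for collision-proof context keys. Since errors.Wrapf prefixes its message onto the wrapped error, the surfaced error should read roughly as follows (key, size, and timestamp values hypothetical):

    Kafka message too large: key=user42 size=1048601 mvcc=1712345678.000000000,0: MESSAGE_TOO_LARGE: ...
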
pkg/ccl/changefeedccl/sink_kafka_v2_test.go

Lines changed: 19 additions & 4 deletions
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/mocks"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -99,7 +100,7 @@ func TestKafkaSinkClientV2_Basic(t *testing.T) {
 	buf := fx.sink.MakeBatchBuffer("t")
 	keys := []string{"k1", "k2", "k3"}
 	for i, key := range keys {
-		buf.Append([]byte(key), []byte(strconv.Itoa(i)), attributes{})
+		buf.Append(context.Background(), []byte(key), []byte(strconv.Itoa(i)), attributes{})
 	}
 	payload, err := buf.Close()
 	require.NoError(t, err)
@@ -123,7 +124,13 @@ func TestKafkaSinkClientV2_Resize(t *testing.T) {

 	buf := fx.sink.MakeBatchBuffer("t")
 	for i := range 100 {
-		buf.Append([]byte("k1"), []byte(strconv.Itoa(i)), attributes{})
+		buf.Append(context.Background(),
+			[]byte("k1"),
+			[]byte(strconv.Itoa(i)),
+			attributes{
+				mvcc: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
+			},
+		)
 	}
 	payload, err := buf.Close()
 	require.NoError(t, err)
@@ -157,7 +164,15 @@ func TestKafkaSinkClientV2_Resize(t *testing.T) {
 			fx.kc.EXPECT().ProduceSync(fx.ctx, payloadAnys[:3]...).Times(1).Return(pr),
 			fx.kc.EXPECT().ProduceSync(fx.ctx, payloadAnys[:1]...).Times(1).Return(pr),
 		)
-		require.Error(t, fx.sink.Flush(fx.ctx, payload))
+
+		err := fx.sink.Flush(fx.ctx, payload)
+		require.Error(t, err)
+
+		// Validate that the error includes key, size, and mvcc.
+		errStr := err.Error()
+		require.Contains(t, errStr, "k1", "error should include key")
+		require.Regexp(t, `size=\d+`, errStr, "error should include size")
+		require.Regexp(t, `mvcc=\d+`, errStr, "error should include mvcc timestamp")
 	})

 	t.Run("resize enabled and it gets everything", func(t *testing.T) {
@@ -556,7 +571,7 @@ func TestKafkaSinkClientV2_ErrorsEventually(t *testing.T) {
 	defer fx.close()

 	buf := fx.sink.MakeBatchBuffer("t")
-	buf.Append([]byte("k1"), []byte("v1"), attributes{})
+	buf.Append(context.Background(), []byte("k1"), []byte("v1"), attributes{})
 	payload, err := buf.Close()
 	require.NoError(t, err)

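The assertions pin down the shape of the new error rather than exact values: the key k1 must appear, size=\d+ matches the combined key+value byte count, and mvcc=\d+ matches the leading wall-clock digits of an hlc.Timestamp's string form (e.g. 1712345678.000000000,0), keeping the test independent of the actual timestamp.
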
pkg/ccl/changefeedccl/sink_pubsub_v2.go

Lines changed: 3 additions & 1 deletion
@@ -238,7 +238,9 @@ type pubsubBuffer struct {
 var _ BatchBuffer = (*pubsubBuffer)(nil)

 // Append implements the BatchBuffer interface
-func (psb *pubsubBuffer) Append(key []byte, value []byte, attributes attributes) {
+func (psb *pubsubBuffer) Append(
+	ctx context.Context, key []byte, value []byte, attributes attributes,
+) {
 	var content []byte
 	switch psb.sc.format {
 	case changefeedbase.OptFormatJSON:

pkg/ccl/changefeedccl/sink_webhook_v2.go

Lines changed: 2 additions & 2 deletions
@@ -356,7 +356,7 @@ type webhookCSVBuffer struct {
 var _ BatchBuffer = (*webhookCSVBuffer)(nil)

 // Append implements the BatchBuffer interface.
-func (cb *webhookCSVBuffer) Append(key []byte, value []byte, _ attributes) {
+func (cb *webhookCSVBuffer) Append(ctx context.Context, key []byte, value []byte, _ attributes) {
 	cb.bytes = append(cb.bytes, value...)
 	cb.messageCount += 1
 }
@@ -380,7 +380,7 @@ type webhookJSONBuffer struct {
 var _ BatchBuffer = (*webhookJSONBuffer)(nil)

 // Append implements the BatchBuffer interface.
-func (jb *webhookJSONBuffer) Append(key []byte, value []byte, _ attributes) {
+func (jb *webhookJSONBuffer) Append(ctx context.Context, key []byte, value []byte, _ attributes) {
 	jb.messages = append(jb.messages, value)
 	jb.numBytes += len(value)
 }

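In the pubsub and webhook buffers, the new ctx parameter exists only to satisfy the widened BatchBuffer interface; none of the Append bodies shown here actually use it.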