Skip to content

Commit af481ec

Browse files
committed
roachtest: Test message too large error formatting for kafka v2 sinks
We recently updated the message too large error on kafka v2 sinks to include additional details. This test ensures that when this error is triggered, the error string is formatted correctly and contains the key, size and mvcc timestamp. The roachtest sets up Kafka sinks, creates a table and a changefeed, inserts a value large enough to trigger a message too large error, and checks the logs to confirm the error message is formatted correctly. To access the logs, the FetchLogs method was added to the Cluster interface. Fixes #148000 Epic CRDB-51353 Release note: None
1 parent 06e2329 commit af481ec

File tree

3 files changed

+77
-0
lines changed

3 files changed

+77
-0
lines changed

pkg/cmd/roachtest/cluster/cluster_interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ type Cluster interface {
168168
ctx context.Context, l *logger.Logger, src, dest, branch string, node option.NodeListOption,
169169
) error
170170

171+
FetchLogs(ctx context.Context, l *logger.Logger) error
171172
FetchTimeseriesData(ctx context.Context, l *logger.Logger) error
172173
FetchDebugZip(ctx context.Context, l *logger.Logger, dest string, opts ...option.Option) error
173174
RefetchCertsFromNode(ctx context.Context, node int) error

pkg/cmd/roachtest/clusterstats/mock_cluster_generated_test.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,6 +1640,58 @@ highwaterLoop:
16401640
}
16411641
}
16421642

1643+
func runMessageTooLarge(ctx context.Context, t test.Test, c cluster.Cluster) {
1644+
ct := newCDCTester(ctx, t, c)
1645+
db := ct.DB()
1646+
tdb := sqlutils.MakeSQLRunner(db)
1647+
1648+
settings := []string{
1649+
`SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = true`,
1650+
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
1651+
`SET CLUSTER SETTING changefeed.batch_reduction_retry_enabled = true`,
1652+
}
1653+
for _, stmt := range settings {
1654+
if _, err := db.ExecContext(ctx, stmt); err != nil {
1655+
t.Fatalf("failed to run %q: %v", stmt, err)
1656+
}
1657+
}
1658+
tdb.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY, val STRING)`)
1659+
1660+
ct.newChangefeed(feedArgs{
1661+
sinkType: kafkaSink,
1662+
targets: []string{"foo"},
1663+
opts: map[string]string{
1664+
"min_checkpoint_frequency": "'2s'",
1665+
"kafka_sink_config": `'{"Flush": {"Messages": 1, "Frequency": "1s"}}'`,
1666+
},
1667+
})
1668+
1669+
buf := make([]byte, 1_048_600)
1670+
for i := range buf {
1671+
buf[i] = 'b'
1672+
}
1673+
tdb.Exec(t, `INSERT INTO foo VALUES (1, $1)`, string(buf))
1674+
1675+
t.Status("inserting large string to trigger Kafka message-too-large error")
1676+
time.Sleep(30 * time.Second)
1677+
1678+
if err := c.FetchLogs(ctx, t.L()); err != nil {
1679+
t.L().PrintfCtx(ctx, "could not fetch logs mid‐run: %v", err)
1680+
}
1681+
ct.Close()
1682+
1683+
logPath := filepath.Join(t.ArtifactsDir(), "logs", "1.cockroach.log")
1684+
logs, err := os.ReadFile(logPath)
1685+
if err != nil {
1686+
t.Fatalf("failed to read logs: %v at %s", err, logPath)
1687+
}
1688+
logStr := string(logs)
1689+
require.Contains(t, logStr, "Kafka message too large", "expected message too large error in logs")
1690+
require.Regexp(t, `key=[^ ]+`, logStr, "log should include key")
1691+
require.Regexp(t, `size=\d+`, logStr, "log should include size")
1692+
require.Regexp(t, `mvcc=[\d\.]+,\d+`, logStr, "log should include mvcc")
1693+
}
1694+
16431695
func registerCDC(r registry.Registry) {
16441696
r.Add(registry.TestSpec{
16451697
Name: "cdc/initial-scan-only",
@@ -2610,6 +2662,16 @@ func registerCDC(r registry.Registry) {
26102662
ct.waitForWorkload()
26112663
},
26122664
})
2665+
r.Add(registry.TestSpec{
2666+
Name: "cdc/message-too-large-error",
2667+
Owner: registry.OwnerCDC,
2668+
Cluster: r.MakeClusterSpec(3),
2669+
Leases: registry.MetamorphicLeases,
2670+
Suites: registry.Suites(registry.Nightly),
2671+
Timeout: 15 * time.Minute,
2672+
CompatibleClouds: registry.AllClouds,
2673+
Run: runMessageTooLarge,
2674+
})
26132675
}
26142676

26152677
const (

0 commit comments

Comments
 (0)