Skip to content

Commit 4f1e340

Browse files
changefeedccl: prevent deadlock in TestChangefeedKafkaMessageTooLarge
Previously, this test would deadlock due to kafka retrying messages too many times. These messages are stored in a buffer of size 1024 created by the CDC testing infra: https://github.com/cockroachdb/cockroach/blob/5c3f96d38cdc3a2d953ca3ffb1e39e97d7e5110e/pkg/ccl/changefeedccl/testfeed_test.go#L1819 The test asserts that 2000 messages pass through the buffer. When the test finishes, it stops reading from the buffer. The problem is that due to retries, there may be more messages sent to the buffer than that are read out of the buffer. Even after the 2000 messages are read and the test is shutting down, the sink may be blocked trying to put resolved messages (plus retries) in the buffer. If this happens, the changefeed resumer (same goroutine as the kafka sink) gets blocked and does not terminate when the job is cancelled at the end of the test. This change caps the number of retries at 200 for this test, so there should be no more than 200 extra messages plus a few resolved messages during this test. This is far less than the buffer size of 1024. See detailed explanation in cockroachdb#107591. Fixes: cockroachdb#107591 Epic: none Release note: None
1 parent 0742137 commit 4f1e340

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7702,8 +7702,10 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
77027702

77037703
rnd, _ := randutil.NewPseudoRand()
77047704

7705+
maxFailures := int32(200)
7706+
var numFailures atomic.Int32
77057707
knobs.kafkaInterceptor = func(m *sarama.ProducerMessage, client kafkaClient) error {
7706-
if client.Config().Producer.Flush.MaxMessages > 1 && rnd.Int()%5 == 0 {
7708+
if client.Config().Producer.Flush.MaxMessages > 1 && numFailures.Add(1) < maxFailures && rnd.Int()%10 == 0 {
77077709
return sarama.ErrMessageSizeTooLarge
77087710
}
77097711
return nil

0 commit comments

Comments
 (0)