Skip to content

Commit 87fb29a

Browse files
committed
changefeedccl: fix mock kafka server shutdown
Fix a test timeout due to a batching sink worker being blocked on Flush after the context was cancelled. Fixes: #144213 Release note: None
1 parent e1a9fa7 commit 87fb29a

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

pkg/ccl/changefeedccl/testfeed_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1876,12 +1876,16 @@ func (s *fakeKafkaSinkV2) Dial() error {
18761876
})
18771877
}
18781878

1879-
s.feedCh <- &sarama.ProducerMessage{
1879+
select {
1880+
case <-ctx.Done():
1881+
return kgo.ProduceResults{kgo.ProduceResult{Err: ctx.Err()}}
1882+
case s.feedCh <- &sarama.ProducerMessage{
18801883
Topic: m.Topic,
18811884
Key: key,
18821885
Value: sarama.ByteEncoder(m.Value),
18831886
Partition: m.Partition,
18841887
Headers: headers,
1888+
}:
18851889
}
18861890
}
18871891
return nil

0 commit comments

Comments
 (0)