|
36 | 36 | import org.apache.kafka.common.config.ConfigResource; |
37 | 37 | import org.apache.kafka.common.serialization.StringDeserializer; |
38 | 38 | import org.apache.kafka.common.serialization.StringSerializer; |
| 39 | +import org.awaitility.Awaitility; |
39 | 40 | import org.jspecify.annotations.Nullable; |
40 | 41 | import org.junit.jupiter.api.Test; |
41 | 42 |
|
|
44 | 45 | import org.springframework.context.annotation.Configuration; |
45 | 46 | import org.springframework.kafka.annotation.EnableKafka; |
46 | 47 | import org.springframework.kafka.annotation.KafkaListener; |
| 48 | +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; |
47 | 49 | import org.springframework.kafka.config.ShareKafkaListenerContainerFactory; |
48 | 50 | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
49 | 51 | import org.springframework.kafka.core.DefaultShareConsumerFactory; |
@@ -87,6 +89,9 @@ class ShareKafkaListenerIntegrationTests { |
87 | 89 | @Autowired |
88 | 90 | KafkaTemplate<String, String> kafkaTemplate; |
89 | 91 |
|
| 92 | + @Autowired |
| 93 | + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; |
| 94 | + |
90 | 95 | @Test |
91 | 96 | void shouldSupportBasicShareKafkaListener() throws Exception { |
92 | 97 | final String topic = "share-listener-basic-test"; |
@@ -147,11 +152,15 @@ void shouldSupportAcknowledgingShareConsumerAwareListener() throws Exception { |
147 | 152 | final String groupId = "share-ack-consumer-aware-group"; |
148 | 153 | setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); |
149 | 154 |
|
| 155 | + // wait for containers before producing so the share consumer is subscribed. |
| 156 | + awaitRunningShareListenerContainers(); |
| 157 | + |
150 | 158 | // Send test message |
151 | 159 | kafkaTemplate.send(topic, "ack-consumer-aware-message"); |
| 160 | + kafkaTemplate.flush(); |
152 | 161 |
|
153 | 162 | // Wait for processing |
154 | | - assertThat(AckShareConsumerAwareTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); |
| 163 | + assertThat(AckShareConsumerAwareTestListener.latch.await(30, TimeUnit.SECONDS)).isTrue(); |
155 | 164 | assertThat(AckShareConsumerAwareTestListener.received.get()).isEqualTo("ack-consumer-aware-message"); |
156 | 165 | assertThat(AckShareConsumerAwareTestListener.consumerReceived.get()).isNotNull(); |
157 | 166 | assertThat(AckShareConsumerAwareTestListener.acknowledgmentReceived.get()).isNotNull(); |
@@ -243,6 +252,20 @@ private boolean isAcknowledgedInternal(ShareAcknowledgment ack) { |
243 | 252 | } |
244 | 253 | } |
245 | 254 |
|
| 255 | + /** |
| 256 | + * Wait until all listener containers have started (needed when this test runs before others). |
| 257 | + */ |
| 258 | + private void awaitRunningShareListenerContainers() { |
| 259 | + Awaitility.await() |
| 260 | + .atMost(30, TimeUnit.SECONDS) |
| 261 | + .pollInterval(100, TimeUnit.MILLISECONDS) |
| 262 | + .untilAsserted(() -> { |
| 263 | + assertThat(this.kafkaListenerEndpointRegistry.getListenerContainerIds()).isNotEmpty(); |
| 264 | + this.kafkaListenerEndpointRegistry.getListenerContainers().forEach(container -> |
| 265 | + assertThat(container.isRunning()).isTrue()); |
| 266 | + }); |
| 267 | + } |
| 268 | + |
246 | 269 | @Configuration |
247 | 270 | @EnableKafka |
248 | 271 | static class TestConfig { |
|
0 commit comments