|
1 | 1 | /* |
2 | | - * Copyright 2020 the original author or authors. |
| 2 | + * Copyright 2020-2021 the original author or authors. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
|
17 | 17 | package org.springframework.kafka.listener; |
18 | 18 |
|
19 | 19 | import static org.assertj.core.api.Assertions.assertThat; |
| 20 | +import static org.mockito.ArgumentMatchers.any; |
| 21 | +import static org.mockito.ArgumentMatchers.isNull; |
| 22 | +import static org.mockito.BDDMockito.given; |
| 23 | +import static org.mockito.Mockito.mock; |
20 | 24 |
|
21 | 25 | import java.util.List; |
22 | 26 | import java.util.Map; |
|
28 | 32 | import org.apache.kafka.clients.consumer.Consumer; |
29 | 33 | import org.apache.kafka.clients.consumer.ConsumerConfig; |
30 | 34 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| 35 | +import org.apache.kafka.clients.consumer.ConsumerRecords; |
31 | 36 | import org.apache.kafka.common.TopicPartition; |
32 | 37 | import org.junit.jupiter.api.BeforeAll; |
33 | 38 | import org.junit.jupiter.api.Test; |
34 | 39 |
|
| 40 | +import org.springframework.kafka.core.ConsumerFactory; |
35 | 41 | import org.springframework.kafka.core.DefaultKafkaConsumerFactory; |
36 | 42 | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
37 | 43 | import org.springframework.kafka.core.KafkaOperations; |
38 | 44 | import org.springframework.kafka.core.KafkaTemplate; |
39 | 45 | import org.springframework.kafka.event.ConsumerStoppedEvent; |
| 46 | +import org.springframework.kafka.support.TopicPartitionOffset; |
40 | 47 | import org.springframework.kafka.test.EmbeddedKafkaBroker; |
41 | 48 | import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; |
42 | 49 | import org.springframework.kafka.test.context.EmbeddedKafka; |
@@ -206,4 +213,31 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) { |
206 | 213 | assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); |
207 | 214 | } |
208 | 215 |
|
| 216 | + @SuppressWarnings("unchecked") |
| 217 | + @Test |
| 218 | + void consumerEx() throws InterruptedException { |
| 219 | + ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class); |
| 220 | + Consumer<Integer, String> consumer = mock(Consumer.class); |
| 221 | + given(consumer.poll(any())).willThrow(new RuntimeException("test")); |
| 222 | + given(cf.createConsumer(any(), any(), isNull(), any())).willReturn(consumer); |
| 223 | + ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0)); |
| 224 | + KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, |
| 225 | + containerProps); |
| 226 | + CountDownLatch called = new CountDownLatch(1); |
| 227 | + container.setBatchErrorHandler(new RetryingBatchErrorHandler() { |
| 228 | + |
| 229 | + @Override |
| 230 | + public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer, |
| 231 | + MessageListenerContainer container, Runnable invokeListener) { |
| 232 | + |
| 233 | + called.countDown(); |
| 234 | + super.handle(thrownException, records, consumer, container, invokeListener); |
| 235 | + } |
| 236 | + }); |
| 237 | + container.setupMessageListener((BatchMessageListener<Integer, String>) (recs -> { })); |
| 238 | + container.start(); |
| 239 | + assertThat(called.await(10, TimeUnit.SECONDS)).isTrue(); |
| 240 | + container.stop(); |
| 241 | + } |
| 242 | + |
209 | 243 | } |
0 commit comments