|
22 | 22 | import java.util.regex.Pattern; |
23 | 23 |
|
24 | 24 | import org.springframework.beans.BeanUtils; |
| 25 | +import org.springframework.beans.factory.InitializingBean; |
25 | 26 | import org.springframework.context.ApplicationEventPublisher; |
26 | 27 | import org.springframework.context.ApplicationEventPublisherAware; |
27 | 28 | import org.springframework.kafka.core.ConsumerFactory; |
|
38 | 39 | import org.springframework.kafka.support.converter.MessageConverter; |
39 | 40 | import org.springframework.retry.RecoveryCallback; |
40 | 41 | import org.springframework.retry.support.RetryTemplate; |
| 42 | +import org.springframework.util.Assert; |
41 | 43 |
|
42 | 44 | /** |
43 | 45 | * Base {@link KafkaListenerContainerFactory} for Spring's base container implementation. |
|
53 | 55 | * @see AbstractMessageListenerContainer |
54 | 56 | */ |
55 | 57 | public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V> |
56 | | - implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware { |
| 58 | + implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean { |
57 | 59 |
|
58 | 60 | private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); |
59 | 61 |
|
@@ -251,6 +253,20 @@ public ContainerProperties getContainerProperties() { |
251 | 253 | return this.containerProperties; |
252 | 254 | } |
253 | 255 |
|
| 256 | + @Override |
| 257 | + public void afterPropertiesSet() throws Exception { |
| 258 | + if (this.errorHandler != null) { |
| 259 | + if (Boolean.TRUE.equals(this.batchListener)) { |
| 260 | + Assert.state(this.errorHandler instanceof BatchErrorHandler, |
| 261 | + "The error handler must be a BatchErrorHandler, not " + this.errorHandler.getClass().getName()); |
| 262 | + } |
| 263 | + else { |
| 264 | + Assert.state(this.errorHandler instanceof ErrorHandler, |
| 265 | + "The error handler must be an ErrorHandler, not " + this.errorHandler.getClass().getName()); |
| 266 | + } |
| 267 | + } |
| 268 | + } |
| 269 | + |
254 | 270 | @SuppressWarnings("unchecked") |
255 | 271 | @Override |
256 | 272 | public C createListenerContainer(KafkaListenerEndpoint endpoint) { |
|
0 commit comments