Skip to content

Commit 9e3bd8c

Browse files
artembilangaryrussell
authored andcommitted
Process errorHandler in class level KafkaListener
The `errorHandler` attributed has been missed from the `KafkaListenerAnnotationBeanPostProcessor.processMultiMethodListeners()` logic. * Move `errorHandler` and `BeanFactory` population into the `processListener()` method in the `KafkaListenerAnnotationBeanPostProcessor` **Cherry-pick to 2.1.x, 2.0.x & 1.3.x** Fix Test
1 parent d9248b6 commit 9e3bd8c

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
333333
for (KafkaListener classLevelListener : classLevelListeners) {
334334
MultiMethodKafkaListenerEndpoint<K, V> endpoint = new MultiMethodKafkaListenerEndpoint<>(checkedMethods,
335335
bean);
336-
endpoint.setBeanFactory(this.beanFactory);
337336
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
338337
}
339338
}
@@ -342,11 +341,6 @@ protected void processKafkaListener(KafkaListener kafkaListener, Method method,
342341
Method methodToUse = checkProxy(method, bean);
343342
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
344343
endpoint.setMethod(methodToUse);
345-
endpoint.setBeanFactory(this.beanFactory);
346-
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
347-
if (StringUtils.hasText(errorHandlerBeanName)) {
348-
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
349-
}
350344
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
351345
}
352346

@@ -413,6 +407,11 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
413407
}
414408
}
415409

410+
endpoint.setBeanFactory(this.beanFactory);
411+
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
412+
if (StringUtils.hasText(errorHandlerBeanName)) {
413+
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
414+
}
416415
this.registrar.registerEndpoint(endpoint, factory);
417416
}
418417

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ public void testMulti() throws Exception {
255255
template.flush();
256256
assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
257257
assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
258+
template.send("annotated8", 0, 1, "junk");
259+
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
258260
}
259261

260262
@Test
@@ -607,6 +609,14 @@ public KafkaListenerErrorHandler consumeException(Listener listener) {
607609
};
608610
}
609611

612+
@Bean
613+
public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) {
614+
return (m, e) -> {
615+
listener.errorLatch.countDown();
616+
return null;
617+
};
618+
}
619+
610620
}
611621

612622
static class Listener implements ConsumerSeekAware {
@@ -864,16 +874,23 @@ public CountDownLatch getLatch2() {
864874

865875
}
866876

867-
@KafkaListener(id = "multi", topics = "annotated8")
877+
@KafkaListener(id = "multi", topics = "annotated8", errorHandler = "consumeMultiMethodException")
868878
static class MultiListenerBean {
869879

870880
private final CountDownLatch latch1 = new CountDownLatch(1);
871881

872882
private final CountDownLatch latch2 = new CountDownLatch(1);
873883

884+
private final CountDownLatch errorLatch = new CountDownLatch(1);
885+
874886
@KafkaHandler
875887
public void bar(@Payload String bar) {
876-
latch1.countDown();
888+
if ("junk".equals(bar)) {
889+
throw new RuntimeException("intentional");
890+
}
891+
else {
892+
this.latch1.countDown();
893+
}
877894
}
878895

879896
@KafkaHandler

0 commit comments

Comments
 (0)