@@ -465,7 +465,7 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
465465 }
466466
467467 RetryTopicConfigurer .EndpointProcessor endpointProcessor = endpointToProcess ->
468- this .doProcessKafkaListenerAnnotation (endpointToProcess , kafkaListener , bean );
468+ this .processKafkaListenerAnnotationForRetryTopic (endpointToProcess , kafkaListener , bean );
469469
470470 String beanRef = kafkaListener .beanRef ();
471471 this .listenerScope .addListener (beanRef , bean );
@@ -546,28 +546,26 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
546546 this .listenerScope .addListener (beanRef , bean );
547547 }
548548
549- doProcessKafkaListenerAnnotation (endpoint , kafkaListener , bean );
549+ processKafkaListenerAnnotationBeforeRegistration (endpoint , kafkaListener , bean );
550550
551551 String containerFactory = resolve (kafkaListener .containerFactory ());
552552 KafkaListenerContainerFactory <?> listenerContainerFactory = resolveContainerFactory (kafkaListener , containerFactory , beanName );
553553
554554 this .registrar .registerEndpoint (endpoint , listenerContainerFactory );
555555
556- endpoint .setBeanFactory (this .beanFactory );
557- String errorHandlerBeanName = resolveExpressionAsString (kafkaListener .errorHandler (), "errorHandler" );
558- if (StringUtils .hasText (errorHandlerBeanName )) {
559- resolveErrorHandler (endpoint , kafkaListener );
560- }
561- String converterBeanName = resolveExpressionAsString (kafkaListener .contentTypeConverter (), "contentTypeConverter" );
562- if (StringUtils .hasText (converterBeanName )) {
563- resolveContentTypeConverter (endpoint , kafkaListener );
564- }
556+ processKafkaListenerEndpointAfterRegistration (endpoint , kafkaListener );
557+
565558 if (StringUtils .hasText (beanRef )) {
566559 this .listenerScope .removeListener (beanRef );
567560 }
568561 }
569562
570- private void doProcessKafkaListenerAnnotation (MethodKafkaListenerEndpoint <?, ?> endpoint , KafkaListener kafkaListener , Object bean ) {
563+ private void processKafkaListenerAnnotationForRetryTopic (MethodKafkaListenerEndpoint <?, ?> endpoint , KafkaListener kafkaListener , Object bean ) {
564+ processKafkaListenerAnnotationBeforeRegistration (endpoint , kafkaListener , bean );
565+ processKafkaListenerEndpointAfterRegistration (endpoint , kafkaListener );
566+ }
567+
568+ private void processKafkaListenerAnnotationBeforeRegistration (MethodKafkaListenerEndpoint <?, ?> endpoint , KafkaListener kafkaListener , Object bean ) {
571569 endpoint .setBean (bean );
572570 endpoint .setMessageHandlerMethodFactory (this .messageHandlerMethodFactory );
573571 endpoint .setId (getEndpointId (kafkaListener ));
@@ -595,6 +593,18 @@ private void doProcessKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?>
595593 endpoint .setSplitIterables (kafkaListener .splitIterables ());
596594 }
597595
596+ private void processKafkaListenerEndpointAfterRegistration (MethodKafkaListenerEndpoint <?, ?> endpoint , KafkaListener kafkaListener ) {
597+ endpoint .setBeanFactory (this .beanFactory );
598+ String errorHandlerBeanName = resolveExpressionAsString (kafkaListener .errorHandler (), "errorHandler" );
599+ if (StringUtils .hasText (errorHandlerBeanName )) {
600+ resolveErrorHandler (endpoint , kafkaListener );
601+ }
602+ String converterBeanName = resolveExpressionAsString (kafkaListener .contentTypeConverter (), "contentTypeConverter" );
603+ if (StringUtils .hasText (converterBeanName )) {
604+ resolveContentTypeConverter (endpoint , kafkaListener );
605+ }
606+ }
607+
598608 private void resolveErrorHandler (MethodKafkaListenerEndpoint <?, ?> endpoint , KafkaListener kafkaListener ) {
599609 Object errorHandler = resolveExpression (kafkaListener .errorHandler ());
600610 if (errorHandler instanceof KafkaListenerErrorHandler ) {
0 commit comments