@@ -185,6 +185,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
185185
186186 private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME ;
187187
188+ @ Nullable
188189 private ApplicationContext applicationContext ;
189190
190191 private BeanFactory beanFactory ;
@@ -197,6 +198,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
197198
198199 private AnnotationEnhancer enhancer ;
199200
201+ @ Nullable
200202 private RetryTopicConfigurer retryTopicConfigurer ;
201203
202204 @ Override
@@ -273,9 +275,11 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
273275 public synchronized void setBeanFactory (BeanFactory beanFactory ) {
274276 this .beanFactory = beanFactory ;
275277 if (beanFactory instanceof ConfigurableListableBeanFactory clbf ) {
276- this .resolver = clbf .getBeanExpressionResolver ();
277- this .expressionContext = new BeanExpressionContext ((ConfigurableListableBeanFactory ) beanFactory ,
278- this .listenerScope );
278+ BeanExpressionResolver beanExpressionResolver = clbf .getBeanExpressionResolver ();
279+ if (beanExpressionResolver != null ) {
280+ this .resolver = beanExpressionResolver ;
281+ }
282+ this .expressionContext = new BeanExpressionContext (clbf , this .listenerScope );
279283 }
280284 }
281285
@@ -333,9 +337,11 @@ public void afterSingletonsInstantiated() {
333337
334338 // Actually register all listeners
335339 this .registrar .afterPropertiesSet ();
336- Map <String , ContainerGroupSequencer > sequencers =
337- this .applicationContext .getBeansOfType (ContainerGroupSequencer .class , false , false );
338- sequencers .values ().forEach (ContainerGroupSequencer ::initialize );
340+ if (this .applicationContext != null ) {
341+ Map <String , ContainerGroupSequencer > sequencers =
342+ this .applicationContext .getBeansOfType (ContainerGroupSequencer .class , false , false );
343+ sequencers .values ().forEach (ContainerGroupSequencer ::initialize );
344+ }
339345 }
340346
341347 private void buildEnhancer () {
@@ -368,36 +374,36 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
368374 if (!this .nonAnnotatedClasses .contains (bean .getClass ())) {
369375 Class <?> targetClass = AopUtils .getTargetClass (bean );
370376 Collection <KafkaListener > classLevelListeners = findListenerAnnotations (targetClass );
371- final boolean hasClassLevelListeners = !classLevelListeners .isEmpty ();
372- final List <Method > multiMethods = new ArrayList <>();
373377 Map <Method , Set <KafkaListener >> annotatedMethods = MethodIntrospector .selectMethods (targetClass ,
374378 (MethodIntrospector .MetadataLookup <Set <KafkaListener >>) method -> {
375379 Set <KafkaListener > listenerMethods = findListenerAnnotations (method );
376380 return (!listenerMethods .isEmpty () ? listenerMethods : null );
377381 });
378- if (hasClassLevelListeners ) {
379- Set <Method > methodsWithHandler = MethodIntrospector .selectMethods (targetClass ,
380- (ReflectionUtils .MethodFilter ) method ->
381- AnnotationUtils .findAnnotation (method , KafkaHandler .class ) != null );
382- multiMethods .addAll (methodsWithHandler );
383- }
384- if (annotatedMethods .isEmpty () && !hasClassLevelListeners ) {
382+ boolean hasClassLevelListeners = !classLevelListeners .isEmpty ();
383+ boolean hasMethodLevelListeners = !annotatedMethods .isEmpty ();
384+ if (!hasMethodLevelListeners && !hasClassLevelListeners ) {
385385 this .nonAnnotatedClasses .add (bean .getClass ());
386386 this .logger .trace (() -> "No @KafkaListener annotations found on bean type: " + bean .getClass ());
387387 }
388388 else {
389- // Non-empty set of methods
390- for (Map .Entry <Method , Set <KafkaListener >> entry : annotatedMethods .entrySet ()) {
391- Method method = entry .getKey ();
392- for (KafkaListener listener : entry .getValue ()) {
393- processKafkaListener (listener , method , bean , beanName );
389+ if (hasMethodLevelListeners ) {
390+ // Non-empty set of methods
391+ for (Map .Entry <Method , Set <KafkaListener >> entry : annotatedMethods .entrySet ()) {
392+ Method method = entry .getKey ();
393+ for (KafkaListener listener : entry .getValue ()) {
394+ processKafkaListener (listener , method , bean , beanName );
395+ }
394396 }
397+ this .logger .debug (() -> annotatedMethods .size () + " @KafkaListener methods processed on bean '"
398+ + beanName + "': " + annotatedMethods );
399+ }
400+ if (hasClassLevelListeners ) {
401+ Set <Method > methodsWithHandler = MethodIntrospector .selectMethods (targetClass ,
402+ (ReflectionUtils .MethodFilter ) method ->
403+ AnnotationUtils .findAnnotation (method , KafkaHandler .class ) != null );
404+ List <Method > multiMethods = new ArrayList <>(methodsWithHandler );
405+ processMultiMethodListeners (classLevelListeners , multiMethods , targetClass , bean , beanName );
395406 }
396- this .logger .debug (() -> annotatedMethods .size () + " @KafkaListener methods processed on bean '"
397- + beanName + "': " + annotatedMethods );
398- }
399- if (hasClassLevelListeners ) {
400- processMultiMethodListeners (classLevelListeners , multiMethods , bean , beanName );
401407 }
402408 }
403409 return bean ;
@@ -444,30 +450,25 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
444450 }
445451
446452 private synchronized void processMultiMethodListeners (Collection <KafkaListener > classLevelListeners ,
447- List <Method > multiMethods , Object bean , String beanName ) {
453+ List <Method > multiMethods , Class <?> clazz , Object bean , String beanName ) {
448454
449455 List <Method > checkedMethods = new ArrayList <>();
450456 Method defaultMethod = null ;
451457 for (Method method : multiMethods ) {
452458 Method checked = checkProxy (method , bean );
453459 KafkaHandler annotation = AnnotationUtils .findAnnotation (method , KafkaHandler .class );
454460 if (annotation != null && annotation .isDefault ()) {
455- final Method toAssert = defaultMethod ;
461+ Method toAssert = defaultMethod ;
456462 Assert .state (toAssert == null , () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
457- + toAssert .toString () + " and " + method . toString () );
463+ + toAssert .toString () + " and " + method );
458464 defaultMethod = checked ;
459465 }
460466 checkedMethods .add (checked );
461467 }
462468 for (KafkaListener classLevelListener : classLevelListeners ) {
463469 MultiMethodKafkaListenerEndpoint <K , V > endpoint =
464470 new MultiMethodKafkaListenerEndpoint <>(checkedMethods , defaultMethod , bean );
465- String beanRef = classLevelListener .beanRef ();
466- this .listenerScope .addListener (beanRef , bean );
467- endpoint .setId (getEndpointId (classLevelListener ));
468- processListener (endpoint , classLevelListener , bean , beanName , resolveTopics (classLevelListener ),
469- resolveTopicPartitions (classLevelListener ));
470- this .listenerScope .removeListener (beanRef );
471+ processMainAndRetryListeners (classLevelListener , bean , beanName , endpoint , null , clazz );
471472 }
472473 }
473474
@@ -477,39 +478,34 @@ protected synchronized void processKafkaListener(KafkaListener kafkaListener, Me
477478 Method methodToUse = checkProxy (method , bean );
478479 MethodKafkaListenerEndpoint <K , V > endpoint = new MethodKafkaListenerEndpoint <>();
479480 endpoint .setMethod (methodToUse );
481+ processMainAndRetryListeners (kafkaListener , bean , beanName , endpoint , methodToUse , null );
482+ }
483+
484+ private void processMainAndRetryListeners (KafkaListener kafkaListener , Object bean , String beanName ,
485+ MethodKafkaListenerEndpoint <K , V > endpoint , @ Nullable Method methodToUse , @ Nullable Class <?> clazz ) {
480486
481487 String beanRef = kafkaListener .beanRef ();
482488 this .listenerScope .addListener (beanRef , bean );
483489 endpoint .setId (getEndpointId (kafkaListener ));
484490 String [] topics = resolveTopics (kafkaListener );
485491 TopicPartitionOffset [] tps = resolveTopicPartitions (kafkaListener );
486- if (!processMainAndRetryListeners (kafkaListener , bean , beanName , methodToUse , endpoint , topics , tps )) {
492+ if (!processMainAndRetryListeners (kafkaListener , bean , beanName , endpoint , topics , tps , methodToUse , clazz )) {
487493 processListener (endpoint , kafkaListener , bean , beanName , topics , tps );
488494 }
489495 this .listenerScope .removeListener (beanRef );
490496 }
491497
492498 private boolean processMainAndRetryListeners (KafkaListener kafkaListener , Object bean , String beanName ,
493- Method methodToUse , MethodKafkaListenerEndpoint <K , V > endpoint , String [] topics ,
494- TopicPartitionOffset [] tps ) {
495-
496- String [] retryableCandidates = topics ;
497- if (retryableCandidates .length == 0 && tps .length > 0 ) {
498- retryableCandidates = Arrays .stream (tps )
499- .map (tp -> tp .getTopic ())
500- .distinct ()
501- .toList ()
502- .toArray (new String [0 ]);
503- }
499+ MethodKafkaListenerEndpoint <K , V > endpoint , String [] topics , TopicPartitionOffset [] tps ,
500+ @ Nullable Method methodToUse , @ Nullable Class <?> clazz ) {
504501
502+ String [] retryableCandidates = getTopicsFromTopicPartitionOffset (topics , tps );
505503 RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider (this .beanFactory ,
506504 this .resolver , this .expressionContext )
507- .findRetryConfigurationFor (retryableCandidates , methodToUse , bean );
508-
505+ .findRetryConfigurationFor (retryableCandidates , methodToUse , clazz , bean );
509506 if (retryTopicConfiguration == null ) {
510- String [] candidates = retryableCandidates ;
511507 this .logger .debug (() ->
512- "No retry topic configuration found for topics " + Arrays .toString (candidates ));
508+ "No retry topic configuration found for topics " + Arrays .toString (retryableCandidates ));
513509 return false ;
514510 }
515511
@@ -525,6 +521,18 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
525521 return true ;
526522 }
527523
524+ private String [] getTopicsFromTopicPartitionOffset (String [] topics , TopicPartitionOffset [] tps ) {
525+ String [] retryableCandidates = topics ;
526+ if (retryableCandidates .length == 0 && tps .length > 0 ) {
527+ retryableCandidates = Arrays .stream (tps )
528+ .map (TopicPartitionOffset ::getTopic )
529+ .distinct ()
530+ .toList ()
531+ .toArray (new String [0 ]);
532+ }
533+ return retryableCandidates ;
534+ }
535+
528536 private RetryTopicConfigurer getRetryTopicConfigurer () {
529537 if (this .retryTopicConfigurer == null ) {
530538 try {
@@ -737,7 +745,7 @@ private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener k
737745 private void resolveContainerPostProcessor (MethodKafkaListenerEndpoint <?, ?> endpoint ,
738746 KafkaListener kafkaListener ) {
739747
740- final String containerPostProcessor = kafkaListener .containerPostProcessor ();
748+ String containerPostProcessor = kafkaListener .containerPostProcessor ();
741749 if (StringUtils .hasText (containerPostProcessor )) {
742750 endpoint .setContainerPostProcessor (this .beanFactory .getBean (containerPostProcessor ,
743751 ContainerPostProcessor .class ));
@@ -804,7 +812,8 @@ private String getEndpointId(KafkaListener kafkaListener) {
804812 }
805813 }
806814
807- private String getEndpointGroupId (KafkaListener kafkaListener , String id ) {
815+ @ Nullable
816+ private String getEndpointGroupId (KafkaListener kafkaListener , @ Nullable String id ) {
808817 String groupId = null ;
809818 if (StringUtils .hasText (kafkaListener .groupId ())) {
810819 groupId = resolveExpressionAsString (kafkaListener .groupId (), "groupId" );
@@ -1086,8 +1095,7 @@ private void addFormatters(FormatterRegistry registry) {
10861095
10871096 private <T > Collection <T > getBeansOfType (Class <T > type ) {
10881097 if (KafkaListenerAnnotationBeanPostProcessor .this .beanFactory instanceof ListableBeanFactory lbf ) {
1089- return lbf .getBeansOfType (type )
1090- .values ();
1098+ return lbf .getBeansOfType (type ).values ();
10911099 }
10921100 else {
10931101 return Collections .emptySet ();
@@ -1241,7 +1249,7 @@ public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, Anno
12411249
12421250 }
12431251
1244- private final class BytesToNumberConverter implements ConditionalGenericConverter {
1252+ private static final class BytesToNumberConverter implements ConditionalGenericConverter {
12451253
12461254 BytesToNumberConverter () {
12471255 }
@@ -1265,6 +1273,9 @@ public Set<ConvertiblePair> getConvertibleTypes() {
12651273 @ Nullable
12661274 public Object convert (@ Nullable Object source , TypeDescriptor sourceType , TypeDescriptor targetType ) {
12671275 byte [] bytes = (byte []) source ;
1276+ if (bytes == null ) {
1277+ return null ;
1278+ }
12681279 if (targetType .getType ().equals (long .class ) || targetType .getType ().equals (Long .class )) {
12691280 Assert .state (bytes .length >= 8 , "At least 8 bytes needed to convert a byte[] to a long" ); // NOSONAR
12701281 return ByteBuffer .wrap (bytes ).getLong ();
0 commit comments