Skip to content

Commit d1e0450

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-1745 - Let users provide a RetryTopicConfigurer bean
1 parent e1d2c7b commit d1e0450

File tree

3 files changed

+34
-18
lines changed

3 files changed

+34
-18
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@
5151
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
5252
import org.springframework.beans.factory.ObjectFactory;
5353
import org.springframework.beans.factory.SmartInitializingSingleton;
54-
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
5554
import org.springframework.beans.factory.config.BeanExpressionContext;
5655
import org.springframework.beans.factory.config.BeanExpressionResolver;
5756
import org.springframework.beans.factory.config.BeanPostProcessor;
5857
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
5958
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
6059
import org.springframework.beans.factory.config.Scope;
60+
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
61+
import org.springframework.beans.factory.support.RootBeanDefinition;
6162
import org.springframework.context.ApplicationContext;
6263
import org.springframework.context.ApplicationContextAware;
6364
import org.springframework.context.ConfigurableApplicationContext;
@@ -84,6 +85,7 @@
8485
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
8586
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
8687
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
88+
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
8789
import org.springframework.kafka.support.KafkaNull;
8890
import org.springframework.kafka.support.TopicPartitionOffset;
8991
import org.springframework.lang.Nullable;
@@ -481,19 +483,25 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
481483
}
482484

483485
private RetryTopicConfigurer getRetryTopicConfigurer() {
484-
try {
485-
return this.beanFactory.getBean(RetryTopicConfigurer.class);
486-
}
487-
catch (NoSuchBeanDefinitionException e) {
488-
if (!(this.beanFactory instanceof AutowireCapableBeanFactory)) {
489-
throw new IllegalStateException("BeanFactory must be an instance of "
490-
+ AutowireCapableBeanFactory.class.getSimpleName()
491-
+ " Provided beanFactory: " + this.beanFactory.getClass().getSimpleName(), e);
492-
}
493-
((AutowireCapableBeanFactory) this.beanFactory)
494-
.createBean(RetryTopicBootstrapper.class)
495-
.bootstrapRetryTopic();
496-
return this.beanFactory.getBean(RetryTopicConfigurer.class);
486+
bootstrapRetryTopicIfNecessary();
487+
return this.beanFactory.getBean(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER, RetryTopicConfigurer.class);
488+
}
489+
490+
private void bootstrapRetryTopicIfNecessary() {
491+
if (!(this.beanFactory instanceof BeanDefinitionRegistry)) {
492+
throw new IllegalStateException("BeanFactory must be an instance of "
493+
+ BeanDefinitionRegistry.class.getSimpleName()
494+
+ " to bootstrap the RetryTopic functionality. Provided beanFactory: "
495+
+ this.beanFactory.getClass().getSimpleName());
496+
}
497+
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.beanFactory;
498+
if (!registry.containsBeanDefinition(RetryTopicInternalBeanNames
499+
.RETRY_TOPIC_BOOTSTRAPPER)) {
500+
registry.registerBeanDefinition(RetryTopicInternalBeanNames
501+
.RETRY_TOPIC_BOOTSTRAPPER,
502+
new RootBeanDefinition(RetryTopicBootstrapper.class));
503+
this.beanFactory.getBean(RetryTopicInternalBeanNames
504+
.RETRY_TOPIC_BOOTSTRAPPER, RetryTopicBootstrapper.class).bootstrapRetryTopic();
497505
}
498506
}
499507

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicInternalBeanNames.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,8 +33,6 @@ public abstract class RetryTopicInternalBeanNames {
3333

3434
static final String KAFKA_CONSUMER_BACKOFF_MANAGER = "internalKafkaConsumerBackoffManager";
3535

36-
static final String RETRY_TOPIC_CONFIGURER = "internalRetryTopicConfigurer";
37-
3836
static final String LISTENER_CONTAINER_FACTORY_RESOLVER_NAME = "internalListenerContainerFactoryResolver";
3937

4038
static final String LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME = "internalListenerContainerFactoryConfigurer";
@@ -65,4 +63,14 @@ public abstract class RetryTopicInternalBeanNames {
6563
*/
6664
public static final String DEFAULT_KAFKA_TEMPLATE_BEAN_NAME = "retryTopicDefaultKafkaTemplate";
6765

66+
/**
67+
* RetryTopicBootstrapper bean name.
68+
*/
69+
public static final String RETRY_TOPIC_BOOTSTRAPPER = "internalRetryTopicBootstrapper";
70+
71+
/**
72+
* RetryTopicConfigurer bean name.
73+
*/
74+
public static final String RETRY_TOPIC_CONFIGURER = "internalRetryTopicConfigurer";
75+
6876
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)