Skip to content

Commit 544e728

Browse files
committed
Merge pull request #46675 from nosan
* pr/46675: Polish "Use Kafka's StreamsBuilderFactoryBeanConfigurer" Use Kafka's StreamsBuilderFactoryBeanConfigurer Closes gh-46675
2 parents 95d47bc + a648d9e commit 544e728

File tree

5 files changed

+43
-55
lines changed

5 files changed

+43
-55
lines changed

documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ include-code::MyKafkaStreamsConfiguration[]
7474
By default, the streams managed by the javadoc:org.apache.kafka.streams.StreamsBuilder[] object are started automatically.
7575
You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property.
7676

77+
TIP: You can also register an arbitrary number of beans that implement javadoc:org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer[] for more advanced customizations.
78+
7779

7880

7981
[[messaging.kafka.additional-properties]]

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,18 @@
2222
import org.apache.kafka.streams.StreamsBuilder;
2323
import org.apache.kafka.streams.StreamsConfig;
2424

25-
import org.springframework.beans.factory.InitializingBean;
26-
import org.springframework.beans.factory.ObjectProvider;
27-
import org.springframework.beans.factory.annotation.Qualifier;
2825
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
2926
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
3027
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3128
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
3229
import org.springframework.context.annotation.Bean;
3330
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.Ordered;
3432
import org.springframework.core.env.Environment;
3533
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
3634
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3735
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
36+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
3837
import org.springframework.kafka.core.CleanupConfig;
3938

4039
/**
@@ -76,11 +75,8 @@ KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment,
7675
}
7776

7877
@Bean
79-
KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
80-
@Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean,
81-
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizers) {
82-
customizers.orderedStream().forEach((customizer) -> customizer.customize(factoryBean));
83-
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
78+
StreamsBuilderFactoryBeanConfigurer kafkaPropertiesStreamsBuilderFactoryBeanConfigurer() {
79+
return new KafkaPropertiesStreamsBuilderFactoryBeanConfigurer(this.properties);
8480
}
8581

8682
private void applyKafkaConnectionDetailsForStreams(Map<String, Object> properties,
@@ -91,24 +87,25 @@ private void applyKafkaConnectionDetailsForStreams(Map<String, Object> propertie
9187
KafkaAutoConfiguration.applySslBundle(properties, streams.getSslBundle());
9288
}
9389

94-
// Separate class required to avoid BeanCurrentlyInCreationException
95-
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
90+
static class KafkaPropertiesStreamsBuilderFactoryBeanConfigurer implements StreamsBuilderFactoryBeanConfigurer {
9691

9792
private final KafkaProperties properties;
9893

99-
private final StreamsBuilderFactoryBean factoryBean;
100-
101-
KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties, StreamsBuilderFactoryBean factoryBean) {
94+
KafkaPropertiesStreamsBuilderFactoryBeanConfigurer(KafkaProperties properties) {
10295
this.properties = properties;
103-
this.factoryBean = factoryBean;
10496
}
10597

10698
@Override
107-
public void afterPropertiesSet() {
108-
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
99+
public void configure(StreamsBuilderFactoryBean factoryBean) {
100+
factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
109101
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
110102
CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown());
111-
this.factoryBean.setCleanupConfig(cleanupConfig);
103+
factoryBean.setCleanupConfig(cleanupConfig);
104+
}
105+
106+
@Override
107+
public int getOrder() {
108+
return Ordered.HIGHEST_PRECEDENCE;
112109
}
113110

114111
}

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/StreamsBuilderFactoryBeanCustomizer.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import org.springframework.boot.kafka.autoconfigure.DefaultKafkaConsumerFactoryCustomizer;
2727
import org.springframework.boot.kafka.autoconfigure.DefaultKafkaProducerFactoryCustomizer;
2828
import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
29-
import org.springframework.boot.kafka.autoconfigure.StreamsBuilderFactoryBeanCustomizer;
3029
import org.springframework.context.annotation.Bean;
3130
import org.springframework.context.annotation.Configuration;
3231
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
32+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
3333
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3434
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3535
import org.springframework.kafka.core.MicrometerConsumerListener;
@@ -75,7 +75,7 @@ private <K, V> void addListener(DefaultKafkaProducerFactory<K, V> factory, Meter
7575
static class KafkaStreamsMetricsConfiguration {
7676

7777
@Bean
78-
StreamsBuilderFactoryBeanCustomizer kafkaStreamsMetrics(MeterRegistry meterRegistry) {
78+
StreamsBuilderFactoryBeanConfigurer kafkaStreamsMetrics(MeterRegistry meterRegistry) {
7979
return (factoryBean) -> factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry));
8080
}
8181

module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969
import org.springframework.kafka.config.KafkaListenerContainerFactory;
7070
import org.springframework.kafka.config.KafkaStreamsConfiguration;
7171
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
72+
import org.springframework.kafka.config.StreamsBuilderFactoryBean.Listener;
73+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
7274
import org.springframework.kafka.core.CleanupConfig;
7375
import org.springframework.kafka.core.ConsumerFactory;
7476
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -458,6 +460,29 @@ void streamsProperties() {
458460
});
459461
}
460462

463+
@Test
464+
void streamsBuilderFactoryBeanConfigurerIsApplied() {
465+
Listener listener = mock(Listener.class);
466+
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
467+
// user's StreamsBuilderFactoryBeanConfigurer must be invoked after the
468+
// default one
469+
.withBean(StreamsBuilderFactoryBeanConfigurer.class, () -> (factoryBean) -> {
470+
assertThat(factoryBean.isAutoStartup()).isFalse();
471+
assertThat(factoryBean).extracting("cleanupConfig.onStart").isEqualTo(true);
472+
assertThat(factoryBean).extracting("cleanupConfig.onStop").isEqualTo(true);
473+
factoryBean.addListener(listener);
474+
})
475+
.withPropertyValues("spring.kafka.client-id=cid",
476+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.application.name=appName",
477+
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-shutdown=true",
478+
"spring.kafka.streams.cleanup.on-startup=true")
479+
.run((context) -> {
480+
assertThat(context).hasSingleBean(StreamsBuilderFactoryBean.class);
481+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean(StreamsBuilderFactoryBean.class);
482+
assertThat(streamsBuilderFactoryBean.getListeners()).hasSize(1);
483+
});
484+
}
485+
461486
@Test
462487
void connectionDetailsAreAppliedToStreams() {
463488
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)

0 commit comments

Comments
 (0)