Skip to content

Commit 042f0c8

Browse files
wilkinsonamhalbritterphilwebb
committed
Add ConnectionDetail support to Kafka auto-configuration
Update Kafka auto-configuration so that `KafkaConnectionDetails` beans may be optionally used to provide connection details. See gh-34657 Co-Authored-By: Mortitz Halbritter <[email protected]> Co-Authored-By: Phillip Webb <[email protected]>
1 parent d860d87 commit 042f0c8

File tree

5 files changed

+325
-11
lines changed

5 files changed

+325
-11
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
import java.io.IOException;
2020
import java.time.Duration;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.clients.CommonClientConfigs;
25+
import org.apache.kafka.clients.consumer.ConsumerConfig;
26+
import org.apache.kafka.clients.producer.ProducerConfig;
2127

2228
import org.springframework.beans.factory.ObjectProvider;
2329
import org.springframework.boot.autoconfigure.AutoConfiguration;
@@ -26,6 +32,7 @@
2632
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2733
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2834
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
35+
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
2936
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
3037
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
3138
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -56,6 +63,9 @@
5663
* @author Eddú Meléndez
5764
* @author Nakul Mishra
5865
* @author Tomaz Fernandes
66+
* @author Moritz Halbritter
67+
* @author Andy Wilkinson
68+
* @author Phillip Webb
5969
* @since 1.5.0
6070
*/
6171
@AutoConfiguration
@@ -66,8 +76,12 @@ public class KafkaAutoConfiguration {
6676

6777
private final KafkaProperties properties;
6878

69-
public KafkaAutoConfiguration(KafkaProperties properties) {
79+
private final KafkaConnectionDetails connectionDetails;
80+
81+
KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<KafkaConnectionDetails> connectionDetails) {
7082
this.properties = properties;
83+
this.connectionDetails = connectionDetails
84+
.getIfAvailable(() -> new PropertiesKafkaConnectionDetails(properties));
7185
}
7286

7387
@Bean
@@ -94,8 +108,9 @@ public LoggingProducerListener<Object, Object> kafkaProducerListener() {
94108
@ConditionalOnMissingBean(ConsumerFactory.class)
95109
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(
96110
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
97-
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
98-
this.properties.buildConsumerProperties());
111+
Map<String, Object> properties = this.properties.buildConsumerProperties();
112+
applyKafkaConnectionDetailsForConsumer(properties);
113+
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(properties);
99114
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
100115
return factory;
101116
}
@@ -104,8 +119,9 @@ public LoggingProducerListener<Object, Object> kafkaProducerListener() {
104119
@ConditionalOnMissingBean(ProducerFactory.class)
105120
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
106121
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
107-
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
108-
this.properties.buildProducerProperties());
122+
Map<String, Object> properties = this.properties.buildProducerProperties();
123+
applyKafkaConnectionDetailsForProducer(properties);
124+
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(properties);
109125
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
110126
if (transactionIdPrefix != null) {
111127
factory.setTransactionIdPrefix(transactionIdPrefix);
@@ -140,7 +156,9 @@ public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException
140156
@Bean
141157
@ConditionalOnMissingBean
142158
public KafkaAdmin kafkaAdmin() {
143-
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
159+
Map<String, Object> properties = this.properties.buildAdminProperties();
160+
applyKafkaConnectionDetailsForAdmin(properties);
161+
KafkaAdmin kafkaAdmin = new KafkaAdmin(properties);
144162
KafkaProperties.Admin admin = this.properties.getAdmin();
145163
if (admin.getCloseTimeout() != null) {
146164
kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds());
@@ -168,6 +186,34 @@ public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?>
168186
return builder.create(kafkaTemplate);
169187
}
170188

189+
private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties) {
190+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
191+
nodesToStringList(this.connectionDetails.getConsumerBootstrapNodes()));
192+
if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
193+
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
194+
}
195+
}
196+
197+
private void applyKafkaConnectionDetailsForProducer(Map<String, Object> properties) {
198+
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
199+
nodesToStringList(this.connectionDetails.getProducerBootstrapNodes()));
200+
if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
201+
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
202+
}
203+
}
204+
205+
private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties) {
206+
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
207+
nodesToStringList(this.connectionDetails.getAdminBootstrapNodes()));
208+
if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
209+
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
210+
}
211+
}
212+
213+
private List<String> nodesToStringList(List<Node> nodes) {
214+
return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList();
215+
}
216+
171217
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
172218
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
173219
if (delay > 0) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import java.util.List;
20+
21+
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
22+
23+
/**
24+
* Details required to establish a connection to a Kafka service.
25+
*
26+
* @author Moritz Halbritter
27+
* @author Andy Wilkinson
28+
* @author Phillip Webb
29+
* @since 3.1.0
30+
*/
31+
public interface KafkaConnectionDetails extends ConnectionDetails {
32+
33+
/**
34+
* Returns the list of bootstrap nodes.
35+
* @return the list of bootstrap nodes
36+
*/
37+
List<Node> getBootstrapNodes();
38+
39+
/**
40+
* Returns the list of bootstrap nodes used for consumers.
41+
* @return the list of bootstrap nodes used for consumers
42+
*/
43+
default List<Node> getConsumerBootstrapNodes() {
44+
return getBootstrapNodes();
45+
}
46+
47+
/**
48+
* Returns the list of bootstrap nodes used for producers.
49+
* @return the list of bootstrap nodes used for producers
50+
*/
51+
default List<Node> getProducerBootstrapNodes() {
52+
return getBootstrapNodes();
53+
}
54+
55+
/**
56+
* Returns the list of bootstrap nodes used for the admin.
57+
* @return the list of bootstrap nodes used for the admin
58+
*/
59+
default List<Node> getAdminBootstrapNodes() {
60+
return getBootstrapNodes();
61+
}
62+
63+
/**
64+
* Returns the list of bootstrap nodes used for Kafka Streams.
65+
* @return the list of bootstrap nodes used for Kafka Streams
66+
*/
67+
default List<Node> getStreamsBootstrapNodes() {
68+
return getBootstrapNodes();
69+
}
70+
71+
/**
72+
* A Kafka node.
73+
*
74+
* @param host the hostname
75+
* @param port the port
76+
*/
77+
record Node(String host, int port) {
78+
79+
}
80+
81+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2020 the original author or authors.
2+
* Copyright 2012-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.boot.autoconfigure.kafka;
1818

19+
import java.util.List;
1920
import java.util.Map;
2021

22+
import org.apache.kafka.clients.CommonClientConfigs;
23+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2124
import org.apache.kafka.streams.StreamsBuilder;
2225
import org.apache.kafka.streams.StreamsConfig;
2326

@@ -27,6 +30,7 @@
2730
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
2831
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2932
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
33+
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
3034
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
3135
import org.springframework.context.annotation.Bean;
3236
import org.springframework.context.annotation.Configuration;
@@ -42,6 +46,8 @@
4246
* @author Gary Russell
4347
* @author Stephane Nicoll
4448
* @author Eddú Meléndez
49+
* @author Moritz Halbritter
50+
* @author Andy Wilkinson
4551
*/
4652
@Configuration(proxyBeanMethods = false)
4753
@ConditionalOnClass(StreamsBuilder.class)
@@ -56,17 +62,21 @@ class KafkaStreamsAnnotationDrivenConfiguration {
5662

5763
@ConditionalOnMissingBean
5864
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
59-
KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) {
60-
Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
65+
KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment,
66+
ObjectProvider<KafkaConnectionDetails> connectionDetailsProvider) {
67+
KafkaConnectionDetails connectionDetails = connectionDetailsProvider
68+
.getIfAvailable(() -> new PropertiesKafkaConnectionDetails(this.properties));
69+
Map<String, Object> properties = this.properties.buildStreamsProperties();
70+
applyKafkaConnectionDetailsForStreams(connectionDetails, properties);
6171
if (this.properties.getStreams().getApplicationId() == null) {
6272
String applicationName = environment.getProperty("spring.application.name");
6373
if (applicationName == null) {
6474
throw new InvalidConfigurationPropertyValueException("spring.kafka.streams.application-id", null,
6575
"This property is mandatory and fallback 'spring.application.name' is not set either.");
6676
}
67-
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
77+
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
6878
}
69-
return new KafkaStreamsConfiguration(streamsProperties);
79+
return new KafkaStreamsConfiguration(properties);
7080
}
7181

7282
@Bean
@@ -77,6 +87,19 @@ KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
7787
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
7888
}
7989

90+
private void applyKafkaConnectionDetailsForStreams(KafkaConnectionDetails connectionDetails,
91+
Map<String, Object> properties) {
92+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
93+
nodesToStringList(connectionDetails.getStreamsBootstrapNodes()));
94+
if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
95+
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
96+
}
97+
}
98+
99+
private List<String> nodesToStringList(List<Node> nodes) {
100+
return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList();
101+
}
102+
80103
// Separate class required to avoid BeanCurrentlyInCreationException
81104
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
82105

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import java.util.List;
20+
21+
/**
22+
* Adapts {@link KafkaProperties} to {@link KafkaConnectionDetails}.
23+
*
24+
* @author Moritz Halbritter
25+
* @author Andy Wilkinson
26+
* @author Phillip Webb
27+
*/
28+
class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails {
29+
30+
private final int DEFAULT_PORT = 9092;
31+
32+
private final KafkaProperties properties;
33+
34+
PropertiesKafkaConnectionDetails(KafkaProperties properties) {
35+
this.properties = properties;
36+
}
37+
38+
@Override
39+
public List<Node> getBootstrapNodes() {
40+
return asNodes(this.properties.getBootstrapServers());
41+
}
42+
43+
@Override
44+
public List<Node> getConsumerBootstrapNodes() {
45+
return bootstrapNodes(this.properties.getConsumer().getBootstrapServers());
46+
}
47+
48+
@Override
49+
public List<Node> getProducerBootstrapNodes() {
50+
return bootstrapNodes(this.properties.getProducer().getBootstrapServers());
51+
}
52+
53+
@Override
54+
public List<Node> getStreamsBootstrapNodes() {
55+
return bootstrapNodes(this.properties.getStreams().getBootstrapServers());
56+
}
57+
58+
private List<Node> bootstrapNodes(List<String> bootstrapServers) {
59+
return (bootstrapServers != null) ? asNodes(bootstrapServers) : getBootstrapNodes();
60+
}
61+
62+
private List<Node> asNodes(List<String> bootstrapServers) {
63+
return bootstrapServers.stream().map(this::asNode).toList();
64+
}
65+
66+
private Node asNode(String bootstrapNode) {
67+
int separatorIndex = bootstrapNode.indexOf(':');
68+
if (separatorIndex == -1) {
69+
return new Node(bootstrapNode, this.DEFAULT_PORT);
70+
}
71+
return new Node(bootstrapNode.substring(0, separatorIndex),
72+
Integer.parseInt(bootstrapNode.substring(separatorIndex + 1)));
73+
}
74+
75+
}

0 commit comments

Comments
 (0)