Skip to content

Commit b7f2f1b

Browse files
wilkinsonasobychacko
authored andcommitted
Support Spring Boot's KafkaConnectionDetails for Kafka connections
- Integrate KafkaConnectionDetails, a Spring Boot component, in binder - Update KafkaBinderConfigurationProperties to use KafkaConnectionDetails - Modify KafkaTopicProvisioner to leverage KafkaConnectionDetails - Adjust Kafka binder configurations to pass KafkaConnectionDetails - Update tests to accommodate KafkaConnectionDetails changes - Add KafkaConnectionDetails to shared.beans for auto-configuration This change improves flexibility in configuring Kafka connections, allowing for better support of externalized configuration management and aligning with Spring Boot's connection abstraction model.
1 parent 4175513 commit b7f2f1b

File tree

23 files changed

+200
-71
lines changed

23 files changed

+200
-71
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.kafka.clients.consumer.ConsumerConfig;
3737
import org.apache.kafka.clients.producer.ProducerConfig;
3838

39+
import org.springframework.beans.factory.ObjectProvider;
40+
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
3941
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
4042
import org.springframework.cloud.stream.binder.HeaderMode;
4143
import org.springframework.cloud.stream.binder.ProducerProperties;
@@ -77,6 +79,8 @@ public class KafkaBinderConfigurationProperties {
7779

7880
private final KafkaProperties kafkaProperties;
7981

82+
private final KafkaConnectionDetails kafkaConnectionDetails;
83+
8084
/**
8185
* Arbitrary kafka properties that apply to both producers and consumers.
8286
*/
@@ -170,10 +174,12 @@ public class KafkaBinderConfigurationProperties {
170174
* https://github.com/spring-projects/spring-boot/issues/35564
171175
*
172176
* @param kafkaProperties Spring Kafka properties autoconfigured by Spring Boot
177+
* @param kafkaConnectionDetails Kafka connection details autoconfigured by Spring Boot
173178
*/
174-
public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) {
179+
public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties, ObjectProvider<KafkaConnectionDetails> kafkaConnectionDetails) {
175180
Assert.notNull(kafkaProperties, "'kafkaProperties' cannot be null");
176181
this.kafkaProperties = kafkaProperties;
182+
this.kafkaConnectionDetails = kafkaConnectionDetails.getIfAvailable();
177183
}
178184

179185
public KafkaProperties getKafkaProperties() {
@@ -395,6 +401,9 @@ public void setProducerProperties(Map<String, String> producerProperties) {
395401
*/
396402
public Map<String, Object> mergedConsumerConfiguration() {
397403
Map<String, Object> consumerConfiguration = new HashMap<>(this.kafkaProperties.buildConsumerProperties(null));
404+
if (this.kafkaConnectionDetails != null) {
405+
consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getConsumerBootstrapServers());
406+
}
398407
// Copy configured binder properties that apply to consumers
399408
// allow schema registry properties to be propagated to consumer configuration
400409
for (Map.Entry<String, String> configurationEntry : this.configuration
@@ -421,6 +430,9 @@ public Map<String, Object> mergedConsumerConfiguration() {
421430
*/
422431
public Map<String, Object> mergedProducerConfiguration() {
423432
Map<String, Object> producerConfiguration = new HashMap<>(this.kafkaProperties.buildProducerProperties(null));
433+
if (this.kafkaConnectionDetails != null) {
434+
producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getProducerBootstrapServers());
435+
}
424436
// Copy configured binder properties that apply to producers
425437
for (Map.Entry<String, String> configurationEntry : this.configuration
426438
.entrySet()) {

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
5656

5757
import org.springframework.beans.factory.InitializingBean;
58+
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
5859
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
5960
import org.springframework.cloud.stream.binder.BinderException;
6061
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -122,10 +123,26 @@ public KafkaTopicProvisioner(
122123
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
123124
KafkaProperties kafkaProperties,
124125
AdminClientConfigCustomizer adminClientConfigCustomizer) {
125-
this(kafkaBinderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer != null ?
126+
this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizer != null ?
126127
Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>());
127128
}
128129

130+
/**
131+
* Create an instance.
132+
* @param kafkaBinderConfigurationProperties the binder configuration properties.
133+
* @param kafkaProperties the boot Kafka properties used to build the instance.
134+
* @parak kafkaConnectionDetails the Kafka connection details used to build the instance
135+
* @param adminClientConfigCustomizer to customize {@link AdminClient}.
136+
* @since 4.1.4
137+
*/
138+
public KafkaTopicProvisioner(
139+
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
140+
KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails,
141+
AdminClientConfigCustomizer adminClientConfigCustomizer) {
142+
this(kafkaBinderConfigurationProperties, kafkaProperties, kafkaConnectionDetails,
143+
adminClientConfigCustomizer != null ? Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>());
144+
}
145+
129146
/**
130147
* Create an instance.
131148
*
@@ -138,17 +155,41 @@ public KafkaTopicProvisioner(
138155
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
139156
KafkaProperties kafkaProperties,
140157
List<AdminClientConfigCustomizer> adminClientConfigCustomizers) {
158+
this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizers);
159+
}
160+
161+
/**
162+
* Create an instance.
163+
*
164+
* @param kafkaBinderConfigurationProperties the binder configuration properties.
165+
* @param kafkaProperties the boot Kafka properties used to build the instance.
166+
* @param kafkaConnectionDetails the Kafka connection deatils used to build the instance.
167+
* @param adminClientConfigCustomizers to customize {@link AdminClient}.
168+
* @since 4.1.4
169+
*/
170+
public KafkaTopicProvisioner(
171+
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
172+
KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails,
173+
List<AdminClientConfigCustomizer> adminClientConfigCustomizers) {
141174

142175
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
143176
this.configurationProperties = kafkaBinderConfigurationProperties;
144-
this.adminClientProperties = kafkaProperties.buildAdminProperties(null);
177+
this.adminClientProperties = createAdminClientProperties(kafkaProperties, kafkaConnectionDetails);
145178
normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
146179
kafkaBinderConfigurationProperties);
147180
// If the application provides AdminConfig customizers
148181
// and overrides properties, those take precedence.
149182
adminClientConfigCustomizers.forEach(customizer -> customizer.configure(this.adminClientProperties));
150183
}
151184

185+
private Map<String, Object> createAdminClientProperties(KafkaProperties properties, KafkaConnectionDetails connectionDetails) {
186+
Map<String, Object> adminProperties = properties.buildAdminProperties(null);
187+
if (connectionDetails != null) {
188+
adminProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getAdminBootstrapServers());
189+
}
190+
return adminProperties;
191+
}
192+
152193
/**
153194
* Return an unmodifiable map of merged admin properties.
154195
* @return the properties.

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,22 @@
2828
import org.assertj.core.util.Files;
2929
import org.junit.jupiter.api.Test;
3030

31+
import org.springframework.beans.factory.ObjectProvider;
3132
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
3233
import org.springframework.core.io.ClassPathResource;
3334

3435
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.mockito.Mockito.mock;
3537

3638
class KafkaBinderConfigurationPropertiesTest {
3739

3840
@Test
41+
@SuppressWarnings("unchecked")
3942
void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
4043
KafkaProperties kafkaProperties = new KafkaProperties();
4144
kafkaProperties.getConsumer().setGroupId("group1");
4245
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
43-
new KafkaBinderConfigurationProperties(kafkaProperties);
46+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
4447

4548
Map<String, Object> mergedConsumerConfiguration =
4649
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
@@ -49,11 +52,12 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
4952
}
5053

5154
@Test
55+
@SuppressWarnings("unchecked")
5256
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
5357
KafkaProperties kafkaProperties = new KafkaProperties();
5458
kafkaProperties.getConsumer().setEnableAutoCommit(true);
5559
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
56-
new KafkaBinderConfigurationProperties(kafkaProperties);
60+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
5761

5862
Map<String, Object> mergedConsumerConfiguration =
5963
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
@@ -62,10 +66,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
6266
}
6367

6468
@Test
69+
@SuppressWarnings("unchecked")
6570
void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
6671
KafkaProperties kafkaProperties = new KafkaProperties();
6772
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
68-
new KafkaBinderConfigurationProperties(kafkaProperties);
73+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
6974
kafkaBinderConfigurationProperties
7075
.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
7176

@@ -75,10 +80,11 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationProper
7580
}
7681

7782
@Test
83+
@SuppressWarnings("unchecked")
7884
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
7985
KafkaProperties kafkaProperties = new KafkaProperties();
8086
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
81-
new KafkaBinderConfigurationProperties(kafkaProperties);
87+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
8288
kafkaBinderConfigurationProperties
8389
.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
8490

@@ -88,10 +94,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurat
8894
}
8995

9096
@Test
97+
@SuppressWarnings("unchecked")
9198
void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
9299
KafkaProperties kafkaProperties = new KafkaProperties();
93100
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
94-
new KafkaBinderConfigurationProperties(kafkaProperties);
101+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
95102
kafkaBinderConfigurationProperties
96103
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
97104

@@ -101,10 +108,11 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationProper
101108
}
102109

103110
@Test
111+
@SuppressWarnings("unchecked")
104112
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
105113
KafkaProperties kafkaProperties = new KafkaProperties();
106114
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
107-
new KafkaBinderConfigurationProperties(kafkaProperties);
115+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
108116
kafkaBinderConfigurationProperties
109117
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
110118

@@ -114,10 +122,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurat
114122
}
115123

116124
@Test
125+
@SuppressWarnings("unchecked")
117126
void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() {
118127
KafkaProperties kafkaProperties = new KafkaProperties();
119128
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
120-
new KafkaBinderConfigurationProperties(kafkaProperties);
129+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
121130
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
122131
configuration.put("ssl.truststore.location", "classpath:testclient.truststore");
123132
configuration.put("ssl.keystore.location", "classpath:testclient.keystore");
@@ -132,6 +141,7 @@ void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() {
132141
}
133142

134143
@Test
144+
@SuppressWarnings("unchecked")
135145
void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOException {
136146
HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 5869), 0);
137147
createContextWithCertFileHandler(server, "testclient.truststore");
@@ -141,7 +151,7 @@ void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOExc
141151

142152
KafkaProperties kafkaProperties = new KafkaProperties();
143153
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
144-
new KafkaBinderConfigurationProperties(kafkaProperties);
154+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
145155
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
146156
configuration.put("ssl.truststore.location", "http://localhost:5869/testclient.truststore");
147157
configuration.put("ssl.keystore.location", "http://localhost:5869/testclient.keystore");
@@ -164,10 +174,11 @@ void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOExc
164174
}
165175

166176
@Test
177+
@SuppressWarnings("unchecked")
167178
void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() {
168179
KafkaProperties kafkaProperties = new KafkaProperties();
169180
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
170-
new KafkaBinderConfigurationProperties(kafkaProperties);
181+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
171182
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
172183
configuration.put("ssl.truststore.location", "classpath:testclient.truststore");
173184
configuration.put("ssl.keystore.location", "classpath:testclient.keystore");
@@ -182,10 +193,11 @@ void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() {
182193
}
183194

184195
@Test
196+
@SuppressWarnings("unchecked")
185197
void certificateFilesAreMovedForSchemaRegistryConfiguration() {
186198
KafkaProperties kafkaProperties = new KafkaProperties();
187199
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
188-
new KafkaBinderConfigurationProperties(kafkaProperties);
200+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
189201
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
190202

191203
configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore");
@@ -213,10 +225,11 @@ void certificateFilesAreMovedForSchemaRegistryConfiguration() {
213225
}
214226

215227
@Test
228+
@SuppressWarnings("unchecked")
216229
void schemaRegistryPropertiesPropagatedToMergedProducerProperties() {
217230
KafkaProperties kafkaProperties = new KafkaProperties();
218231
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
219-
new KafkaBinderConfigurationProperties(kafkaProperties);
232+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
220233
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
221234

222235
configuration.put("schema.registry.url", "https://localhost:8081,https://localhost:8082");
@@ -254,10 +267,11 @@ void schemaRegistryPropertiesPropagatedToMergedProducerProperties() {
254267
}
255268

256269
@Test
270+
@SuppressWarnings("unchecked")
257271
public void testEmptyLocationsAreIgnored() {
258272
KafkaProperties kafkaProperties = new KafkaProperties();
259273
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
260-
new KafkaBinderConfigurationProperties(kafkaProperties);
274+
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
261275
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
262276
configuration.put("schema.registry.ssl.truststore.location", "");
263277
configuration.put("schema.registry.ssl.keystore.location", "");

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,13 +27,15 @@
2727
import org.apache.kafka.common.network.SslChannelBuilder;
2828
import org.junit.jupiter.api.Test;
2929

30+
import org.springframework.beans.factory.ObjectProvider;
3031
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
3132
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
3233
import org.springframework.core.io.ClassPathResource;
3334
import org.springframework.kafka.test.utils.KafkaTestUtils;
3435

3536
import static org.assertj.core.api.Assertions.assertThat;
3637
import static org.assertj.core.api.Assertions.fail;
38+
import static org.mockito.Mockito.mock;
3739

3840
/**
3941
* @author Gary Russell
@@ -44,15 +46,15 @@ class KafkaTopicProvisionerTests {
4446

4547
AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");
4648

47-
@SuppressWarnings("rawtypes")
49+
@SuppressWarnings({ "rawtypes", "unchecked" })
4850
@Test
4951
void bootPropertiesOverriddenExceptServers() throws Exception {
5052
KafkaProperties bootConfig = new KafkaProperties();
5153
bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
5254
"PLAINTEXT");
5355
bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234"));
5456
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
55-
bootConfig);
57+
bootConfig, mock(ObjectProvider.class));
5658
binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
5759
"SSL");
5860
ClassPathResource ts = new ClassPathResource("test.truststore.ks");
@@ -73,15 +75,15 @@ void bootPropertiesOverriddenExceptServers() throws Exception {
7375
adminClient.close();
7476
}
7577

76-
@SuppressWarnings("rawtypes")
78+
@SuppressWarnings({ "rawtypes", "unchecked" })
7779
@Test
7880
void bootPropertiesOverriddenIncludingServers() throws Exception {
7981
KafkaProperties bootConfig = new KafkaProperties();
8082
bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
8183
"PLAINTEXT");
8284
bootConfig.setBootstrapServers(Collections.singletonList("localhost:9092"));
8385
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
84-
bootConfig);
86+
bootConfig, mock(ObjectProvider.class));
8587
binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
8688
"SSL");
8789
ClassPathResource ts = new ClassPathResource("test.truststore.ks");
@@ -102,10 +104,11 @@ void bootPropertiesOverriddenIncludingServers() throws Exception {
102104
}
103105

104106
@Test
107+
@SuppressWarnings("unchecked")
105108
void brokersInvalid() throws Exception {
106109
KafkaProperties bootConfig = new KafkaProperties();
107110
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
108-
bootConfig);
111+
bootConfig, mock(ObjectProvider.class));
109112
binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
110113
"localhost:1234");
111114
try {

0 commit comments

Comments
 (0)