Skip to content

Commit f8d3c66

Browse files
committed
Kafka requires SslBundles for consumer and producer properties
1 parent b06ca44 commit f8d3c66

File tree

4 files changed

+51
-7
lines changed

4 files changed

+51
-7
lines changed

spring-cloud-starter-single-step-batch-job/src/main/java/org/springframework/cloud/task/batch/autoconfigure/kafka/KafkaItemReaderAutoConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
3232
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
3333
import org.springframework.boot.context.properties.EnableConfigurationProperties;
34+
import org.springframework.boot.ssl.SslBundles;
3435
import org.springframework.context.annotation.Bean;
3536
import org.springframework.util.StringUtils;
3637

@@ -50,13 +51,16 @@ public class KafkaItemReaderAutoConfiguration {
5051
@Autowired
5152
private KafkaProperties kafkaProperties;
5253

54+
@Autowired
55+
private SslBundles sslBundles;
56+
5357
@Bean
5458
@ConditionalOnMissingBean
5559
@ConditionalOnProperty(prefix = "spring.batch.job.kafkaitemreader", name = "name")
5660
public KafkaItemReader<Object, Map<String, Object>> kafkaItemReader(
5761
KafkaItemReaderProperties kafkaItemReaderProperties) {
5862
Properties consumerProperties = new Properties();
59-
consumerProperties.putAll(this.kafkaProperties.getConsumer().buildProperties());
63+
consumerProperties.putAll(this.kafkaProperties.getConsumer().buildProperties(sslBundles));
6064
validateProperties(kafkaItemReaderProperties);
6165
if (kafkaItemReaderProperties.getPartitions() == null
6266
|| kafkaItemReaderProperties.getPartitions().size() == 0) {

spring-cloud-starter-single-step-batch-job/src/main/java/org/springframework/cloud/task/batch/autoconfigure/kafka/KafkaItemWriterAutoConfiguration.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
3131
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
3232
import org.springframework.boot.context.properties.EnableConfigurationProperties;
33+
import org.springframework.boot.ssl.SslBundles;
3334
import org.springframework.context.annotation.Bean;
3435
import org.springframework.core.convert.converter.Converter;
3536
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -84,9 +85,9 @@ public Object convert(Map<String, Object> source) {
8485

8586
@Bean
8687
@ConditionalOnMissingBean
87-
ProducerFactory<Object, Map<String, Object>> producerFactory() {
88+
ProducerFactory<Object, Map<String, Object>> producerFactory(SslBundles sslBundles) {
8889
Map<String, Object> configs = new HashMap<>();
89-
configs.putAll(this.kafkaProperties.getProducer().buildProperties());
90+
configs.putAll(this.kafkaProperties.getProducer().buildProperties(sslBundles));
9091
return new DefaultKafkaProducerFactory<>(configs, null, new JsonSerializer<>());
9192
}
9293

spring-cloud-starter-single-step-batch-job/src/test/java/org/springframework/cloud/task/batch/autoconfigure/kafka/KafkaItemReaderAutoConfigurationTests.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.function.Consumer;
2425

2526
import org.apache.kafka.clients.admin.NewTopic;
2627
import org.apache.kafka.clients.producer.Producer;
@@ -40,6 +41,9 @@
4041
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
4142
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
4243
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
44+
import org.springframework.boot.ssl.NoSuchSslBundleException;
45+
import org.springframework.boot.ssl.SslBundle;
46+
import org.springframework.boot.ssl.SslBundles;
4347
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
4448
import org.springframework.cloud.task.batch.autoconfigure.SingleStepJobAutoConfiguration;
4549
import org.springframework.context.annotation.Bean;
@@ -227,6 +231,22 @@ public ListItemWriter<Map<String, Object>> itemWriter() {
227231
return new ListItemWriter<>();
228232
}
229233

234+
@Bean
235+
public SslBundles sslBundles() {
236+
return new SslBundles() {
237+
@Override
238+
public SslBundle getBundle(String name) throws NoSuchSslBundleException {
239+
return null;
240+
}
241+
242+
@Override
243+
public void addBundleUpdateHandler(String name, Consumer<SslBundle> updateHandler)
244+
throws NoSuchSslBundleException {
245+
246+
}
247+
};
248+
}
249+
230250
}
231251

232252
}

spring-cloud-starter-single-step-batch-job/src/test/java/org/springframework/cloud/task/batch/autoconfigure/kafka/KafkaItemWriterTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,9 @@
3838
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
3939
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
4040
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
41+
import org.springframework.boot.ssl.NoSuchSslBundleException;
42+
import org.springframework.boot.ssl.SslBundle;
43+
import org.springframework.boot.ssl.SslBundles;
4144
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
4245
import org.springframework.cloud.task.batch.autoconfigure.SingleStepJobAutoConfiguration;
4346
import org.springframework.context.ApplicationContext;
@@ -143,6 +146,22 @@ private void addNameToReaderList(List<Map<String, Object>> itemReaderList, Strin
143146
itemReaderList.add(prepMap);
144147
}
145148

149+
@Bean
150+
public SslBundles sslBundles() {
151+
return new SslBundles() {
152+
@Override
153+
public SslBundle getBundle(String name) throws NoSuchSslBundleException {
154+
return null;
155+
}
156+
157+
@Override
158+
public void addBundleUpdateHandler(String name, java.util.function.Consumer<SslBundle> updateHandler)
159+
throws NoSuchSslBundleException {
160+
161+
}
162+
};
163+
}
164+
146165
}
147166

148167
}

0 commit comments

Comments
 (0)