Skip to content

Commit 29df442

Browse files
garyrussellartembilan
authored andcommitted
Add KafkaStreamsConfiguration
See spring-projects/spring-boot#14021 (comment) Avoid using a bean of type `Properties`. * Polishing - PR Comments
1 parent b115a31 commit 29df442

File tree

9 files changed

+122
-45
lines changed

9 files changed

+122
-45
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19-
import java.util.Properties;
20-
2119
import org.apache.kafka.streams.StreamsConfig;
2220

2321
import org.springframework.beans.factory.ObjectProvider;
2422
import org.springframework.beans.factory.UnsatisfiedDependencyException;
2523
import org.springframework.beans.factory.annotation.Qualifier;
2624
import org.springframework.context.annotation.Bean;
2725
import org.springframework.context.annotation.Configuration;
26+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
2827
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
2928

3029
/**
@@ -57,8 +56,10 @@ public class KafkaStreamsDefaultConfiguration {
5756

5857
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
5958
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
60-
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME) ObjectProvider<Properties> streamsConfigProvider) {
61-
Properties streamsConfig = streamsConfigProvider.getIfAvailable();
59+
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME)
60+
ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider) {
61+
62+
KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable();
6263
if (streamsConfig != null) {
6364
return new StreamsBuilderFactoryBean(streamsConfig);
6465
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Properties;
23+
24+
import org.apache.kafka.streams.StreamsBuilder;
25+
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* Wrapper for {@link StreamsBuilder} properties.
30+
*
31+
* @author Gary Russell
32+
* @since 2.2
33+
*
34+
*/
35+
public class KafkaStreamsConfiguration {
36+
37+
private final Map<String, Object> configs;
38+
39+
private Properties properties;
40+
41+
public KafkaStreamsConfiguration(Map<String, Object> configs) {
42+
Assert.notNull(configs, "Configuration map cannot be null");
43+
this.configs = new HashMap<>(configs);
44+
}
45+
46+
/**
47+
* Return the configuration map as a {@link Properties}.
48+
* @return the properties.
49+
*/
50+
public Properties asProperties() {
51+
if (this.properties == null) {
52+
Properties properties = new Properties();
53+
this.configs.forEach((k, v) -> {
54+
String value = v.toString();
55+
if (v instanceof List && value.length() > 1) {
56+
// trim [...] - revert to comma-delimited list
57+
value = value.substring(1, value.length() - 1);
58+
}
59+
properties.setProperty(k, value);
60+
});
61+
this.properties = properties;
62+
}
63+
return this.properties;
64+
}
65+
66+
}

spring-kafka/src/main/java/org/springframework/kafka/core/StreamsBuilderFactoryBean.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.springframework.beans.factory.config.AbstractFactoryBean;
3232
import org.springframework.context.SmartLifecycle;
3333
import org.springframework.kafka.KafkaException;
34+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3435
import org.springframework.lang.Nullable;
3536
import org.springframework.util.Assert;
3637

@@ -93,7 +94,9 @@ public StreamsBuilderFactoryBean() {
9394
/**
9495
* Construct an instance with the supplied streams configuration.
9596
* @param streamsConfig the streams configuration.
97+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(KafkaStreamsConfiguration)}
9698
*/
99+
@Deprecated
97100
public StreamsBuilderFactoryBean(StreamsConfig streamsConfig) {
98101
this(streamsConfig, new CleanupConfig());
99102
}
@@ -104,7 +107,7 @@ public StreamsBuilderFactoryBean(StreamsConfig streamsConfig) {
104107
* @param streamsConfig the streams configuration.
105108
* @param cleanupConfig the cleanup configuration.
106109
* @since 2.1.2.
107-
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(Properties, CleanupConfig)}
110+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(KafkaStreamsConfiguration, CleanupConfig)}
108111
*/
109112
@Deprecated
110113
public StreamsBuilderFactoryBean(StreamsConfig streamsConfig, CleanupConfig cleanupConfig) {
@@ -121,17 +124,17 @@ public StreamsBuilderFactoryBean(StreamsConfig streamsConfig, CleanupConfig clea
121124
* @param cleanupConfig the cleanup configuration.
122125
* @since 2.2
123126
*/
124-
public StreamsBuilderFactoryBean(Properties streamsConfig, CleanupConfig cleanupConfig) {
127+
public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig, CleanupConfig cleanupConfig) {
125128
Assert.notNull(streamsConfig, "'streamsConfig' must not be null");
126129
Assert.notNull(cleanupConfig, "'cleanupConfig' must not be null");
127-
this.properties = streamsConfig;
130+
this.properties = streamsConfig.asProperties();
128131
this.cleanupConfig = cleanupConfig;
129132
}
130133

131134
/**
132135
* Construct an instance with the supplied streams configuration.
133136
* @param streamsConfig the streams configuration.
134-
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(Properties)}.
137+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(KafkaStreamsConfiguration)}.
135138
*/
136139
@Deprecated
137140
public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig) {
@@ -143,7 +146,7 @@ public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig) {
143146
* @param streamsConfig the streams configuration.
144147
* @since 2.2
145148
*/
146-
public StreamsBuilderFactoryBean(Properties streamsConfig) {
149+
public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig) {
147150
this(streamsConfig, new CleanupConfig());
148151
}
149152

@@ -153,7 +156,7 @@ public StreamsBuilderFactoryBean(Properties streamsConfig) {
153156
* @param streamsConfig the streams configuration.
154157
* @param cleanupConfig the cleanup configuration.
155158
* @since 2.1.2.
156-
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(Properties, CleanupConfig)}.
159+
* @deprecated in favor of {@link #StreamsBuilderFactoryBean(KafkaStreamsConfiguration, CleanupConfig)}.
157160
*/
158161
@Deprecated
159162
public StreamsBuilderFactoryBean(Map<String, Object> streamsConfig, CleanupConfig cleanupConfig) {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaStreamsCustomizerTests.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21-
import java.util.Properties;
21+
import java.util.HashMap;
22+
import java.util.Map;
2223

2324
import org.apache.kafka.streams.KafkaStreams;
2425
import org.apache.kafka.streams.StreamsConfig;
@@ -31,6 +32,7 @@
3132
import org.springframework.kafka.annotation.EnableKafka;
3233
import org.springframework.kafka.annotation.EnableKafkaStreams;
3334
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
35+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3436
import org.springframework.kafka.test.EmbeddedKafkaBroker;
3537
import org.springframework.kafka.test.context.EmbeddedKafka;
3638
import org.springframework.test.annotation.DirtiesContext;
@@ -64,24 +66,25 @@ public void testKafkaStreamsCustomizer() {
6466
@Configuration
6567
@EnableKafka
6668
@EnableKafkaStreams
67-
public static class KafkaStreamsConfiguration {
69+
public static class KafkaStreamsConfig {
6870

6971
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
7072
private String brokerAddresses;
7173

7274
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
7375
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
74-
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kStreamsConfigs());
76+
StreamsBuilderFactoryBean streamsBuilderFactoryBean =
77+
new StreamsBuilderFactoryBean(kStreamsConfigs());
7578
streamsBuilderFactoryBean.setKafkaStreamsCustomizer(customizer());
7679
return streamsBuilderFactoryBean;
7780
}
7881

7982
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
80-
public Properties kStreamsConfigs() {
81-
Properties props = new Properties();
83+
public KafkaStreamsConfiguration kStreamsConfigs() {
84+
Map<String, Object> props = new HashMap<>();
8285
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
8386
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
84-
return props;
87+
return new KafkaStreamsConfiguration(props);
8588
}
8689

8790

spring-kafka/src/test/java/org/springframework/kafka/core/StreamsBuilderFactoryBeanTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import java.nio.file.Files;
2323
import java.nio.file.Path;
2424
import java.nio.file.Paths;
25-
import java.util.Properties;
25+
import java.util.HashMap;
26+
import java.util.Map;
2627

2728
import org.apache.kafka.streams.StreamsConfig;
2829
import org.junit.jupiter.api.BeforeAll;
@@ -36,6 +37,7 @@
3637
import org.springframework.context.annotation.Configuration;
3738
import org.springframework.kafka.annotation.EnableKafkaStreams;
3839
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
40+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3941
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4042
import org.springframework.kafka.test.context.EmbeddedKafka;
4143
import org.springframework.test.annotation.DirtiesContext;
@@ -79,7 +81,7 @@ public void testCleanupStreams() throws IOException {
7981

8082
@Configuration
8183
@EnableKafkaStreams
82-
public static class KafkaStreamsConfiguration {
84+
public static class KafkaStreamsConfig {
8385

8486
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
8587
private String brokerAddresses;
@@ -90,12 +92,12 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
9092
}
9193

9294
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
93-
public Properties kStreamsConfigs() {
94-
Properties props = new Properties();
95+
public KafkaStreamsConfiguration kStreamsConfigs() {
96+
Map<String, Object> props = new HashMap<>();
9597
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
9698
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
9799
props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString());
98-
return props;
100+
return new KafkaStreamsConfiguration(props);
99101
}
100102

101103
}

spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsBranchTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.util.ArrayList;
22+
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
24-
import java.util.Properties;
2525
import java.util.UUID;
2626

2727
import org.apache.kafka.clients.consumer.Consumer;
@@ -43,6 +43,7 @@
4343
import org.springframework.context.annotation.Configuration;
4444
import org.springframework.kafka.annotation.EnableKafkaStreams;
4545
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
46+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
4647
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4748
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4849
import org.springframework.kafka.core.KafkaTemplate;
@@ -56,7 +57,6 @@
5657
/**
5758
* @author Elliot Kennedy
5859
* @author Artem Bilan
59-
* @author Gary Russell
6060
*
6161
* @since 1.3.3
6262
*/
@@ -144,13 +144,13 @@ public Map<String, Object> producerConfigs() {
144144
}
145145

146146
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
147-
public Properties kStreamsConfigs() {
148-
Properties props = new Properties();
147+
public KafkaStreamsConfiguration kStreamsConfigs() {
148+
Map<String, Object> props = new HashMap<>();
149149
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
150150
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
151151
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
152152
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
153-
return props;
153+
return new KafkaStreamsConfiguration(props);
154154
}
155155

156156
@Bean

spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsJsonSerializationTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.HashMap;
2122
import java.util.Map;
22-
import java.util.Properties;
2323
import java.util.UUID;
2424

2525
import org.apache.kafka.clients.consumer.Consumer;
@@ -45,6 +45,7 @@
4545
import org.springframework.context.annotation.Configuration;
4646
import org.springframework.kafka.annotation.EnableKafkaStreams;
4747
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
48+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
4849
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4950
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
5051
import org.springframework.kafka.core.KafkaTemplate;
@@ -203,11 +204,11 @@ public Map<String, Object> producerConfigs() {
203204
}
204205

205206
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
206-
public Properties kStreamsConfigs() {
207-
Properties props = new Properties();
207+
public KafkaStreamsConfiguration kStreamsConfigs() {
208+
Map<String, Object> props = new HashMap<>();
208209
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
209210
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
210-
return props;
211+
return new KafkaStreamsConfiguration(props);
211212
}
212213

213214
@Bean

spring-kafka/src/test/java/org/springframework/kafka/kstream/KafkaStreamsTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.Mockito.mock;
2121

22+
import java.util.HashMap;
2223
import java.util.Map;
23-
import java.util.Properties;
2424
import java.util.UUID;
2525
import java.util.concurrent.CountDownLatch;
2626
import java.util.concurrent.TimeUnit;
@@ -53,6 +53,7 @@
5353
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
5454
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
5555
import org.springframework.kafka.config.KafkaListenerContainerFactory;
56+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
5657
import org.springframework.kafka.core.ConsumerFactory;
5758
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5859
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -153,7 +154,7 @@ public void testKStreams() throws Exception {
153154
@Configuration
154155
@EnableKafka
155156
@EnableKafkaStreams
156-
public static class KafkaStreamsConfiguration {
157+
public static class KafkaStreamsConfig {
157158

158159
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
159160
private String brokerAddresses;
@@ -179,16 +180,16 @@ public KafkaTemplate<Integer, String> template() {
179180
}
180181

181182
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
182-
public Properties kStreamsConfigs() {
183-
Properties props = new Properties();
183+
public KafkaStreamsConfiguration kStreamsConfigs() {
184+
Map<String, Object> props = new HashMap<>();
184185
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
185186
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
186187
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
187188
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
188189
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
189190
WallclockTimestampExtractor.class.getName());
190191
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
191-
return props;
192+
return new KafkaStreamsConfiguration(props);
192193
}
193194

194195
@Bean

0 commit comments

Comments
 (0)