Skip to content

Commit e292987

Browse files
garyrussellartembilan
authored andcommitted
GH-1386: Fix StreamsBuilder customization
Resolves #1386 The builder was incorrectly customized in `start()`. It needs to be customized after creation so that when it is used to create `KStream`s elsewhere, it is already customized.
1 parent a7be6ac commit e292987

File tree

2 files changed

+45
-14
lines changed

2 files changed

+45
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,9 @@ protected synchronized StreamsBuilder createInstance() {
224224
Assert.state(this.properties != null,
225225
"streams configuration properties must not be null");
226226
}
227-
return new StreamsBuilder();
227+
StreamsBuilder builder = new StreamsBuilder();
228+
this.infrastructureCustomizer.configureBuilder(builder);
229+
return builder;
228230
}
229231

230232
@Override
@@ -246,9 +248,7 @@ public synchronized void start() {
246248
try {
247249
Assert.state(this.properties != null,
248250
"streams configuration properties must not be null");
249-
StreamsBuilder builder = getObject();
250-
this.infrastructureCustomizer.configureBuilder(builder);
251-
Topology topology = builder.build(this.properties); // NOSONAR: getObject() cannot return null
251+
Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
252252
this.infrastructureCustomizer.configureTopology(topology);
253253
LOGGER.debug(() -> topology.describe().toString());
254254
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626

2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.apache.kafka.common.serialization.Serdes;
2829
import org.apache.kafka.streams.KafkaStreams;
30+
import org.apache.kafka.streams.KeyValue;
2931
import org.apache.kafka.streams.StreamsBuilder;
3032
import org.apache.kafka.streams.StreamsConfig;
3133
import org.apache.kafka.streams.Topology;
3234
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
35+
import org.apache.kafka.streams.kstream.KStream;
36+
import org.apache.kafka.streams.kstream.Transformer;
3337
import org.apache.kafka.streams.processor.ProcessorContext;
38+
import org.apache.kafka.streams.state.StoreBuilder;
39+
import org.apache.kafka.streams.state.Stores;
3440
import org.junit.jupiter.api.Test;
3541

3642
import org.springframework.beans.factory.annotation.Autowired;
37-
import org.springframework.beans.factory.annotation.Value;
3843
import org.springframework.context.annotation.Bean;
3944
import org.springframework.context.annotation.Configuration;
4045
import org.springframework.kafka.annotation.EnableKafka;
@@ -45,7 +50,6 @@
4550
import org.springframework.test.annotation.DirtiesContext;
4651
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4752

48-
4953
/**
5054
* @author Nurettin Yilmaz
5155
* @author Artem Bilan
@@ -75,11 +79,11 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf
7579
assertThat(STATE_LISTENER.getCurrentState()).isEqualTo(state);
7680
Properties properties = configuration.asProperties();
7781
assertThat(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
78-
.isEqualTo(Collections.singletonList(config.brokerAddresses));
82+
.isEqualTo(Collections.singletonList(config.broker.getBrokersAsString()));
7983
assertThat(properties.get(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG))
8084
.isEqualTo(Foo.class);
8185
assertThat(properties.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
82-
.isEqualTo(1000);
86+
.isEqualTo(1000);
8387
assertThat(this.config.builderConfigured.get()).isTrue();
8488
assertThat(this.config.topologyConfigured.get()).isTrue();
8589
}
@@ -93,20 +97,24 @@ public static class KafkaStreamsConfig {
9397

9498
final AtomicBoolean topologyConfigured = new AtomicBoolean();
9599

96-
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
97-
private String brokerAddresses;
100+
@Autowired
101+
EmbeddedKafkaBroker broker;
98102

99103
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
100104
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
101-
StreamsBuilderFactoryBean streamsBuilderFactoryBean =
102-
new StreamsBuilderFactoryBean(kStreamsConfigs());
105+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kStreamsConfigs());
103106
streamsBuilderFactoryBean.setKafkaStreamsCustomizer(customizer());
104107
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
105108

106109
@SuppressWarnings("unchecked")
107110
@Override
108111
public void configureBuilder(StreamsBuilder builder) {
109112
KafkaStreamsConfig.this.builderConfigured.set(true);
113+
StoreBuilder<?> storeBuilder = Stores.keyValueStoreBuilder(
114+
Stores.persistentKeyValueStore("testStateStore"),
115+
Serdes.Integer(),
116+
Serdes.String());
117+
builder.addStateStore(storeBuilder);
110118
}
111119

112120
@Override
@@ -122,17 +130,40 @@ public void configureTopology(Topology topology) {
122130
public KafkaStreamsConfiguration kStreamsConfigs() {
123131
Map<String, Object> props = new HashMap<>();
124132
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
125-
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList(this.brokerAddresses));
133+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
134+
Collections.singletonList(this.broker.getBrokersAsString()));
126135
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Foo.class);
127136
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1000);
128137
return new KafkaStreamsConfiguration(props);
129138
}
130139

131-
132140
private KafkaStreamsCustomizer customizer() {
133141
return kafkaStreams -> kafkaStreams.setStateListener(STATE_LISTENER);
134142
}
135143

144+
@Bean
145+
public KStream<String, String> testStream(StreamsBuilder kStreamBuilder) {
146+
KStream<String, String> stream = kStreamBuilder.stream("test_topic");
147+
148+
stream
149+
.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
150+
@Override
151+
public void init(ProcessorContext context) {
152+
}
153+
154+
@Override
155+
public KeyValue<String, String> transform(String key, String value) {
156+
return null;
157+
}
158+
159+
@Override
160+
public void close() {
161+
}
162+
}, "testStateStore")
163+
.to("test_output");
164+
165+
return stream;
166+
}
136167
}
137168

138169
static class TestStateListener implements KafkaStreams.StateListener {

0 commit comments

Comments
 (0)