Skip to content

Commit e03cac5

Browse files
garyrussellartembilan
authored andcommitted
GH-1236: Add Streams infrastructure customizer
Resolves #1236 Allow modification of the builder and/or topolgy before the stream is created. * Fix typo, docs, javadocs
1 parent a080919 commit e03cac5

File tree

7 files changed

+186
-4
lines changed

7 files changed

+186
-4
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.List;
22+
23+
import org.apache.kafka.streams.KafkaStreams;
24+
import org.apache.kafka.streams.StreamsBuilder;
25+
import org.apache.kafka.streams.Topology;
26+
27+
/**
28+
* Composite {@link KafkaStreamsInfrastructureCustomizer} customizes {@link KafkaStreams}
29+
* by delegating to a list of provided {@link KafkaStreamsInfrastructureCustomizer}.
30+
*
31+
* @author Gary Russell
32+
*
33+
* @since 2.4.1
34+
*/
35+
public class CompositeKafkaStreamsInfrastructureCustomizer implements KafkaStreamsInfrastructureCustomizer {
36+
37+
private final List<KafkaStreamsInfrastructureCustomizer> infrastructureCustomizers = new ArrayList<>();
38+
39+
/**
40+
* Construct an instance with the provided customizers.
41+
* @param customizers the customizers;
42+
*/
43+
public CompositeKafkaStreamsInfrastructureCustomizer(KafkaStreamsInfrastructureCustomizer... customizers) {
44+
this.infrastructureCustomizers.addAll(Arrays.asList(customizers));
45+
}
46+
47+
/**
48+
* Add customizers.
49+
* @param customizers the customizers.
50+
*/
51+
public void addKafkaStreamsCustomizers(KafkaStreamsInfrastructureCustomizer... customizers) {
52+
this.infrastructureCustomizers.addAll(Arrays.asList(customizers));
53+
}
54+
55+
@Override
56+
public void configureBuilder(StreamsBuilder builder) {
57+
this.infrastructureCustomizers.forEach(cust -> cust.configureBuilder(builder));
58+
}
59+
60+
@Override
61+
public void configureTopology(Topology topology) {
62+
this.infrastructureCustomizers.forEach(cust -> cust.configureTopology(topology));
63+
}
64+
65+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import org.apache.kafka.streams.StreamsBuilder;
20+
import org.apache.kafka.streams.Topology;
21+
22+
/**
23+
* A customizer for infrastructure components such as the {@code StreamsBuilder} and
24+
* {@code Topology}. It can be provided to the {@link StreamsBuilderFactoryBean} which
25+
* will apply the changes before creating the stream.
26+
*
27+
* @author Gary Russell
28+
* @since 2.4.1
29+
*
30+
*/
31+
public interface KafkaStreamsInfrastructureCustomizer {
32+
33+
/**
34+
* Configure the builder.
35+
* @param builder the builder.
36+
*/
37+
default void configureBuilder(StreamsBuilder builder) {
38+
// no-op
39+
}
40+
41+
/**
42+
* Configure the topology.
43+
* @param topology the topology
44+
*/
45+
default void configureTopology(Topology topology) {
46+
// no-op
47+
}
48+
49+
}

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,6 +72,9 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
7272

7373
private final CleanupConfig cleanupConfig;
7474

75+
private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer() {
76+
};
77+
7578
private KafkaStreamsCustomizer kafkaStreamsCustomizer;
7679

7780
private KafkaStreams.StateListener stateListener;
@@ -144,6 +147,16 @@ public void setClientSupplier(KafkaClientSupplier clientSupplier) {
144147
this.clientSupplier = clientSupplier; // NOSONAR (sync)
145148
}
146149

150+
/**
151+
* Set a customizer to configure the builder and/or topology before creating the stream.
152+
* @param infrastructureCustomizer the customizer
153+
* @since 2.4.1
154+
*/
155+
public void setInfrastructureCustomizer(KafkaStreamsInfrastructureCustomizer infrastructureCustomizer) {
156+
Assert.notNull(infrastructureCustomizer, "'infrastructureCustomizer' must not be null");
157+
this.infrastructureCustomizer = infrastructureCustomizer;
158+
}
159+
147160
/**
148161
* Specify a {@link KafkaStreamsCustomizer} to customize a {@link KafkaStreams}
149162
* instance during {@link #start()}.
@@ -233,7 +246,10 @@ public synchronized void start() {
233246
try {
234247
Assert.state(this.properties != null,
235248
"streams configuration properties must not be null");
236-
Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
249+
StreamsBuilder builder = getObject();
250+
this.infrastructureCustomizer.configureBuilder(builder);
251+
Topology topology = builder.build(this.properties); // NOSONAR: getObject() cannot return null
252+
this.infrastructureCustomizer.configureTopology(topology);
237253
LOGGER.debug(() -> topology.describe().toString());
238254
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
239255
this.kafkaStreams.setStateListener(this.stateListener);

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,10 +22,13 @@
2222
import java.util.HashMap;
2323
import java.util.Map;
2424
import java.util.Properties;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526

2627
import org.apache.kafka.clients.consumer.ConsumerRecord;
2728
import org.apache.kafka.streams.KafkaStreams;
29+
import org.apache.kafka.streams.StreamsBuilder;
2830
import org.apache.kafka.streams.StreamsConfig;
31+
import org.apache.kafka.streams.Topology;
2932
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
3033
import org.apache.kafka.streams.processor.ProcessorContext;
3134
import org.junit.jupiter.api.Test;
@@ -61,6 +64,9 @@ public class KafkaStreamsCustomizerTests {
6164
@Autowired
6265
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
6366

67+
@Autowired
68+
private KafkaStreamsConfig config;
69+
6470
@Test
6571
public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration configuration,
6672
@Autowired KafkaStreamsConfig config) {
@@ -74,13 +80,19 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf
7480
.isEqualTo(Foo.class);
7581
assertThat(properties.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
7682
.isEqualTo(1000);
83+
assertThat(this.config.builderConfigured.get()).isTrue();
84+
assertThat(this.config.topologyConfigured.get()).isTrue();
7785
}
7886

7987
@Configuration
8088
@EnableKafka
8189
@EnableKafkaStreams
8290
public static class KafkaStreamsConfig {
8391

92+
final AtomicBoolean builderConfigured = new AtomicBoolean();
93+
94+
final AtomicBoolean topologyConfigured = new AtomicBoolean();
95+
8496
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
8597
private String brokerAddresses;
8698

@@ -89,6 +101,20 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
89101
StreamsBuilderFactoryBean streamsBuilderFactoryBean =
90102
new StreamsBuilderFactoryBean(kStreamsConfigs());
91103
streamsBuilderFactoryBean.setKafkaStreamsCustomizer(customizer());
104+
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
105+
106+
@SuppressWarnings("unchecked")
107+
@Override
108+
public void configureBuilder(StreamsBuilder builder) {
109+
KafkaStreamsConfig.this.builderConfigured.set(true);
110+
}
111+
112+
@Override
113+
public void configureTopology(Topology topology) {
114+
KafkaStreamsConfig.this.topologyConfigured.set(true);
115+
}
116+
117+
});
92118
return streamsBuilderFactoryBean;
93119
}
94120

src/reference/asciidoc/streams.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ In other words, all streams defined by a `StreamsBuilder` are tied with a single
4343
Once a `KafkaStreams` instance has been closed by `streams.close()`, it cannot be restarted.
4444
Instead, a new `KafkaStreams` instance to restart stream processing must be created.
4545

46+
[[streams-spring]]
4647
==== Spring Management
4748

4849
To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, the Spring for Apache Kafka introduces `StreamsBuilderFactoryBean`.
@@ -121,6 +122,25 @@ private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
121122
----
122123
====
123124

125+
Starting with version 2.4.1, the factory bean has a new property `infrastructureCustomizer` with type `KafkaStreamsInfrastructureCustomizer`; this allows customization of the `StreamsBuilder` (e.g. to add a state store) and/or the `Topology` before the stream is created.
126+
127+
====
128+
[source, java]
129+
----
130+
public interface KafkaStreamsInfrastructureCustomizer {
131+
132+
void configureBuilder(StreamsBuilder builder);
133+
134+
void configureTopology(Topology topology);
135+
136+
}
137+
----
138+
====
139+
140+
Default no-op implementations are provided to avoid having to implement both methods if one is not required.
141+
142+
A `CompositeKafkaStreamsInfrastructureCustomizer` is provided, for when you need to apply multiple customizers.
143+
124144
[[serde]]
125145
==== Streams JSON Serialization and Deserialization
126146

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ The `@KafkaListener` annotation has a new property `splitIterables`; default tru
4444
When a replying listener returns an `Iterable` this property controls whether the return result is sent as a single record or a record for each element is sent.
4545
See <<annotation-send-to>> for more information.
4646

47+
==== Kafka Streams
48+
49+
The `StreamsBuilderFactoryBean` accepts a new property `KafkaStreamsInfrastructureCustomizer`.
50+
This allows configuration of the builder and/or topology before the stream is created.
51+
See <<streams-spring>> for more information.
52+
4753
=== Migration Guide
4854

4955
* This release is essentially the same as the 2.3.x line, except it has been compiled against the 2.4 `kafka-clients` jar, due to a binary incompatibility.

0 commit comments

Comments
 (0)