Skip to content

Commit 52019c2

Browse files
authored
GH-3515: Support custom KafkaStreams implementations
Fixes: #3515 #3515 - Extend `KafkaStreamsCustomizer` with `initKafkaStreams` method - Update `StreamsBuilderFactoryBean` to use the new method - Update documentation and what's-new section - Fix formatting and address checkstyle issues
1 parent f3bf1f0 commit 52019c2

File tree

5 files changed

+77
-9
lines changed

5 files changed

+77
-9
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,16 @@ A new `KafkaStreams` is created on each `start()`.
8282
You might also consider using different `StreamsBuilderFactoryBean` instances, if you would like to control the lifecycles for `KStream` instances separately.
8383

8484
You also can specify `KafkaStreams.StateListener`, `Thread.UncaughtExceptionHandler`, and `StateRestoreListener` options on the `StreamsBuilderFactoryBean`, which are delegated to the internal `KafkaStreams` instance.
85-
Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, starting with _version 2.1.5_, you can use a `KafkaStreamsCustomizer` callback interface to configure an inner `KafkaStreams` instance.
85+
86+
Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, you can use a `KafkaStreamsCustomizer` callback interface to:
87+
88+
1. (from _version 2.1.5_) configure an inner `KafkaStreams` instance using `customize(KafkaStreams)`
89+
2. (from _version 3.3.0_) instantiate a custom implementation of `KafkaStreams` using `initKafkaStreams(Topology, Properties, KafkaClientSupplier)`
90+
8691
Note that `KafkaStreamsCustomizer` overrides the options provided by `StreamsBuilderFactoryBean`.
92+
8793
If you need to perform some `KafkaStreams` operations directly, you can access that internal `KafkaStreams` instance by using `StreamsBuilderFactoryBean.getKafkaStreams()`.
94+
8895
You can autowire `StreamsBuilderFactoryBean` bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:
8996

9097
[source,java]

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,8 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section
4545
=== Customizing Logging in DeadLetterPublishingRecovererFactory
4646

4747
When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior.
48+
49+
[[x33-customize-kafka-streams-implementation]]
50+
=== Customizing The Implementation of Kafka Streams
51+
52+
When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method.

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,17 @@
1616

1717
package org.springframework.kafka.config;
1818

19+
import java.util.Properties;
20+
21+
import org.apache.kafka.streams.KafkaClientSupplier;
1922
import org.apache.kafka.streams.KafkaStreams;
23+
import org.apache.kafka.streams.Topology;
2024

2125
/**
2226
* Callback interface that can be used to configure {@link KafkaStreams} directly.
2327
*
2428
* @author Nurettin Yilmaz
29+
* @author Almog Gavra
2530
*
2631
* @since 2.1.5
2732
*
@@ -30,6 +35,32 @@
3035
@FunctionalInterface
3136
public interface KafkaStreamsCustomizer {
3237

38+
/**
39+
* Customize the instantiation of the {@code KafkaStreams} instance. This
40+
* happens before the modifications made by {@link StreamsBuilderFactoryBean}.
41+
*
42+
* @param topology the full topology
43+
* @param properties the configuration properties
44+
* @param clientSupplier the client supplier
45+
*
46+
* @return a new instance of {@link KafkaStreams}
47+
*
48+
* @since 3.3.0
49+
*/
50+
default KafkaStreams initKafkaStreams(
51+
Topology topology,
52+
Properties properties,
53+
KafkaClientSupplier clientSupplier
54+
) {
55+
return new KafkaStreams(topology, properties, clientSupplier);
56+
}
57+
58+
/**
59+
* Customize the instance of {@code KafkaStreams} after {@link StreamsBuilderFactoryBean}
60+
* has applied its default configurations.
61+
*
62+
* @param kafkaStreams the instantiated Kafka Streams instance
63+
*/
3364
void customize(KafkaStreams kafkaStreams);
3465

3566
}

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
* @author Julien Wittouck
6262
* @author Sanghyeok An
6363
* @author Cédric Schaller
64+
* @author Almog Gavra
6465
*
6566
* @since 1.1.4
6667
*/
@@ -92,7 +93,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
9293
private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer() {
9394
};
9495

95-
private KafkaStreamsCustomizer kafkaStreamsCustomizer;
96+
private KafkaStreamsCustomizer kafkaStreamsCustomizer = kafkaStreams -> { };
9697

9798
private KafkaStreams.StateListener stateListener;
9899

@@ -361,15 +362,15 @@ public void start() {
361362
try {
362363
Assert.state(this.properties != null,
363364
"streams configuration properties must not be null");
364-
this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier);
365+
this.kafkaStreams = this.kafkaStreamsCustomizer.initKafkaStreams(
366+
this.topology, this.properties, this.clientSupplier
367+
);
365368
this.kafkaStreams.setStateListener(this.stateListener);
366369
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
367370
if (this.streamsUncaughtExceptionHandler != null) {
368371
this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler);
369372
}
370-
if (this.kafkaStreamsCustomizer != null) {
371-
this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
372-
}
373+
this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
373374
if (this.cleanupConfig.cleanupOnStart()) {
374375
this.kafkaStreams.cleanUp();
375376
}

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626

2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
2828
import org.apache.kafka.common.serialization.Serdes;
29+
import org.apache.kafka.streams.KafkaClientSupplier;
2930
import org.apache.kafka.streams.KafkaStreams;
3031
import org.apache.kafka.streams.StreamsBuilder;
3132
import org.apache.kafka.streams.StreamsConfig;
@@ -58,6 +59,7 @@
5859
/**
5960
* @author Nurettin Yilmaz
6061
* @author Artem Bilan
62+
* @author Almog Gavra
6163
*
6264
* @since 2.1.5
6365
*/
@@ -95,6 +97,7 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf
9597
.isEqualTo(1000);
9698
assertThat(this.config.builderConfigured.get()).isTrue();
9799
assertThat(this.config.topologyConfigured.get()).isTrue();
100+
assertThat(this.config.ksInitialized.get()).isTrue();
98101
assertThat(this.meterRegistry.get("kafka.consumer.coordinator.join.total")
99102
.tag("customTag", "stream")
100103
.tag("spring.id", KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@@ -118,6 +121,8 @@ public static class KafkaStreamsConfig {
118121

119122
final AtomicBoolean topologyConfigured = new AtomicBoolean();
120123

124+
final AtomicBoolean ksInitialized = new AtomicBoolean();
125+
121126
@Autowired
122127
EmbeddedKafkaBroker broker;
123128

@@ -168,7 +173,26 @@ public KafkaStreamsConfiguration kStreamsConfigs() {
168173
}
169174

170175
private KafkaStreamsCustomizer customizer() {
171-
return kafkaStreams -> kafkaStreams.setStateListener(STATE_LISTENER);
176+
return new KafkaStreamsCustomizer() {
177+
@Override
178+
public KafkaStreams initKafkaStreams(
179+
final Topology topology,
180+
final Properties properties,
181+
final KafkaClientSupplier clientSupplier
182+
) {
183+
ksInitialized.set(true);
184+
return KafkaStreamsCustomizer.super.initKafkaStreams(
185+
topology,
186+
properties,
187+
clientSupplier
188+
);
189+
}
190+
191+
@Override
192+
public void customize(final KafkaStreams kafkaStreams) {
193+
kafkaStreams.setStateListener(STATE_LISTENER);
194+
}
195+
};
172196
}
173197

174198
@Bean

0 commit comments

Comments
 (0)