diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc index 40b9dee6c2..e139c15e25 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc @@ -82,9 +82,16 @@ A new `KafkaStreams` is created on each `start()`. You might also consider using different `StreamsBuilderFactoryBean` instances, if you would like to control the lifecycles for `KStream` instances separately. You also can specify `KafkaStreams.StateListener`, `Thread.UncaughtExceptionHandler`, and `StateRestoreListener` options on the `StreamsBuilderFactoryBean`, which are delegated to the internal `KafkaStreams` instance. -Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, starting with _version 2.1.5_, you can use a `KafkaStreamsCustomizer` callback interface to configure an inner `KafkaStreams` instance. + +Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, you can use a `KafkaStreamsCustomizer` callback interface to: + +1. (from _version 2.1.5_) configure an inner `KafkaStreams` instance using `customize(KafkaStreams)` +2. (from _version 3.3.0_) instantiate a custom implementation of `KafkaStreams` using `initKafkaStreams(Topology, Properties, KafkaClientSupplier)` + Note that `KafkaStreamsCustomizer` overrides the options provided by `StreamsBuilderFactoryBean`. + If you need to perform some `KafkaStreams` operations directly, you can access that internal `KafkaStreams` instance by using `StreamsBuilderFactoryBean.getKafkaStreams()`. + You can autowire `StreamsBuilderFactoryBean` bean by type, but you should be sure to use the full type in the bean definition, as the following example shows: [source,java] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index b42b7e7723..ad8caa2e02 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -45,3 +45,8 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section === Customizing Logging in DeadLetterPublishingRecovererFactory When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior. + +[[x33-customize-kafka-streams-implementation]] +=== Customizing The Implementation of Kafka Streams + +When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java index 8155883ab3..28b43c6452 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,17 @@ package org.springframework.kafka.config; +import java.util.Properties; + +import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; /** * Callback interface that can be used to configure {@link KafkaStreams} directly. * * @author Nurettin Yilmaz + * @author Almog Gavra * * @since 2.1.5 * @@ -30,6 +35,32 @@ @FunctionalInterface public interface KafkaStreamsCustomizer { + /** + * Customize the instantiation of the {@code KafkaStreams} instance. This + * happens before the modifications made by {@link StreamsBuilderFactoryBean}. + * + * @param topology the full topology + * @param properties the configuration properties + * @param clientSupplier the client supplier + * + * @return a new instance of {@link KafkaStreams} + * + * @since 3.3.0 + */ + default KafkaStreams initKafkaStreams( + Topology topology, + Properties properties, + KafkaClientSupplier clientSupplier + ) { + return new KafkaStreams(topology, properties, clientSupplier); + } + + /** + * Customize the instance of {@code KafkaStreams} after {@link StreamsBuilderFactoryBean} + * has applied its default configurations. + * + * @param kafkaStreams the instantiated Kafka Streams instance + */ void customize(KafkaStreams kafkaStreams); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index dc391ddb91..19ce9e0abf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -61,6 +61,7 @@ * @author Julien Wittouck * @author Sanghyeok An * @author Cédric Schaller + * @author Almog Gavra * * @since 1.1.4 */ @@ -92,7 +93,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean { }; private KafkaStreams.StateListener stateListener; @@ -361,15 +362,15 @@ public void start() { try { Assert.state(this.properties != null, "streams configuration properties must not be null"); - this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier); + this.kafkaStreams = this.kafkaStreamsCustomizer.initKafkaStreams( + this.topology, this.properties, this.clientSupplier + ); this.kafkaStreams.setStateListener(this.stateListener); this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener); if (this.streamsUncaughtExceptionHandler != null) { this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler); } - if (this.kafkaStreamsCustomizer != null) { - this.kafkaStreamsCustomizer.customize(this.kafkaStreams); - } + this.kafkaStreamsCustomizer.customize(this.kafkaStreams); if (this.cleanupConfig.cleanupOnStart()) { this.kafkaStreams.cleanUp(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java index d3b02aea2c..0efdc1d633 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -58,6 +59,7 @@ /** * @author Nurettin Yilmaz * @author Artem Bilan + * @author Almog Gavra * * @since 2.1.5 */ @@ -95,6 +97,7 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf .isEqualTo(1000); assertThat(this.config.builderConfigured.get()).isTrue(); assertThat(this.config.topologyConfigured.get()).isTrue(); + assertThat(this.config.ksInitialized.get()).isTrue(); assertThat(this.meterRegistry.get("kafka.consumer.coordinator.join.total") .tag("customTag", "stream") .tag("spring.id", KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) @@ -118,6 +121,8 @@ public static class KafkaStreamsConfig { final AtomicBoolean topologyConfigured = new AtomicBoolean(); + final AtomicBoolean ksInitialized = new AtomicBoolean(); + @Autowired EmbeddedKafkaBroker broker; @@ -168,7 +173,26 @@ public KafkaStreamsConfiguration kStreamsConfigs() { } private KafkaStreamsCustomizer customizer() { - return kafkaStreams -> kafkaStreams.setStateListener(STATE_LISTENER); + return new KafkaStreamsCustomizer() { + @Override + public KafkaStreams initKafkaStreams( + final Topology topology, + final Properties properties, + final KafkaClientSupplier clientSupplier + ) { + ksInitialized.set(true); + return KafkaStreamsCustomizer.super.initKafkaStreams( + topology, + properties, + clientSupplier + ); + } + + @Override + public void customize(final KafkaStreams kafkaStreams) { + kafkaStreams.setStateListener(STATE_LISTENER); + } + }; } @Bean