Skip to content

Customize Instantiation of KafkaStreams in StreamsBuilderFactoryBeanΒ #3515

@agavra

Description

@agavra

Context

We'd like to be able to customize the construction of the KafkaStreams instance when using the StreamsBuilderFactoryBean in order to add native spring-kafka support for Responsive and other potential drop in replacements for Kafka Streams.

I am happy to contribute this change.

Proposed API

We will extend KafkaStreamsCustomizer to include an initialization method:

public interface KafkaStreamsCustomizer {

    // ...
    
    default KafkaStreams initializeKafkaStreams(Topology topology, Properties properties, KafkaClientSupplier supplier) {
        return new KafkaStreams(topology, properties, supplier);
    }

}

The StreamsBuilderFactoryBean will use this method when instantiating a new instance of KafkaStreams.

Example Usage

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
           @Override KafkaStreams initializeKafkaStreams(Topology topology, Properties properties, KafkaClientSuppiler supplier) {
              // note that ResponsiveKafkaStreams extends KafkaStreams so this is a drop-in replacement
              return new ResponsiveKafkaStreams(topology, properties, supplier);
           }
        };
    }

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions