Skip to content

Commit e148db5

Browse files
sobychackogaryrussell
authored andcommitted
GH-1400: Expose topology from StreamsBuilder
Resolves #1400 Currently, StreamsBuilderFacotryBean does not give direct access to the toplogy object. It will be beneficial for downstream clients such as the Kafka Strems binder in Spring Cloud Stream to get access to the topology object directly.
1 parent f57534f commit e148db5

File tree

2 files changed

+15
-1
lines changed

2 files changed

+15
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
9292

9393
private volatile boolean running;
9494

95+
private Topology topology;
96+
9597
/**
9698
* Default constructor that creates the factory without configuration
9799
* {@link Properties}. It is the factory user's responsibility to properly set
@@ -254,6 +256,15 @@ public void setCloseTimeout(int closeTimeout) {
254256
this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
255257
}
256258

259+
/**
260+
* Providing access to the associated {@link Topology} of this {@link StreamsBuilderFactoryBean}.
261+
* @return {@link Topology} object
262+
* @since 2.3.7
263+
*/
264+
public Topology getTopology() {
265+
return this.topology;
266+
}
267+
257268
@Override
258269
public Class<?> getObjectType() {
259270
return StreamsBuilder.class;
@@ -311,6 +322,7 @@ public synchronized void start() {
311322
Assert.state(this.properties != null,
312323
"streams configuration properties must not be null");
313324
Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
325+
this.topology = topology;
314326
LOGGER.debug(() -> topology.describe().toString());
315327
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
316328
this.kafkaStreams.setStateListener(this.stateListener);

spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
* @author Artem Bilan
5252
* @author Gary Russell
5353
* @author Denis Washington
54+
* @author Soby Chacko
5455
*/
5556
@SpringJUnitConfig
5657
@DirtiesContext
@@ -99,6 +100,7 @@ protected StreamsBuilder createInstance() {
99100
streamsBuilderFactoryBean.start();
100101
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
101102
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
103+
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
102104
}
103105

104106
@Test

0 commit comments

Comments
 (0)