Skip to content

Commit 41ea830

Browse files
garyrussellartembilan
authored andcommitted
GH-1501: StreamsBuilderFB.Listener Improvement
Resolves #1501 - add support for multiple listeners - consistent with consumer/producer factories
1 parent fe6255b commit 41ea830

File tree

2 files changed

+38
-16
lines changed

2 files changed

+38
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.springframework.kafka.config;
1818

1919
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
2023
import java.util.Properties;
2124

2225
import org.apache.commons.logging.LogFactory;
@@ -74,6 +77,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
7477

7578
private final CleanupConfig cleanupConfig;
7679

80+
private final List<Listener> listeners = new ArrayList<>();
81+
7782
private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer() {
7883
};
7984

@@ -97,8 +102,6 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
97102

98103
private Topology topology;
99104

100-
private Listener listener = new Listener() { };
101-
102105
private String beanName;
103106

104107
/**
@@ -203,17 +206,6 @@ public void setCloseTimeout(int closeTimeout) {
203206
this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
204207
}
205208

206-
/**
207-
* Set a {@link Listener} which will be called after starting and stopping the
208-
* streams.
209-
* @param listener the listener.
210-
* @since 2.5.3
211-
*/
212-
public void setListener(Listener listener) {
213-
Assert.notNull(listener, "'listener' cannot be null");
214-
this.listener = listener;
215-
}
216-
217209
/**
218210
* Providing access to the associated {@link Topology} of this
219211
* {@link StreamsBuilderFactoryBean}.
@@ -252,6 +244,36 @@ public KafkaStreams getKafkaStreams() {
252244
return this.kafkaStreams;
253245
}
254246

247+
/**
248+
* Get the current list of listeners.
249+
* @return the listeners.
250+
* @since 2.5.3
251+
*/
252+
public List<Listener> getListeners() {
253+
return Collections.unmodifiableList(this.listeners);
254+
}
255+
256+
/**
257+
* Add a {@link Listener} which will be called after starting and stopping the
258+
* streams.
259+
* @param listener the listener.
260+
* @since 2.5.3
261+
*/
262+
public void addListener(Listener listener) {
263+
Assert.notNull(listener, "'listener' cannot be null");
264+
this.listeners.add(listener);
265+
}
266+
267+
/**
268+
* Remove a listener.
269+
* @param listener the listener.
270+
* @return true if removed.
271+
* @since 2.5.3
272+
*/
273+
public boolean removeListener(Listener listener) {
274+
return this.listeners.remove(listener);
275+
}
276+
255277
@Override
256278
protected synchronized StreamsBuilder createInstance() {
257279
if (this.autoStartup) {
@@ -297,7 +319,7 @@ public synchronized void start() {
297319
this.kafkaStreams.cleanUp();
298320
}
299321
this.kafkaStreams.start();
300-
this.listener.streamsAdded(this.beanName, this.kafkaStreams);
322+
this.listeners.forEach(listener -> listener.streamsAdded(this.beanName, this.kafkaStreams));
301323
this.running = true;
302324
}
303325
catch (Exception e) {
@@ -315,7 +337,7 @@ public synchronized void stop() {
315337
if (this.cleanupConfig.cleanupOnStop()) {
316338
this.kafkaStreams.cleanUp();
317339
}
318-
this.listener.streamsRemoved(this.beanName, this.kafkaStreams);
340+
this.listeners.forEach(listener -> listener.streamsRemoved(this.beanName, this.kafkaStreams));
319341
this.kafkaStreams = null;
320342
}
321343
}

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void configureTopology(Topology topology) {
149149
}
150150

151151
});
152-
streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry(),
152+
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry(),
153153
Collections.singletonList(new ImmutableTag("customTag", "stream"))));
154154
return streamsBuilderFactoryBean;
155155
}

0 commit comments

Comments
 (0)