Skip to content

Commit 129b2bf

Browse files
garyrussellartembilan
authored andcommitted
GH-2055: Containers Must Implement DisposableBean
Resolves #2055 If context initialization fails, `Lifecycle.stop()` is not called. Containers must be stopped from `DisposableBean` in this case. **cherry-pick to 2.7.x, 2.6.x, 2.5.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java # spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java
1 parent 4cc23d4 commit 129b2bf

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -227,14 +227,7 @@ protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint
227227
@Override
228228
public void destroy() {
229229
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
230-
if (listenerContainer instanceof DisposableBean) {
231-
try {
232-
((DisposableBean) listenerContainer).destroy();
233-
}
234-
catch (Exception ex) {
235-
this.logger.warn(ex, "Failed to destroy message listener container");
236-
}
237-
}
230+
listenerContainer.destroy();
238231
}
239232
}
240233

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.MetricName;
2424
import org.apache.kafka.common.TopicPartition;
2525

26+
import org.springframework.beans.factory.DisposableBean;
2627
import org.springframework.context.SmartLifecycle;
2728
import org.springframework.lang.Nullable;
2829

@@ -34,7 +35,7 @@
3435
* @author Gary Russell
3536
* @author Vladimir Tsanev
3637
*/
37-
public interface MessageListenerContainer extends SmartLifecycle {
38+
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {
3839

3940
/**
4041
* Setup the message listener to use. Throws an {@link IllegalArgumentException}
@@ -151,4 +152,9 @@ default String getListenerId() {
151152
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
152153
}
153154

155+
@Override
156+
default void destroy() {
157+
stop();
158+
}
159+
154160
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -643,8 +643,9 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
643643
inOrder.verify(consumer).commitSync(anyMap(), any());
644644
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
645645
inOrder.verify(consumer).commitSync(anyMap(), any());
646-
container.stop();
646+
container.destroy();
647647
assertThat(advised).containsExactly("one", "two", "one", "two");
648+
assertThat(container.isRunning()).isFalse();
648649
}
649650

650651
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)