Skip to content

Commit 16a12fe

Browse files
asibrossartembilan
authored andcommitted
Fix KafkaListenerEndpointRegistry.stop(Runnable)
The `KafkaListenerEndpointRegistry.stop(Runnable)` doesn't change its `running` state * Removed the callback wrap in favor of setting the state to stop in two places * Set the running state to false before actually calling stop on the containers
1 parent 9bfc880 commit 16a12fe

File tree

2 files changed

+95
-4
lines changed

2 files changed

+95
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Juergen Hoeller
6161
* @author Artem Bilan
6262
* @author Gary Russell
63+
* @author Asi Bross
6364
*
6465
* @see KafkaListenerEndpoint
6566
* @see MessageListenerContainer
@@ -70,8 +71,7 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec
7071

7172
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
7273

73-
private final Map<String, MessageListenerContainer> listenerContainers =
74-
new ConcurrentHashMap<String, MessageListenerContainer>();
74+
private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();
7575

7676
private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
7777

@@ -162,6 +162,7 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
162162
@SuppressWarnings("unchecked")
163163
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
164164
boolean startImmediately) {
165+
165166
Assert.notNull(endpoint, "Endpoint must not be null");
166167
Assert.notNull(factory, "Factory must not be null");
167168

@@ -260,17 +261,19 @@ public void start() {
260261

261262
@Override
262263
public void stop() {
264+
this.running = false;
263265
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
264266
listenerContainer.stop();
265267
}
266-
this.running = false;
267268
}
268269

269270
@Override
270271
public void stop(Runnable callback) {
272+
this.running = false;
271273
Collection<MessageListenerContainer> listenerContainersToStop = getListenerContainers();
272274
if (listenerContainersToStop.size() > 0) {
273-
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(), callback);
275+
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(),
276+
callback);
274277
for (MessageListenerContainer listenerContainer : listenerContainersToStop) {
275278
if (listenerContainer.isRunning()) {
276279
listenerContainer.stop(aggregatingCallback);
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2017-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
21+
import static org.mockito.Mockito.mock;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.kafka.annotation.EnableKafka;
34+
import org.springframework.kafka.annotation.KafkaListener;
35+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
36+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
37+
import org.springframework.kafka.core.ConsumerFactory;
38+
import org.springframework.test.annotation.DirtiesContext;
39+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
40+
41+
/**
42+
* @author Asi Bross
43+
*
44+
* @since 2.3.5
45+
*
46+
*/
47+
@SpringJUnitConfig
48+
@DirtiesContext
49+
public class KafkaListenerEndpointRegistryLifecycleTests {
50+
51+
@Autowired
52+
private KafkaListenerEndpointRegistry registry;
53+
54+
@Test
55+
public void lifecycleTest() throws InterruptedException, ExecutionException, TimeoutException {
56+
// Registry is started automatically by the application context
57+
assertThat(registry.isRunning()).isTrue();
58+
59+
this.registry.stop();
60+
assertThat(registry.isRunning()).isFalse();
61+
62+
this.registry.start();
63+
assertThat(registry.isRunning()).isTrue();
64+
65+
CompletableFuture<Boolean> isRunning = new CompletableFuture<>();
66+
this.registry.stop(() -> isRunning.complete(this.registry.isRunning()));
67+
assertThat(isRunning.get(1, TimeUnit.SECONDS)).isFalse();
68+
}
69+
70+
@KafkaListener(topics = "foo", groupId = "bar")
71+
public void listen() {
72+
}
73+
74+
@Configuration
75+
@EnableKafka
76+
public static class Config {
77+
78+
@SuppressWarnings({ "rawtypes", "unchecked" })
79+
@Bean
80+
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
81+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
82+
factory.setConsumerFactory(mock(ConsumerFactory.class, RETURNS_DEEP_STUBS));
83+
return factory;
84+
}
85+
86+
}
87+
88+
}

0 commit comments

Comments
 (0)