Skip to content

Commit 02129d9

Browse files
garyrussellartembilan
authored andcommitted
GH-762: ConsumerStoppedEvent
Resolves #762 Add an event when a consumer thread exits.
1 parent 82b816d commit 02129d9

File tree

5 files changed

+81
-0
lines changed

5 files changed

+81
-0
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
/**
20+
* An event published when a consumer is stopped. While it is best practice to use
21+
* stateless listeners, you can consume this event to clean up any thread-based resources
22+
* (remove ThreadLocals, destroy thread-scoped beans etc), as long as the context event
23+
* multicaster is not modified to use an async task executor.
24+
*
25+
* @author Gary Russell
26+
* @since 2.2
27+
*
28+
*/
29+
@SuppressWarnings("serial")
30+
public class ConsumerStoppedEvent extends KafkaEvent {
31+
32+
/**
33+
* Construct an instance with the provided source and partitions.
34+
* @param source the container.
35+
*/
36+
public ConsumerStoppedEvent(Object source) {
37+
super(source);
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return "ConsumerStoppedEvent [source=" + getSource() + "]";
43+
}
44+
45+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.springframework.kafka.core.ProducerFactoryUtils;
5959
import org.springframework.kafka.event.ConsumerPausedEvent;
6060
import org.springframework.kafka.event.ConsumerResumedEvent;
61+
import org.springframework.kafka.event.ConsumerStoppedEvent;
6162
import org.springframework.kafka.event.ListenerContainerIdleEvent;
6263
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
6364
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
@@ -326,6 +327,12 @@ private void publishConsumerResumedEvent(Collection<TopicPartition> partitions)
326327
}
327328
}
328329

330+
private void publishConsumerStoppedEvent() {
331+
if (getApplicationEventPublisher() != null) {
332+
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this));
333+
}
334+
}
335+
329336
@Override
330337
public String toString() {
331338
return "KafkaMessageListenerContainer [id=" + getBeanName()
@@ -783,6 +790,7 @@ public void run() {
783790
}
784791
this.consumer.close();
785792
this.logger.info("Consumer stopped");
793+
publishConsumerStoppedEvent();
786794
}
787795

788796
/**

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.springframework.kafka.core.ProducerFactory;
7777
import org.springframework.kafka.event.ConsumerPausedEvent;
7878
import org.springframework.kafka.event.ConsumerResumedEvent;
79+
import org.springframework.kafka.event.ConsumerStoppedEvent;
7980
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
8081
import org.springframework.kafka.listener.ContainerProperties.AckMode;
8182
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
@@ -1782,13 +1783,17 @@ public void testPauseResume() throws Exception {
17821783
containerProps.setMessageListener((MessageListener) r -> { });
17831784
KafkaMessageListenerContainer<Integer, String> container =
17841785
new KafkaMessageListenerContainer<>(cf, containerProps);
1786+
CountDownLatch stopLatch = new CountDownLatch(1);
17851787
container.setApplicationEventPublisher(e -> {
17861788
if (e instanceof ConsumerPausedEvent) {
17871789
pauseLatch.countDown();
17881790
}
17891791
else if (e instanceof ConsumerResumedEvent) {
17901792
resumeLatch.countDown();
17911793
}
1794+
else if (e instanceof ConsumerStoppedEvent) {
1795+
stopLatch.countDown();
1796+
}
17921797
});
17931798
container.start();
17941799
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -1798,6 +1803,7 @@ else if (e instanceof ConsumerResumedEvent) {
17981803
container.resume();
17991804
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
18001805
container.stop();
1806+
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
18011807
}
18021808

18031809
@SuppressWarnings({ "unchecked", "rawtypes" })

src/reference/asciidoc/kafka.adoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,24 @@ public ConcurrentMessageListenerContainer<String, String>(
14751475
IMPORTANT: Containers created this way are not added to the endpoint registry.
14761476
They should be created as `@Bean` s so that they will be registered with the application context.
14771477

1478+
[[thread-safety]]
1479+
===== Thread Safety
1480+
1481+
When using a concurrent message listener container, a single listener instance is invoked on all consumer threads.
1482+
Listeners, therefore, need to be thread-safe; and it is preferable to use stateless listeners.
1483+
If it is not possible to make your listener thread-safe, or adding synchronization would significantly reduce the benefit of adding concurreny, there are several techniques you can use.
1484+
1485+
. Use `n` containers with `concurrency=1` with a prototype scoped `MessageListener` bean so each container gets its own instance (this is not possible when using `@KafkaListener`).
1486+
. Keep the state in `ThreadLocal<?>` s.
1487+
. Have the singleton listener delegate to a bean that is declared in `SimpleThreadScope` or similar.
1488+
1489+
To facilitate cleaning up thread state (for 2 and 3), starting with version 2.2, the listener container will publish `ConsumerStoppedEvent` s when each thread exits.
1490+
Consume these events with an `ApplicationListener` or `@EventListener` method to remove `ThreadLocal<?>` s, or `remove()` thread-scoped beans from the scope.
1491+
Note that `SimpleThreadScope` does not destroy beans that have a destruction interface (e.g. `DisposableBean`) so you should `destroy()` the instance yourself.
1492+
1493+
IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
1494+
If you change the multicaster to use an async executor, thread cleanup will not be effective.
1495+
14781496
[[pause-resume]]
14791497
==== Pausing/Resuming Listener Containers
14801498

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ A new container property `missingTopicsFatal` has been added.
2727

2828
See <<kafka-container>> for more information.
2929

30+
A `ConsumerStoppedEvent` is now emitted when a consumer terminates.
31+
32+
See <<thread-safety>> for more information.
33+
3034
Batch listeners can optionally receive the complete `ConsumerRecords<?, ?>` object instead of a `List<ConsumerRecord<?, ?>`.
3135

3236
See <<batch-listeners>> for more information.

0 commit comments

Comments
 (0)