Skip to content

Commit 09fb266

Browse files
garyrussellartembilan
authored andcommitted
GH-1672: Option to Immediately Stop the Container
Resolves #1672 Previously (and still, by default), stopping the listener container does not take effect until the records from the previous poll are all processed. Add an option to stop after the current record, instead. **cherry-pick to 2.6.x, 1.5.x** # Conflicts: # src/reference/asciidoc/whats-new.adoc # Conflicts: # src/reference/asciidoc/whats-new.adoc
1 parent c54d4f5 commit 09fb266

File tree

4 files changed

+82
-1
lines changed

4 files changed

+82
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ public enum EOSMode {
256256

257257
private boolean stopContainerWhenFenced;
258258

259+
private boolean stopImmediate;
260+
259261
/**
260262
* Create properties for a container that will subscribe to the specified topics.
261263
* @param topics the topics.
@@ -771,6 +773,26 @@ public void setStopContainerWhenFenced(boolean stopContainerWhenFenced) {
771773
this.stopContainerWhenFenced = stopContainerWhenFenced;
772774
}
773775

776+
/**
777+
* When true, the container will be stopped immediately after processing the current record.
778+
* @return true to stop immediately.
779+
* @since 2.5.11
780+
*/
781+
public boolean isStopImmediate() {
782+
return this.stopImmediate;
783+
}
784+
785+
/**
786+
* Set to true to stop the container after processing the current record (when stop()
787+
* is called). When false (default), the container will stop after all the results of
788+
* the previous poll are processed.
789+
* @param stopImmediate true to stop after the current record.
790+
* @since 2.5.11
791+
*/
792+
public void setStopImmediate(boolean stopImmediate) {
793+
this.stopImmediate = stopImmediate;
794+
}
795+
774796
private void adviseListenerIfNeeded() {
775797
if (!CollectionUtils.isEmpty(this.adviceChain)) {
776798
if (AopUtils.isAopProxy(this.messageListener)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
582582

583583
private final boolean fixTxOffsets = this.containerProperties.isFixTxOffsets();
584584

585+
private final boolean stopImmediate = this.containerProperties.isStopImmediate();
586+
585587
private Map<TopicPartition, OffsetMetadata> definedPartitions;
586588

587589
private int count;
@@ -1793,6 +1795,9 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
17931795
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
17941796
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
17951797
while (iterator.hasNext()) {
1798+
if (this.stopImmediate && !isRunning()) {
1799+
break;
1800+
}
17961801
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
17971802
if (record == null) {
17981803
continue;
@@ -1882,6 +1887,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
18821887
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
18831888
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
18841889
while (iterator.hasNext()) {
1890+
if (this.stopImmediate && !isRunning()) {
1891+
break;
1892+
}
18851893
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
18861894
if (record == null) {
18871895
continue;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -3142,6 +3142,53 @@ void commitAfterHandleManual() throws InterruptedException {
31423142
verify(consumer).commitSync(any(), any());
31433143
}
31443144

3145+
@Test
3146+
@SuppressWarnings({ "unchecked", "rawtypes" })
3147+
void stopImmediately() throws InterruptedException {
3148+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3149+
Consumer<Integer, String> consumer = mock(Consumer.class);
3150+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3151+
Map<String, Object> cfProps = new HashMap<>();
3152+
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
3153+
given(cf.getConfigurationProperties()).willReturn(cfProps);
3154+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
3155+
Map.of(new TopicPartition("foo", 0), Arrays.asList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
3156+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
3157+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3158+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3159+
AtomicBoolean first = new AtomicBoolean(true);
3160+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3161+
Thread.sleep(50);
3162+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3163+
});
3164+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3165+
new TopicPartitionOffset("foo", 0) };
3166+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3167+
containerProps.setGroupId("grp");
3168+
containerProps.setClientId("clientId");
3169+
containerProps.setStopImmediate(true);
3170+
AtomicInteger delivered = new AtomicInteger();
3171+
AtomicReference<KafkaMessageListenerContainer> containerRef = new AtomicReference<>();
3172+
containerProps.setMessageListener((MessageListener) r -> {
3173+
delivered.incrementAndGet();
3174+
containerRef.get().stop(() -> { });
3175+
});
3176+
KafkaMessageListenerContainer<Integer, String> container =
3177+
new KafkaMessageListenerContainer<>(cf, containerProps);
3178+
containerRef.set(container);
3179+
CountDownLatch latch = new CountDownLatch(1);
3180+
container.setApplicationEventPublisher(event -> {
3181+
if (event instanceof ConsumerStoppedEvent) {
3182+
latch.countDown();
3183+
}
3184+
});
3185+
container.start();
3186+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3187+
container.stop();
3188+
assertThat(delivered.get()).isEqualTo(1);
3189+
verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any());
3190+
}
3191+
31453192
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
31463193
Consumer<?, ?> consumer =
31473194
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2336,6 +2336,10 @@ Metadata
23362336
|Stop the listener container if a `ProducerFencedException` is thrown.
23372337
See <<after-rollback>> for more information.
23382338

2339+
|stopImmediate
2340+
|`false`
2341+
|When the container is stopped, stop processing after the current record instead of after processing all the records from the previous poll.
2342+
23392343
|subBatchPerPartition
23402344
|See desc.
23412345
|When using a batch listener, if this is `true`, the listener is called with the results of the poll split into sub batches, one per partition.

0 commit comments

Comments
 (0)