Skip to content

Commit 4c9ecb1

Browse files
garyrussellartembilan
authored andcommitted
GH-1672: Option to Immediately Stop the Container
Resolves #1672 Previously (and still, by default), stopping the listener container does not take effect until the records from the previous poll are all processed. Add an option to stop after the current record, instead. **cherry-pick to 2.6.x, 1.5.x** # Conflicts: # src/reference/asciidoc/whats-new.adoc
1 parent ff802bd commit 4c9ecb1

File tree

5 files changed

+85
-1
lines changed

5 files changed

+85
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ public enum EOSMode {
257257

258258
private boolean stopContainerWhenFenced;
259259

260+
private boolean stopImmediate;
261+
260262
/**
261263
* Create properties for a container that will subscribe to the specified topics.
262264
* @param topics the topics.
@@ -783,6 +785,26 @@ public void setStopContainerWhenFenced(boolean stopContainerWhenFenced) {
783785
this.stopContainerWhenFenced = stopContainerWhenFenced;
784786
}
785787

788+
/**
789+
* When true, the container will be stopped immediately after processing the current record.
790+
* @return true to stop immediately.
791+
* @since 2.5.11
792+
*/
793+
public boolean isStopImmediate() {
794+
return this.stopImmediate;
795+
}
796+
797+
/**
798+
* Set to true to stop the container after processing the current record (when stop()
799+
* is called). When false (default), the container will stop after all the results of
800+
* the previous poll are processed.
801+
* @param stopImmediate true to stop after the current record.
802+
* @since 2.5.11
803+
*/
804+
public void setStopImmediate(boolean stopImmediate) {
805+
this.stopImmediate = stopImmediate;
806+
}
807+
786808
private void adviseListenerIfNeeded() {
787809
if (!CollectionUtils.isEmpty(this.adviceChain)) {
788810
if (AopUtils.isAopProxy(this.messageListener)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
590590

591591
private final boolean fixTxOffsets = this.containerProperties.isFixTxOffsets();
592592

593+
private final boolean stopImmediate = this.containerProperties.isStopImmediate();
594+
593595
private Map<TopicPartition, OffsetMetadata> definedPartitions;
594596

595597
private int count;
@@ -1819,6 +1821,9 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
18191821
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
18201822
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
18211823
while (iterator.hasNext()) {
1824+
if (this.stopImmediate && !isRunning()) {
1825+
break;
1826+
}
18221827
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
18231828
if (record == null) {
18241829
continue;
@@ -1908,6 +1913,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
19081913
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
19091914
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
19101915
while (iterator.hasNext()) {
1916+
if (this.stopImmediate && !isRunning()) {
1917+
break;
1918+
}
19111919
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
19121920
if (record == null) {
19131921
continue;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -3142,6 +3142,53 @@ void commitAfterHandleManual() throws InterruptedException {
31423142
verify(consumer).commitSync(any(), any());
31433143
}
31443144

3145+
@Test
3146+
@SuppressWarnings({ "unchecked", "rawtypes" })
3147+
void stopImmediately() throws InterruptedException {
3148+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3149+
Consumer<Integer, String> consumer = mock(Consumer.class);
3150+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3151+
Map<String, Object> cfProps = new HashMap<>();
3152+
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
3153+
given(cf.getConfigurationProperties()).willReturn(cfProps);
3154+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
3155+
Map.of(new TopicPartition("foo", 0), Arrays.asList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
3156+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
3157+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3158+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3159+
AtomicBoolean first = new AtomicBoolean(true);
3160+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3161+
Thread.sleep(50);
3162+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3163+
});
3164+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3165+
new TopicPartitionOffset("foo", 0) };
3166+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3167+
containerProps.setGroupId("grp");
3168+
containerProps.setClientId("clientId");
3169+
containerProps.setStopImmediate(true);
3170+
AtomicInteger delivered = new AtomicInteger();
3171+
AtomicReference<KafkaMessageListenerContainer> containerRef = new AtomicReference<>();
3172+
containerProps.setMessageListener((MessageListener) r -> {
3173+
delivered.incrementAndGet();
3174+
containerRef.get().stop(() -> { });
3175+
});
3176+
KafkaMessageListenerContainer<Integer, String> container =
3177+
new KafkaMessageListenerContainer<>(cf, containerProps);
3178+
containerRef.set(container);
3179+
CountDownLatch latch = new CountDownLatch(1);
3180+
container.setApplicationEventPublisher(event -> {
3181+
if (event instanceof ConsumerStoppedEvent) {
3182+
latch.countDown();
3183+
}
3184+
});
3185+
container.start();
3186+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3187+
container.stop();
3188+
assertThat(delivered.get()).isEqualTo(1);
3189+
verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any());
3190+
}
3191+
31453192
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
31463193
Consumer<?, ?> consumer =
31473194
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2433,6 +2433,10 @@ Metadata
24332433
|Stop the listener container if a `ProducerFencedException` is thrown.
24342434
See <<after-rollback>> for more information.
24352435

2436+
|stopImmediate
2437+
|`false`
2438+
|When the container is stopped, stop processing after the current record instead of after processing all the records from the previous poll.
2439+
24362440
|subBatchPerPartition
24372441
|See desc.
24382442
|When using a batch listener, if this is `true`, the listener is called with the results of the poll split into sub batches, one per partition.

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ In addition, you can now select the `BackOff` to use based on the failed record
1818
See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-rollback>> for more information.
1919

2020
You can now configure an `adviceChain` in the container properties.
21+
22+
A new container property `stopImmediate` is now available.
23+
2124
See <<container-props>> for more information.
2225

2326
When the container is configured to publish `ListenerContainerIdleEvent` s, it now publishes a `ListenerContainerNoLongerIdleEvent` when a record is received after publishing an idle event.

0 commit comments

Comments
 (0)