Skip to content

Commit c537280

Browse files
garyrussellartembilan
authored andcommitted
GH-1672: Option to Immediately Stop the Container
Resolves #1672 Previously (and still, by default), stopping the listener container does not take effect until the records from the previous poll are all processed. Add an option to stop after the current record, instead. **cherry-pick to 2.6.x, 1.5.x**
1 parent e12e13a commit c537280

File tree

5 files changed

+85
-1
lines changed

5 files changed

+85
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ public enum EOSMode {
255255

256256
private boolean stopContainerWhenFenced;
257257

258+
private boolean stopImmediate;
259+
258260
/**
259261
* Create properties for a container that will subscribe to the specified topics.
260262
* @param topics the topics.
@@ -744,6 +746,26 @@ public void setStopContainerWhenFenced(boolean stopContainerWhenFenced) {
744746
this.stopContainerWhenFenced = stopContainerWhenFenced;
745747
}
746748

749+
/**
750+
* When true, the container will be stopped immediately after processing the current record.
751+
* @return true to stop immediately.
752+
* @since 2.5.11
753+
*/
754+
public boolean isStopImmediate() {
755+
return this.stopImmediate;
756+
}
757+
758+
/**
759+
* Set to true to stop the container after processing the current record (when stop()
760+
* is called). When false (default), the container will stop after all the results of
761+
* the previous poll are processed.
762+
* @param stopImmediate true to stop after the current record.
763+
* @since 2.5.11
764+
*/
765+
public void setStopImmediate(boolean stopImmediate) {
766+
this.stopImmediate = stopImmediate;
767+
}
768+
747769
private void adviseListenerIfNeeded() {
748770
if (!CollectionUtils.isEmpty(this.adviceChain)) {
749771
if (AopUtils.isAopProxy(this.messageListener)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
590590

591591
private final boolean fixTxOffsets = this.containerProperties.isFixTxOffsets();
592592

593+
private final boolean stopImmediate = this.containerProperties.isStopImmediate();
594+
593595
private Map<TopicPartition, OffsetMetadata> definedPartitions;
594596

595597
private int count;
@@ -1814,6 +1816,9 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
18141816
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
18151817
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
18161818
while (iterator.hasNext()) {
1819+
if (this.stopImmediate && !isRunning()) {
1820+
break;
1821+
}
18171822
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
18181823
if (record == null) {
18191824
continue;
@@ -1903,6 +1908,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
19031908
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
19041909
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
19051910
while (iterator.hasNext()) {
1911+
if (this.stopImmediate && !isRunning()) {
1912+
break;
1913+
}
19061914
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
19071915
if (record == null) {
19081916
continue;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -3139,6 +3139,53 @@ void commitAfterHandleManual() throws InterruptedException {
31393139
verify(consumer).commitSync(any(), any());
31403140
}
31413141

3142+
@Test
3143+
@SuppressWarnings({ "unchecked", "rawtypes" })
3144+
void stopImmediately() throws InterruptedException {
3145+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3146+
Consumer<Integer, String> consumer = mock(Consumer.class);
3147+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3148+
Map<String, Object> cfProps = new HashMap<>();
3149+
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
3150+
given(cf.getConfigurationProperties()).willReturn(cfProps);
3151+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
3152+
Map.of(new TopicPartition("foo", 0), Arrays.asList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
3153+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
3154+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3155+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3156+
AtomicBoolean first = new AtomicBoolean(true);
3157+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3158+
Thread.sleep(50);
3159+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3160+
});
3161+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3162+
new TopicPartitionOffset("foo", 0) };
3163+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3164+
containerProps.setGroupId("grp");
3165+
containerProps.setClientId("clientId");
3166+
containerProps.setStopImmediate(true);
3167+
AtomicInteger delivered = new AtomicInteger();
3168+
AtomicReference<KafkaMessageListenerContainer> containerRef = new AtomicReference<>();
3169+
containerProps.setMessageListener((MessageListener) r -> {
3170+
delivered.incrementAndGet();
3171+
containerRef.get().stop(() -> { });
3172+
});
3173+
KafkaMessageListenerContainer<Integer, String> container =
3174+
new KafkaMessageListenerContainer<>(cf, containerProps);
3175+
containerRef.set(container);
3176+
CountDownLatch latch = new CountDownLatch(1);
3177+
container.setApplicationEventPublisher(event -> {
3178+
if (event instanceof ConsumerStoppedEvent) {
3179+
latch.countDown();
3180+
}
3181+
});
3182+
container.start();
3183+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3184+
container.stop();
3185+
assertThat(delivered.get()).isEqualTo(1);
3186+
verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any());
3187+
}
3188+
31423189
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
31433190
Consumer<?, ?> consumer =
31443191
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2433,6 +2433,10 @@ Metadata
24332433
|Stop the listener container if a `ProducerFencedException` is thrown.
24342434
See <<after-rollback>> for more information.
24352435

2436+
|stopImmediate
2437+
|`false`
2438+
|When the container is stopped, stop processing after the current record instead of after processing all the records from the previous poll.
2439+
24362440
|subBatchPerPartition
24372441
|See desc.
24382442
|When using a batch listener, if this is `true`, the listener is called with the results of the poll split into sub batches, one per partition.

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@ This version requires the 2.7.0 `kafka-clients`.
1212
==== Listener Container Changes
1313

1414
The `onlyLogRecordMetadata` container property is now `true` by default.
15+
16+
A new container property `stopImmediate` is now available.
17+
1518
See <<container-props>> for more information.

0 commit comments

Comments
 (0)