Skip to content

Commit 5807606

Browse files
garyrussellartembilan
authored andcommitted
GH-1522: Commit recovered record with MANUAL Acks
Resolves #1522 If the `errorHandler.isAckAfterHandle()` is true and the handler exits normally, add the offset to the pending `acks`, even when the `AckMode` is `MANUAL`.
1 parent 261e73e commit 5807606

File tree

3 files changed

+56
-2
lines changed

3 files changed

+56
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
578578

579579
private boolean producerPerConsumerPartition;
580580

581+
private boolean commitRecovered;
582+
581583
private volatile boolean consumerPaused;
582584

583585
private volatile Thread consumerThread;
@@ -1794,7 +1796,13 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
17941796
invokeErrorHandler(record, iterator, e);
17951797
if ((!acked && !this.autoCommit && this.errorHandler.isAckAfterHandle())
17961798
|| this.producer != null) {
1799+
if (this.isManualAck) {
1800+
this.commitRecovered = true;
1801+
}
17971802
ackCurrent(record);
1803+
if (this.isManualAck) {
1804+
this.commitRecovered = false;
1805+
}
17981806
}
17991807
}
18001808
catch (KafkaException ke) {
@@ -1932,7 +1940,8 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
19321940
this.acks.add(record);
19331941
}
19341942
}
1935-
else if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
1943+
else if (this.producer != null
1944+
|| ((!this.isAnyManualAck || this.commitRecovered) && !this.autoCommit)) {
19361945
this.acks.add(record);
19371946
}
19381947
if (this.producer != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<
174174
}
175175
}
176176
else {
177-
logger.warn(() -> "'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE, not "
177+
logger.debug(() -> "'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE, not "
178178
+ container.getContainerProperties().getAckMode());
179179
}
180180
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3081,6 +3081,51 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
30813081
}
30823082
}
30833083

3084+
@Test
3085+
@SuppressWarnings({ "unchecked", "rawtypes" })
3086+
void commitAfterHandleManual() throws InterruptedException {
3087+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3088+
Consumer<Integer, String> consumer = mock(Consumer.class);
3089+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3090+
Map<String, Object> cfProps = new HashMap<>();
3091+
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
3092+
given(cf.getConfigurationProperties()).willReturn(cfProps);
3093+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
3094+
records.put(new TopicPartition("foo", 0), Arrays.asList(
3095+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
3096+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3097+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3098+
AtomicBoolean first = new AtomicBoolean(true);
3099+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
3100+
Thread.sleep(50);
3101+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3102+
});
3103+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
3104+
new TopicPartitionOffset("foo", 0) };
3105+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3106+
containerProps.setAckMode(AckMode.MANUAL);
3107+
containerProps.setGroupId("grp");
3108+
containerProps.setClientId("clientId");
3109+
containerProps.setIdleEventInterval(100L);
3110+
containerProps.setMessageListener((MessageListener) r -> {
3111+
throw new RuntimeException("test");
3112+
});
3113+
KafkaMessageListenerContainer<Integer, String> container =
3114+
new KafkaMessageListenerContainer<>(cf, containerProps);
3115+
AtomicBoolean recovered = new AtomicBoolean();
3116+
CountDownLatch latch = new CountDownLatch(1);
3117+
container.setErrorHandler(new SeekToCurrentErrorHandler((rec, ex) -> {
3118+
recovered.set(true);
3119+
latch.countDown();
3120+
},
3121+
new FixedBackOff(0, 0)));
3122+
container.start();
3123+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3124+
container.stop();
3125+
assertThat(recovered.get()).isTrue();
3126+
verify(consumer).commitSync(any(), any());
3127+
}
3128+
30843129
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
30853130
Consumer<?, ?> consumer =
30863131
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments
 (0)