Skip to content

Commit c75e60c

Browse files
garyrussell and artembilan
authored and committed
GH-1915: Batch Manual AckMode Improvements
Resolves #1915

- Perform acks within the scope of the `acknowledge()` call so that users can catch exceptions when sync commits are used.
- When using MANUAL_IMMEDIATE, commit the offsets for the whole batch at once instead of one record at a time.
1 parent 435db6a commit c75e60c

File tree

3 files changed

+107
-5
lines changed

3 files changed

+107
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,6 +1755,38 @@ private void processAck(ConsumerRecord<K, V> record) {
17551755
}
17561756
}
17571757

1758+
private void processAcks(ConsumerRecords<K, V> records) {
1759+
if (!Thread.currentThread().equals(this.consumerThread)) {
1760+
try {
1761+
for (ConsumerRecord<K, V> record : records) {
1762+
this.acks.put(record);
1763+
}
1764+
if (this.isManualImmediateAck) {
1765+
this.consumer.wakeup();
1766+
}
1767+
}
1768+
catch (InterruptedException e) {
1769+
Thread.currentThread().interrupt();
1770+
throw new KafkaException("Interrupted while storing ack", e);
1771+
}
1772+
}
1773+
else {
1774+
if (this.isManualImmediateAck) {
1775+
try {
1776+
ackImmediate(records);
1777+
}
1778+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
1779+
// ignore - not polling
1780+
}
1781+
}
1782+
else {
1783+
for (ConsumerRecord<K, V> record : records) {
1784+
addOffset(record);
1785+
}
1786+
}
1787+
}
1788+
}
1789+
17581790
private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
17591791
TopicPartition part = new TopicPartition(record.topic(), record.partition());
17601792
List<Long> offs = this.offsetsInThisBatch.get(part);
@@ -1806,6 +1838,25 @@ else if (this.syncCommits) {
18061838
}
18071839
}
18081840

1841+
private void ackImmediate(ConsumerRecords<K, V> records) {
1842+
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
1843+
for (TopicPartition part : records.partitions()) {
1844+
commits.put(part,
1845+
new OffsetAndMetadata(records.records(part)
1846+
.get(records.records(part).size() - 1).offset() + 1));
1847+
}
1848+
this.commitLogger.log(() -> "Committing: " + commits);
1849+
if (this.producer != null) {
1850+
doSendOffsets(this.producer, commits);
1851+
}
1852+
else if (this.syncCommits) {
1853+
commitSync(commits);
1854+
}
1855+
else {
1856+
commitAsync(commits, 0);
1857+
}
1858+
}
1859+
18091860
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
18101861
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
18111862
if (exception instanceof RetriableCommitFailedException
@@ -2060,9 +2111,6 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
20602111
if (index++ >= this.nackIndex) {
20612112
toSeek.add(record);
20622113
}
2063-
else {
2064-
this.acks.put(record);
2065-
}
20662114
}
20672115
}
20682116
if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
@@ -3026,12 +3074,12 @@ public void acknowledge() {
30263074
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
30273075
if (!this.acked) {
30283076
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(this.records)) {
3029-
processAck(record);
30303077
if (offs != null) {
30313078
offs.remove(new TopicPartition(record.topic(), record.partition()));
30323079
deferred.remove(new TopicPartition(record.topic(), record.partition()));
30333080
}
30343081
}
3082+
processAcks(this.records);
30353083
this.acked = true;
30363084
}
30373085
}
@@ -3050,6 +3098,22 @@ public void nack(int index, long sleep) {
30503098
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
30513099
}
30523100
}
3101+
int i = 0;
3102+
List<ConsumerRecord<K, V>> toAck = new LinkedList<>();
3103+
for (ConsumerRecord<K, V> record : this.records) {
3104+
if (i++ < index) {
3105+
toAck.add(record);
3106+
}
3107+
else {
3108+
break;
3109+
}
3110+
}
3111+
Map<TopicPartition, List<ConsumerRecord<K, V>>> newRecords = new HashMap<>();
3112+
for (ConsumerRecord<K, V> record : toAck) {
3113+
newRecords.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
3114+
tp -> new LinkedList<>()).add(record);
3115+
}
3116+
processAcks(new ConsumerRecords<K, V>(newRecords));
30533117
}
30543118

30553119
@Override
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
20+
21+
/**
22+
* @author Gary Russell
23+
* @since 2.8
24+
*
25+
*/
26+
public class ManualImmediateNackBatchTests extends ManualNackBatchTests {
27+
28+
static {
29+
ackMode = AckMode.MANUAL_IMMEDIATE;
30+
}
31+
32+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ public class ManualNackBatchTests {
7373

7474
private static final String CONTAINER_ID = "container";
7575

76+
protected static AckMode ackMode;
77+
78+
static {
79+
ackMode = AckMode.MANUAL;
80+
}
81+
7682
@SuppressWarnings("rawtypes")
7783
@Autowired
7884
private Consumer consumer;
@@ -216,7 +222,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
216222
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
217223
factory.setConsumerFactory(consumerFactory());
218224
factory.setBatchListener(true);
219-
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
225+
factory.getContainerProperties().setAckMode(ackMode);
220226
return factory;
221227
}
222228

0 commit comments

Comments
 (0)