Skip to content

Commit 4da47f5

Browse files
committed
GH-1496: Handle RetriableCommitFailedException
Resolves #1496. Certain commit exceptions are retriable. **I will back-port as necessary.** * Fix import
1 parent 27f2f65 commit 4da47f5

File tree

3 files changed

+143
-10
lines changed

3 files changed

+143
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ConsumerProperties {
4444
*/
4545
public static final long DEFAULT_POLL_TIMEOUT = 5_000L;
4646

47+
private static final int DEFAULT_COMMIT_RETRIES = 3;
48+
4749
/**
4850
* Topic names.
4951
*/
@@ -99,6 +101,8 @@ public class ConsumerProperties {
99101

100102
private Duration authorizationExceptionRetryInterval;
101103

104+
private int commitRetries = DEFAULT_COMMIT_RETRIES;
105+
102106
/**
103107
* Create properties for a container that will subscribe to the specified topics.
104108
* @param topics the topics.
@@ -324,6 +328,28 @@ public void setAuthorizationExceptionRetryInterval(Duration authorizationExcepti
324328
this.authorizationExceptionRetryInterval = authorizationExceptionRetryInterval;
325329
}
326330

331+
/**
332+
* The number of retries allowed when a
333+
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
334+
* by the consumer.
335+
* @return the number of retries.
336+
* @since 2.3.9
337+
*/
338+
public int getCommitRetries() {
339+
return this.commitRetries;
340+
}
341+
342+
/**
343+
* Set number of retries allowed when a
344+
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
345+
* by the consumer. Default 3 (4 attempts total).
346+
* @param commitRetries the commitRetries.
347+
* @since 2.3.9
348+
*/
349+
public void setCommitRetries(int commitRetries) {
350+
this.commitRetries = commitRetries;
351+
}
352+
327353
@Override
328354
public String toString() {
329355
return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5050
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
5151
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
52+
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
5253
import org.apache.kafka.clients.producer.Producer;
5354
import org.apache.kafka.common.Metric;
5455
import org.apache.kafka.common.MetricName;
@@ -1201,6 +1202,10 @@ private void wrapUp() {
12011202
* @param e the exception.
12021203
*/
12031204
protected void handleConsumerException(Exception e) {
1205+
if (e instanceof RetriableCommitFailedException) {
1206+
this.logger.error(e, "Commit retries exhausted");
1207+
return;
1208+
}
12041209
try {
12051210
if (!this.isBatchListener && this.errorHandler != null) {
12061211
this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
@@ -1280,13 +1285,25 @@ private void ackImmediate(ConsumerRecord<K, V> record) {
12801285
this.producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
12811286
}
12821287
else if (this.syncCommits) {
1283-
this.consumer.commitSync(commits, this.syncCommitTimeout);
1288+
commitSync(commits);
12841289
}
12851290
else {
1286-
this.consumer.commitAsync(commits, this.commitCallback);
1291+
commitAsync(commits, 0);
12871292
}
12881293
}
12891294

1295+
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
1296+
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
1297+
if (exception instanceof RetriableCommitFailedException
1298+
&& retries < this.containerProperties.getCommitRetries()) {
1299+
commitAsync(commits, retries + 1);
1300+
}
1301+
else {
1302+
this.commitCallback.onComplete(offsetsAttempted, exception);
1303+
}
1304+
});
1305+
}
1306+
12901307
private void invokeListener(final ConsumerRecords<K, V> records) {
12911308
if (this.isBatchListener) {
12921309
invokeBatchListener(records);
@@ -1848,10 +1865,10 @@ public void ackCurrent(final ConsumerRecord<K, V> record,
18481865
if (producer == null) {
18491866
this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
18501867
if (this.syncCommits) {
1851-
this.consumer.commitSync(offsetsToCommit, this.syncCommitTimeout);
1868+
commitSync(offsetsToCommit);
18521869
}
18531870
else {
1854-
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
1871+
commitAsync(offsetsToCommit, 0);
18551872
}
18561873
}
18571874
else {
@@ -2071,10 +2088,10 @@ private void commitIfNecessary() {
20712088
this.commitLogger.log(() -> "Committing: " + commits);
20722089
try {
20732090
if (this.syncCommits) {
2074-
this.consumer.commitSync(commits, this.syncCommitTimeout);
2091+
commitSync(commits);
20752092
}
20762093
else {
2077-
this.consumer.commitAsync(commits, this.commitCallback);
2094+
commitAsync(commits, 0);
20782095
}
20792096
}
20802097
catch (@SuppressWarnings(UNUSED) WakeupException e) {
@@ -2084,6 +2101,22 @@ private void commitIfNecessary() {
20842101
}
20852102
}
20862103

2104+
private void commitSync(Map<TopicPartition, OffsetAndMetadata> commits) {
2105+
doCommitSync(commits, 0);
2106+
}
2107+
2108+
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
2109+
try {
2110+
this.consumer.commitSync(commits, this.syncCommitTimeout);
2111+
}
2112+
catch (RetriableCommitFailedException e) {
2113+
if (retries >= this.containerProperties.getCommitRetries()) {
2114+
throw e;
2115+
}
2116+
doCommitSync(commits, retries + 1);
2117+
}
2118+
}
2119+
20872120
private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
20882121
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
20892122
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
@@ -2348,6 +2381,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
23482381
return true;
23492382
}
23502383

2384+
@SuppressWarnings("unused")
23512385
private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
23522386
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
23532387
if (ListenerConsumer.this.transactionTemplate != null
@@ -2387,12 +2421,16 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
23872421
else {
23882422
ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
23892423
if (containerProps.isSyncCommits()) {
2390-
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2391-
containerProps.getSyncCommitTimeout());
2424+
try {
2425+
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2426+
containerProps.getSyncCommitTimeout());
2427+
}
2428+
catch (RetriableCommitFailedException e) {
2429+
// ignore since this is on assignment anyway
2430+
}
23922431
}
23932432
else {
2394-
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2395-
containerProps.getCommitCallback());
2433+
commitAsync(offsetsToCommit, 0);
23962434
}
23972435
}
23982436
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
import org.apache.kafka.clients.consumer.ConsumerRecords;
6767
import org.apache.kafka.clients.consumer.KafkaConsumer;
6868
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
69+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
70+
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
6971
import org.apache.kafka.clients.producer.ProducerConfig;
7072
import org.apache.kafka.common.TopicPartition;
7173
import org.apache.kafka.common.errors.AuthorizationException;
@@ -2855,6 +2857,73 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
28552857
container.stop();
28562858
}
28572859

2860+
@Test
2861+
void testCommitSyncRetries() throws Exception {
2862+
testCommitRetriesGuts(true);
2863+
}
2864+
2865+
@Test
2866+
void testCommitAsyncRetries() throws Exception {
2867+
testCommitRetriesGuts(false);
2868+
}
2869+
2870+
@SuppressWarnings({ "unchecked", "rawtypes" })
2871+
private void testCommitRetriesGuts(boolean sync) throws Exception {
2872+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2873+
Consumer<Integer, String> consumer = mock(Consumer.class);
2874+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2875+
Map<String, Object> cfProps = new HashMap<>();
2876+
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
2877+
given(cf.getConfigurationProperties()).willReturn(cfProps);
2878+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
2879+
records.put(new TopicPartition("foo", 0), Arrays.asList(
2880+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
2881+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
2882+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
2883+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
2884+
AtomicBoolean first = new AtomicBoolean(true);
2885+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2886+
Thread.sleep(50);
2887+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2888+
});
2889+
CountDownLatch latch = new CountDownLatch(4);
2890+
if (sync) {
2891+
willAnswer(i -> {
2892+
latch.countDown();
2893+
throw new RetriableCommitFailedException("");
2894+
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
2895+
}
2896+
else {
2897+
willAnswer(i -> {
2898+
OffsetCommitCallback callback = i.getArgument(1);
2899+
callback.onComplete(i.getArgument(0), new RetriableCommitFailedException(""));
2900+
latch.countDown();
2901+
return null;
2902+
}).given(consumer).commitAsync(anyMap(), any());
2903+
}
2904+
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
2905+
new TopicPartitionOffset("foo", 0) };
2906+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
2907+
containerProps.setSyncCommits(sync);
2908+
containerProps.setGroupId("grp");
2909+
containerProps.setClientId("clientId");
2910+
containerProps.setIdleEventInterval(100L);
2911+
containerProps.setMessageListener((MessageListener) r -> {
2912+
});
2913+
containerProps.setMissingTopicsFatal(false);
2914+
KafkaMessageListenerContainer<Integer, String> container =
2915+
new KafkaMessageListenerContainer<>(cf, containerProps);
2916+
container.start();
2917+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
2918+
container.stop();
2919+
if (sync) {
2920+
verify(consumer, times(4)).commitSync(any(), any());
2921+
}
2922+
else {
2923+
verify(consumer, times(4)).commitAsync(any(), any());
2924+
}
2925+
}
2926+
28582927
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
28592928
Consumer<?, ?> consumer =
28602929
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments (0)