Skip to content

Commit 3fe295e

Browse files
committed
GH-1496: Handle RetriableCommitFailedException
Resolves #1496 Certain commit exceptions are retriable. **I will back-port as necessary** * Fix import
1 parent 5aff841 commit 3fe295e

File tree

3 files changed

+143
-10
lines changed

3 files changed

+143
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ConsumerProperties {
4444
*/
4545
public static final long DEFAULT_POLL_TIMEOUT = 5_000L;
4646

47+
private static final int DEFAULT_COMMIT_RETRIES = 3;
48+
4749
/**
4850
* Topic names.
4951
*/
@@ -99,6 +101,8 @@ public class ConsumerProperties {
99101

100102
private Duration authorizationExceptionRetryInterval;
101103

104+
private int commitRetries = DEFAULT_COMMIT_RETRIES;
105+
102106
/**
103107
* Create properties for a container that will subscribe to the specified topics.
104108
* @param topics the topics.
@@ -324,6 +328,28 @@ public void setAuthorizationExceptionRetryInterval(Duration authorizationExcepti
324328
this.authorizationExceptionRetryInterval = authorizationExceptionRetryInterval;
325329
}
326330

331+
/**
332+
* The number of retries allowed when a
333+
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
334+
* by the consumer.
335+
* @return the number of retries.
336+
* @since 2.3.9
337+
*/
338+
public int getCommitRetries() {
339+
return this.commitRetries;
340+
}
341+
342+
/**
343+
* Set number of retries allowed when a
344+
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
345+
* by the consumer. Default 3 (4 attempts total).
346+
* @param commitRetries the commitRetries.
347+
* @since 2.3.9
348+
*/
349+
public void setCommitRetries(int commitRetries) {
350+
this.commitRetries = commitRetries;
351+
}
352+
327353
@Override
328354
public String toString() {
329355
return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5252
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
5353
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
54+
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
5455
import org.apache.kafka.clients.producer.Producer;
5556
import org.apache.kafka.common.Metric;
5657
import org.apache.kafka.common.MetricName;
@@ -1204,6 +1205,10 @@ private void wrapUp() {
12041205
* @param e the exception.
12051206
*/
12061207
protected void handleConsumerException(Exception e) {
1208+
if (e instanceof RetriableCommitFailedException) {
1209+
this.logger.error(e, "Commit retries exhausted");
1210+
return;
1211+
}
12071212
try {
12081213
if (!this.isBatchListener && this.errorHandler != null) {
12091214
this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
@@ -1283,13 +1288,25 @@ private void ackImmediate(ConsumerRecord<K, V> record) {
12831288
this.producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
12841289
}
12851290
else if (this.syncCommits) {
1286-
this.consumer.commitSync(commits, this.syncCommitTimeout);
1291+
commitSync(commits);
12871292
}
12881293
else {
1289-
this.consumer.commitAsync(commits, this.commitCallback);
1294+
commitAsync(commits, 0);
12901295
}
12911296
}
12921297

1298+
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
1299+
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
1300+
if (exception instanceof RetriableCommitFailedException
1301+
&& retries < this.containerProperties.getCommitRetries()) {
1302+
commitAsync(commits, retries + 1);
1303+
}
1304+
else {
1305+
this.commitCallback.onComplete(offsetsAttempted, exception);
1306+
}
1307+
});
1308+
}
1309+
12931310
private void invokeListener(final ConsumerRecords<K, V> records) {
12941311
if (this.isBatchListener) {
12951312
invokeBatchListener(records);
@@ -1851,10 +1868,10 @@ public void ackCurrent(final ConsumerRecord<K, V> record,
18511868
if (producer == null) {
18521869
this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
18531870
if (this.syncCommits) {
1854-
this.consumer.commitSync(offsetsToCommit, this.syncCommitTimeout);
1871+
commitSync(offsetsToCommit);
18551872
}
18561873
else {
1857-
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
1874+
commitAsync(offsetsToCommit, 0);
18581875
}
18591876
}
18601877
else {
@@ -2074,10 +2091,10 @@ private void commitIfNecessary() {
20742091
this.commitLogger.log(() -> "Committing: " + commits);
20752092
try {
20762093
if (this.syncCommits) {
2077-
this.consumer.commitSync(commits, this.syncCommitTimeout);
2094+
commitSync(commits);
20782095
}
20792096
else {
2080-
this.consumer.commitAsync(commits, this.commitCallback);
2097+
commitAsync(commits, 0);
20812098
}
20822099
}
20832100
catch (@SuppressWarnings(UNUSED) WakeupException e) {
@@ -2087,6 +2104,22 @@ private void commitIfNecessary() {
20872104
}
20882105
}
20892106

2107+
private void commitSync(Map<TopicPartition, OffsetAndMetadata> commits) {
2108+
doCommitSync(commits, 0);
2109+
}
2110+
2111+
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
2112+
try {
2113+
this.consumer.commitSync(commits, this.syncCommitTimeout);
2114+
}
2115+
catch (RetriableCommitFailedException e) {
2116+
if (retries >= this.containerProperties.getCommitRetries()) {
2117+
throw e;
2118+
}
2119+
doCommitSync(commits, retries + 1);
2120+
}
2121+
}
2122+
20902123
private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
20912124
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
20922125
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
@@ -2353,6 +2386,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
23532386
return true;
23542387
}
23552388

2389+
@SuppressWarnings("unused")
23562390
private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
23572391
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
23582392
if (ListenerConsumer.this.transactionTemplate != null
@@ -2392,12 +2426,16 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
23922426
else {
23932427
ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
23942428
if (containerProps.isSyncCommits()) {
2395-
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2396-
containerProps.getSyncCommitTimeout());
2429+
try {
2430+
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2431+
containerProps.getSyncCommitTimeout());
2432+
}
2433+
catch (RetriableCommitFailedException e) {
2434+
// ignore since this is on assignment anyway
2435+
}
23972436
}
23982437
else {
2399-
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2400-
containerProps.getCommitCallback());
2438+
commitAsync(offsetsToCommit, 0);
24012439
}
24022440
}
24032441
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
import org.apache.kafka.clients.consumer.ConsumerRecords;
6767
import org.apache.kafka.clients.consumer.KafkaConsumer;
6868
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
69+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
70+
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
6971
import org.apache.kafka.clients.producer.ProducerConfig;
7072
import org.apache.kafka.common.TopicPartition;
7173
import org.apache.kafka.common.errors.AuthorizationException;
@@ -2937,6 +2939,73 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
29372939
container.stop();
29382940
}
29392941

2942+
	@Test
	void testCommitSyncRetries() throws Exception {
		// verify the sync-commit path retries RetriableCommitFailedException
		testCommitRetriesGuts(true);
	}
2946+
2947+
@Test
2948+
void testCommitAsyncRetries() throws Exception {
2949+
testCommitRetriesGuts(false);
2950+
}
2951+
2952+
	/*
	 * Shared body for the sync/async commit-retry tests: a mocked consumer always
	 * fails commits with RetriableCommitFailedException, so the container should
	 * attempt the commit 4 times in total (1 initial + default 3 retries).
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	private void testCommitRetriesGuts(boolean sync) throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
		Consumer<Integer, String> consumer = mock(Consumer.class);
		given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
		Map<String, Object> cfProps = new HashMap<>();
		cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
		given(cf.getConfigurationProperties()).willReturn(cfProps);
		// first poll returns two records to trigger a commit; later polls are empty
		final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
		records.put(new TopicPartition("foo", 0), Arrays.asList(
				new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
				new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
		ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
		ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
		AtomicBoolean first = new AtomicBoolean(true);
		given(consumer.poll(any(Duration.class))).willAnswer(i -> {
			Thread.sleep(50);
			return first.getAndSet(false) ? consumerRecords : emptyRecords;
		});
		// 4 = the initial commit attempt plus the default 3 retries
		CountDownLatch latch = new CountDownLatch(4);
		if (sync) {
			// every commitSync fails with a retriable exception
			willAnswer(i -> {
				latch.countDown();
				throw new RetriableCommitFailedException("");
			}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
		}
		else {
			// every commitAsync completes its callback with a retriable exception
			willAnswer(i -> {
				OffsetCommitCallback callback = i.getArgument(1);
				callback.onComplete(i.getArgument(0), new RetriableCommitFailedException(""));
				latch.countDown();
				return null;
			}).given(consumer).commitAsync(anyMap(), any());
		}
		TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
				new TopicPartitionOffset("foo", 0) };
		ContainerProperties containerProps = new ContainerProperties(topicPartition);
		containerProps.setSyncCommits(sync);
		containerProps.setGroupId("grp");
		containerProps.setClientId("clientId");
		containerProps.setIdleEventInterval(100L);
		containerProps.setMessageListener((MessageListener) r -> {
		});
		containerProps.setMissingTopicsFatal(false);
		KafkaMessageListenerContainer<Integer, String> container =
				new KafkaMessageListenerContainer<>(cf, containerProps);
		container.start();
		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
		container.stop();
		// exactly 4 attempts: retries stop once commitRetries (default 3) is reached
		if (sync) {
			verify(consumer, times(4)).commitSync(any(), any());
		}
		else {
			verify(consumer, times(4)).commitAsync(any(), any());
		}
	}
3008+
29403009
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
29413010
Consumer<?, ?> consumer =
29423011
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments
 (0)