Commit ff57bb6

(feat) add retry on failed sync commit

1 parent: eb39c30
14 files changed: +337 −50

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 69 additions & 3 deletions

@@ -4,7 +4,9 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -15,14 +17,50 @@
 import static java.util.stream.Collectors.toSet;
 
 abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
-    protected final Consumer<K, V> consumer;
+    static SleepFunction sleepFunction = Thread::sleep;
+
+    interface SleepFunction {
+        void sleep(long timeout) throws InterruptedException;
+    }
+
+    private static class RetryContext {
+        private final long retryInterval;
+        private final int maxAttempts;
+        private int numOfAttempts;
+
+        private RetryContext(long retryInterval, int maxAttempts) {
+            this.retryInterval = retryInterval;
+            this.maxAttempts = maxAttempts;
+            this.numOfAttempts = 0;
+        }
+
+        void onError(RetriableException e) {
+            if (++numOfAttempts >= maxAttempts) {
+                throw e;
+            } else {
+                try {
+                    sleepFunction.sleep(retryInterval);
+                } catch (InterruptedException ex) {
+                    e.addSuppressed(ex);
+                    Thread.currentThread().interrupt();
+                    throw e;
+                }
+            }
+        }
+    }
+
     final Map<TopicPartition, Long> topicOffsetHighWaterMark;
     final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets;
+    protected final Consumer<K, V> consumer;
+    private final long syncCommitRetryIntervalMs;
+    private final int maxAttemptsForEachSyncCommit;
 
-    AbstractCommitPolicy(Consumer<K, V> consumer) {
+    AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
         this.consumer = consumer;
         this.topicOffsetHighWaterMark = new HashMap<>();
         this.completedTopicOffsets = new HashMap<>();
+        this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();
+        this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
     }
 
     @Override
@@ -43,7 +81,7 @@ public void markCompletedRecord(ConsumerRecord<K, V> record) {
 
     @Override
     public Set<TopicPartition> syncPartialCommit() {
-        consumer.commitSync(completedTopicOffsets);
+        commitSync(completedTopicOffsets);
         final Set<TopicPartition> partitions = checkCompletedPartitions();
         completedTopicOffsets.clear();
         for (TopicPartition p : partitions) {
@@ -85,6 +123,30 @@ Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
         return completedTopicOffsets;
     }
 
+    void commitSync() {
+        final RetryContext context = context();
+        do {
+            try {
+                consumer.commitSync();
+                return;
+            } catch (RetriableException e) {
+                context.onError(e);
+            }
+        } while (true);
+    }
+
+    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        final RetryContext context = context();
+        do {
+            try {
+                consumer.commitSync(offsets);
+                return;
+            } catch (RetriableException e) {
+                context.onError(e);
+            }
+        } while (true);
+    }
+
     private Set<TopicPartition> checkCompletedPartitions() {
         return completedTopicOffsets
                 .entrySet()
@@ -102,4 +164,8 @@ private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, Offs
         // maybe this partition revoked before a msg of this partition was processed
         return true;
     }
+
+    private RetryContext context() {
+        return new RetryContext(syncCommitRetryIntervalMs, maxAttemptsForEachSyncCommit);
+    }
 }
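
The two commitSync wrappers above implement a bounded retry with a fixed pause between attempts; the package-private sleepFunction hook exists so that tests can swap Thread::sleep for an instant no-op. A minimal standalone sketch of the same control flow (BoundedRetry and its names are illustrative, not part of this library):

    import org.apache.kafka.common.errors.RetriableException;

    final class BoundedRetry {
        // Run `task`, retrying on RetriableException up to maxAttempts times
        // in total, sleeping retryIntervalMs between consecutive attempts.
        static void run(Runnable task, int maxAttempts, long retryIntervalMs) {
            int failedAttempts = 0;
            while (true) {
                try {
                    task.run();
                    return;
                } catch (RetriableException e) {
                    if (++failedAttempts >= maxAttempts) {
                        throw e; // out of attempts: surface the original failure
                    }
                    try {
                        Thread.sleep(retryIntervalMs);
                    } catch (InterruptedException ie) {
                        e.addSuppressed(ie);                // keep both failure causes
                        Thread.currentThread().interrupt(); // restore the interrupt flag
                        throw e;
                    }
                }
            }
        }
    }

As in RetryContext.onError, an interrupt during the sleep aborts the retry loop immediately: the RetriableException is rethrown with the InterruptedException attached as a suppressed exception, and the thread's interrupt flag is restored for the caller.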

src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java

Lines changed: 5 additions & 2 deletions

@@ -12,8 +12,11 @@ abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPol
     private final Duration recommitInterval;
     private long nextRecommitNanos;
 
-    AbstractRecommitAwareCommitPolicy(Consumer<K, V> consumer, Duration recommitInterval) {
-        super(consumer);
+    AbstractRecommitAwareCommitPolicy(Consumer<K, V> consumer,
+                                      Duration syncCommitRetryInterval,
+                                      int maxAttemptsForEachSyncCommit,
+                                      Duration recommitInterval) {
+        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit);
         this.recommitInterval = recommitInterval;
         updateNextRecommitTime(System.nanoTime());
     }

src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 8 additions & 4 deletions

@@ -21,8 +21,12 @@ final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K,
     private int pendingAsyncCommitCounter;
     private boolean forceSync;
 
-    AsyncCommitPolicy(Consumer<K, V> consumer, Duration recommitInterval, int maxPendingAsyncCommits) {
-        super(consumer, recommitInterval);
+    AsyncCommitPolicy(Consumer<K, V> consumer,
+                      Duration syncCommitRetryInterval,
+                      int maxAttemptsForEachSyncCommit,
+                      Duration recommitInterval,
+                      int maxPendingAsyncCommits) {
+        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, recommitInterval);
         this.maxPendingAsyncCommits = maxPendingAsyncCommits;
         this.callback = new AsyncCommitCallback();
     }
@@ -91,9 +95,9 @@ private void asyncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
 
     private void syncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
-            consumer.commitSync();
+            commitSync();
         } else {
-            consumer.commitSync(offsets);
+            commitSync(offsets);
         }
     }
 
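
A note on the dispatch in syncCommit above: the no-arg KafkaConsumer.commitSync() commits the latest polled offsets for all assigned partitions, whereas commitSync(Map) called with an empty map commits nothing, so the empty-offsets case must keep routing to the no-arg variant. Both branches now go through the retrying wrappers from AbstractCommitPolicy.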

src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java

Lines changed: 2 additions & 1 deletion

@@ -9,7 +9,8 @@
 
 final class AutoCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
     AutoCommitPolicy(Consumer<K, V> consumer) {
-        super(consumer);
+        // The auto commit policy never commits by hand, so it does not need the sync-commit retry facilities
+        super(consumer, Duration.ZERO, 1);
     }
 
     @Override

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 104 additions & 9 deletions

@@ -1,9 +1,6 @@
 package cn.leancloud.kafka.consumer;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,6 +82,8 @@ private static void requireArgument(boolean expression, String template, Object.
     private Map<String, Object> configs;
     private ConsumerRecordHandler<K, V> consumerRecordHandler;
     private ConsumerSeekDestination forceSeekTo = ConsumerSeekDestination.NONE;
+    private Duration syncCommitRetryInterval = Duration.ofSeconds(1);
+    private int maxAttemptsForEachSyncCommit = 3;
     @Nullable
     private Consumer<K, V> consumer;
     @Nullable
@@ -235,6 +234,83 @@ public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShu
         return this;
     }
 
+    /**
+     * Sets the amount of time to wait before retrying a failed synchronous commit issued by
+     * {@link KafkaConsumer#commitSync()} or {@link KafkaConsumer#commitSync(Map)}. Any synchronous commit may fail,
+     * but most of the time the failure is caused by a {@link org.apache.kafka.common.errors.RetriableException},
+     * on which the commit can safely be retried. This configuration sets the interval between those retries.
+     * <p>
+     * Failed asynchronous commits issued by {@link KafkaConsumer#commitAsync()} or
+     * {@link KafkaConsumer#commitAsync(OffsetCommitCallback)} are retried automatically with a synchronous commit
+     * as soon as they are detected, so only synchronous commits need this configuration.
+     * <p>
+     * The default {@code syncCommitRetryIntervalMillis} is 1000.
+     *
+     * @param syncCommitRetryIntervalMillis the retry interval in milliseconds
+     * @return this
+     * @throws IllegalArgumentException if {@code syncCommitRetryIntervalMillis} is a negative value
+     */
+    public LcKafkaConsumerBuilder<K, V> syncCommitRetryIntervalMillis(long syncCommitRetryIntervalMillis) {
+        requireArgument(syncCommitRetryIntervalMillis >= 0,
+                "syncCommitRetryIntervalMillis: %s (expected >= 0)", syncCommitRetryIntervalMillis);
+        this.syncCommitRetryInterval = Duration.ofMillis(syncCommitRetryIntervalMillis);
+        return this;
+    }
+
+    /**
+     * Sets the amount of time to wait before retrying a failed synchronous commit issued by
+     * {@link KafkaConsumer#commitSync()} or {@link KafkaConsumer#commitSync(Map)}. Any synchronous commit may fail,
+     * but most of the time the failure is caused by a {@link org.apache.kafka.common.errors.RetriableException},
+     * on which the commit can safely be retried. This configuration sets the interval between those retries.
+     * <p>
+     * Failed asynchronous commits issued by {@link KafkaConsumer#commitAsync()} or
+     * {@link KafkaConsumer#commitAsync(OffsetCommitCallback)} are retried automatically with a synchronous commit
+     * as soon as they are detected, so only synchronous commits need this configuration.
+     * <p>
+     * The default {@code syncCommitRetryInterval} is 1 second.
+     *
+     * @param syncCommitRetryInterval the retry interval
+     * @return this
+     * @throws NullPointerException     if {@code syncCommitRetryInterval} is null
+     * @throws IllegalArgumentException if {@code syncCommitRetryInterval} is a negative duration
+     */
+    public LcKafkaConsumerBuilder<K, V> syncCommitRetryInterval(Duration syncCommitRetryInterval) {
+        requireNonNull(syncCommitRetryInterval, "syncCommitRetryInterval");
+        requireArgument(!syncCommitRetryInterval.isNegative(),
+                "syncCommitRetryInterval: %s (expected a positive or zero duration)", syncCommitRetryInterval);
+        this.syncCommitRetryInterval = syncCommitRetryInterval;
+        return this;
+    }
+
+    /**
+     * Sets the maximum number of attempts for a synchronous commit issued by {@link KafkaConsumer#commitSync()}
+     * or {@link KafkaConsumer#commitSync(Map)}. Any synchronous commit may fail, but most of the time the failure
+     * is caused by a {@link org.apache.kafka.common.errors.RetriableException}, on which the commit can safely be
+     * retried. This configuration caps the number of attempts: once they reach {@code maxAttemptsForEachSyncCommit},
+     * the last {@link org.apache.kafka.common.errors.RetriableException} is rethrown, which causes the Kafka
+     * consumer to stop and quit.
+     * <p>
+     * Failed asynchronous commits issued by {@link KafkaConsumer#commitAsync()} or
+     * {@link KafkaConsumer#commitAsync(OffsetCommitCallback)} are retried automatically with a synchronous commit
+     * as soon as they are detected, so only synchronous commits need this configuration.
+     * <p>
+     * Please note that {@code maxAttemptsForEachSyncCommit} times {@code syncCommitRetryInterval} should be far
+     * lower than {@code max.poll.interval.ms}; otherwise the Kafka consumer may run into a session timeout or a
+     * polling timeout because {@link KafkaConsumer#poll(long)} was not called for too long.
+     * <p>
+     * The default {@code maxAttemptsForEachSyncCommit} is 3.
+     *
+     * @param maxAttemptsForEachSyncCommit the maximum number of attempts for a synchronous commit
+     * @return this
+     * @throws IllegalArgumentException if {@code maxAttemptsForEachSyncCommit} is zero or a negative value
+     */
+    public LcKafkaConsumerBuilder<K, V> maxAttemptsForEachSyncCommit(int maxAttemptsForEachSyncCommit) {
+        requireArgument(maxAttemptsForEachSyncCommit > 0,
+                "maxAttemptsForEachSyncCommit: %s (expected > 0)", maxAttemptsForEachSyncCommit);
+        this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
+        return this;
+    }
+
     /**
      * When using async consumer to commit offset asynchronously, this argument can force consumer to do a synchronous
      * commit after there's already this ({@code maxPendingAsyncCommits}) many async commits on the fly without
@@ -407,7 +483,8 @@ public LcKafkaConsumerBuilder<K, V> forceSeekTo(ConsumerSeekDestination destinat
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAuto() {
         checkConfigs(AutoCommitConsumerConfigs.values());
         consumer = buildConsumer(true);
-        policy = workerPool == ImmediateExecutorService.INSTANCE ? NoOpCommitPolicy.getInstance() : new AutoCommitPolicy<>(consumer);
+        policy = workerPool == ImmediateExecutorService.INSTANCE ?
+                NoOpCommitPolicy.getInstance() : new AutoCommitPolicy<>(consumer);
         return doBuild();
     }
 
@@ -435,7 +512,11 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAuto() {
      */
    public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
        consumer = buildConsumer(false);
-        policy = new SyncCommitPolicy<>(consumer, getRecommitInterval());
+        policy = new SyncCommitPolicy<>(
+                consumer,
+                syncCommitRetryInterval,
+                maxAttemptsForEachSyncCommit,
+                getRecommitInterval());
         return doBuild();
     }
 
@@ -462,7 +543,11 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
         consumer = buildConsumer(false);
-        policy = new PartialSyncCommitPolicy<>(consumer, getRecommitInterval());
+        policy = new PartialSyncCommitPolicy<>(
+                consumer,
+                syncCommitRetryInterval,
+                maxAttemptsForEachSyncCommit,
+                getRecommitInterval());
         return doBuild();
     }
 
@@ -494,7 +579,12 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
         consumer = buildConsumer(false);
-        policy = new AsyncCommitPolicy<>(consumer, getRecommitInterval(), maxPendingAsyncCommits);
+        policy = new AsyncCommitPolicy<>(
+                consumer,
+                syncCommitRetryInterval,
+                maxAttemptsForEachSyncCommit,
+                getRecommitInterval(),
+                maxPendingAsyncCommits);
         return doBuild();
     }
 
@@ -525,7 +615,12 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialAsync() {
         consumer = buildConsumer(false);
-        policy = new PartialAsyncCommitPolicy<>(consumer, getRecommitInterval(), maxPendingAsyncCommits);
+        policy = new PartialAsyncCommitPolicy<>(
+                consumer,
+                syncCommitRetryInterval,
+                maxAttemptsForEachSyncCommit,
+                getRecommitInterval(),
+                maxPendingAsyncCommits);
         return doBuild();
     }
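
Taken together, the new builder options bound the extra delay a failing synchronous commit can add at roughly (maxAttemptsForEachSyncCommit − 1) × syncCommitRetryInterval on top of the commit calls themselves, which is why the Javadoc warns to keep the product of the two far below max.poll.interval.ms (Kafka's default is five minutes). A hedged usage sketch — the newBuilder factory and the handler are assumptions for illustration; only syncCommitRetryInterval, maxAttemptsForEachSyncCommit, and buildSync appear in this diff:

    // Hypothetical entry point and handler; the two retry settings and
    // buildSync() are the pieces introduced or touched by this commit.
    LcKafkaConsumer<String, String> consumer = LcKafkaConsumerBuilder
            .newBuilder(configs, handler)
            .syncCommitRetryInterval(Duration.ofMillis(500)) // pause 500 ms between commit retries
            .maxAttemptsForEachSyncCommit(5)                 // rethrow after the 5th failed attempt
            .buildSync();                                    // retries add at most 4 x 500 ms = 2 s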

src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java

Lines changed: 7 additions & 3 deletions

@@ -19,8 +19,12 @@ final class PartialAsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPo
     private int pendingAsyncCommitCounter;
     private boolean forceSync;
 
-    PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) {
-        super(consumer, forceWholeCommitInterval);
+    PartialAsyncCommitPolicy(Consumer<K, V> consumer,
+                             Duration syncCommitRetryInterval,
+                             int maxAttemptsForEachSyncCommit,
+                             Duration forceWholeCommitInterval,
+                             int maxPendingAsyncCommits) {
+        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, forceWholeCommitInterval);
         this.maxPendingAsyncCommits = maxPendingAsyncCommits;
         this.callback = new AsyncCommitCallback();
         this.pendingAsyncCommitOffset = new HashMap<>();
@@ -36,7 +40,7 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         } else {
             final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
             if (syncCommit) {
-                consumer.commitSync(offsets);
+                commitSync(offsets);
                 pendingAsyncCommitOffset.clear();
                 pendingAsyncCommitCounter = 0;
                 forceSync = false;
forceSync = false;

src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java

Lines changed: 6 additions & 3 deletions

@@ -10,15 +10,18 @@
 import java.util.Set;
 
 final class PartialSyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
-    PartialSyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval) {
-        super(consumer, forceWholeCommitInterval);
+    PartialSyncCommitPolicy(Consumer<K, V> consumer,
+                            Duration syncCommitRetryInterval,
+                            int maxAttemptsForEachSyncCommit,
+                            Duration forceWholeCommitInterval) {
+        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, forceWholeCommitInterval);
     }
 
     @Override
     public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForPartialCommit();
         if (!offsets.isEmpty()) {
-            consumer.commitSync(offsets);
+            commitSync(offsets);
         }
 
         if (completedTopicOffsets.isEmpty()) {
