Commit 2a051cc

(feat) recommit
1 parent 7f8c89e commit 2a051cc

19 files changed, +546 -239 lines

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 import static java.util.function.BinaryOperator.maxBy;
 import static java.util.stream.Collectors.toSet;

-abstract class AbstractCommitPolicy<K,V> implements CommitPolicy<K,V> {
+abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
     protected final Consumer<K, V> consumer;
     final Map<TopicPartition, Long> topicOffsetHighWaterMark;
     final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets;

src/main/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicy.java

Lines changed: 6 additions & 28 deletions
@@ -5,40 +5,18 @@
 import org.apache.kafka.common.TopicPartition;

 import java.time.Duration;
-import java.util.HashMap;
 import java.util.Map;

-abstract class AbstractPartialCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
-    private final Duration forceWholeCommitInterval;
-    private long nextWholeCommitNanos;
-
-    AbstractPartialCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval) {
-        super(consumer);
-        this.forceWholeCommitInterval = forceWholeCommitInterval;
-        this.nextWholeCommitNanos = nextForceWholeCommitTime(forceWholeCommitInterval);
+abstract class AbstractPartialCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
+    AbstractPartialCommitPolicy(Consumer<K, V> consumer, Duration RecommitInterval) {
+        super(consumer, RecommitInterval);
     }

-    Map<TopicPartition, OffsetAndMetadata> offsetsToPartialCommit() {
-        if (needWholeCommit()) {
-            final Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(completedTopicOffsets);
-            for (TopicPartition partition : consumer.assignment()) {
-                final OffsetAndMetadata offset = consumer.committed(partition);
-                if (offset != null) {
-                    ret.putIfAbsent(partition, offset);
-                }
-            }
-            nextWholeCommitNanos = nextForceWholeCommitTime(forceWholeCommitInterval);
-            return ret;
+    Map<TopicPartition, OffsetAndMetadata> offsetsForPartialCommit() {
+        if (needRecommit()) {
+            return offsetsForRecommit();
         } else {
             return completedTopicOffsets;
         }
     }
-
-    private boolean needWholeCommit() {
-        return System.nanoTime() >= nextWholeCommitNanos;
-    }
-
-    private long nextForceWholeCommitTime(Duration forceWholeCommitInterval) {
-        return System.nanoTime() + forceWholeCommitInterval.toNanos();
-    }
 }
src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+package cn.leancloud.kafka.consumer;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
+    private final Duration recommitInterval;
+    private long nextRecommitNanos;
+
+    AbstractRecommitAwareCommitPolicy(Consumer<K, V> consumer, Duration recommitInterval) {
+        super(consumer);
+        this.recommitInterval = recommitInterval;
+        updateNextRecommitTime(System.nanoTime());
+    }
+
+    Map<TopicPartition, OffsetAndMetadata> offsetsForRecommit() {
+        assert needRecommit() : "current nanos: " + System.nanoTime() + " nextRecommitNanos:" + nextRecommitNanos;
+
+        final Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(completedTopicOffsets);
+        for (TopicPartition partition : consumer.assignment()) {
+            final OffsetAndMetadata offset = consumer.committed(partition);
+            if (offset != null) {
+                ret.putIfAbsent(partition, offset);
+            }
+        }
+
+        return ret;
+    }
+
+    boolean needRecommit() {
+        return System.nanoTime() >= nextRecommitNanos;
+    }
+
+    void updateNextRecommitTime() {
+        updateNextRecommitTime(System.nanoTime());
+    }
+
+    long nextRecommitNanos() {
+        return nextRecommitNanos;
+    }
+
+    private void updateNextRecommitTime(long currentNanos) {
+        nextRecommitNanos = currentNanos + recommitInterval.toNanos();
+    }
+}
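The recommit bookkeeping that used to live in AbstractPartialCommitPolicy now sits in this new base class. For orientation, below is a minimal sketch of how a concrete policy is expected to drive the three hooks needRecommit(), offsetsForRecommit() and updateNextRecommitTime(). The class ExamplePolicy is hypothetical and not part of this commit; it assumes tryCommit is the only method left to implement and omits the high-water-mark cleanup that the real policies perform.

package cn.leancloud.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical subclass, placed in the same package because the base class is package-private.
final class ExamplePolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
    ExamplePolicy(Consumer<K, V> consumer, Duration recommitInterval) {
        super(consumer, recommitInterval);
    }

    @Override
    public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
        if (needRecommit()) {
            // Merge completed offsets with the broker-side committed offsets of every assigned
            // partition, so even idle partitions get their committed offset refreshed.
            consumer.commitSync(offsetsForRecommit());
            updateNextRecommitTime();
        } else if (noPendingRecords && !completedTopicOffsets.isEmpty()) {
            consumer.commitSync(completedTopicOffsets);
        }
        return Collections.emptySet();
    }
}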
src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 70 additions & 18 deletions

@@ -1,47 +1,42 @@
 package cn.leancloud.kafka.consumer;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;

-final class AsyncCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
+final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
     private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);

     private final int maxPendingAsyncCommits;
+    private final OffsetCommitCallback callback;
     private int pendingAsyncCommitCounter;
     private boolean forceSync;

-    AsyncCommitPolicy(Consumer<K, V> consumer, int maxPendingAsyncCommits) {
-        super(consumer);
+    AsyncCommitPolicy(Consumer<K, V> consumer, Duration recommitInterval, int maxPendingAsyncCommits) {
+        super(consumer, recommitInterval);
         this.maxPendingAsyncCommits = maxPendingAsyncCommits;
+        this.callback = new AsyncCommitCallback();
     }

     @Override
     public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         if (!noPendingRecords || completedTopicOffsets.isEmpty()) {
+            if (needRecommit()) {
+                commit(offsetsForRecommit());
+            }
             return Collections.emptySet();
         }

-        if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
-            consumer.commitSync();
-            pendingAsyncCommitCounter = 0;
-            forceSync = false;
-        } else {
-            ++pendingAsyncCommitCounter;
-            consumer.commitAsync((offsets, exception) -> {
-                --pendingAsyncCommitCounter;
-                assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
-                if (exception != null) {
-                    logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
-                    forceSync = true;
-                }
-            });
-        }
+        commit();

         final Set<TopicPartition> partitions = new HashSet<>(completedTopicOffsets.keySet());
         // it's OK to clear these collections here and we will not left any complete offset without commit even
@@ -50,4 +45,61 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         topicOffsetHighWaterMark.clear();
         return partitions;
     }
+
+    int pendingAsyncCommitCount() {
+        return pendingAsyncCommitCounter;
+    }
+
+    boolean forceSync() {
+        return forceSync;
+    }
+
+    void setForceSync(boolean forceSync) {
+        this.forceSync = forceSync;
+    }
+
+    private void commit() {
+        commit(Collections.emptyMap());
+    }
+
+    private void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
+            syncCommit(offsets);
+            pendingAsyncCommitCounter = 0;
+            forceSync = false;
+        } else {
+            asyncCommit(offsets);
+        }
+        // update next recommit time even if async commit failed, we tolerate this situation
+        updateNextRecommitTime();
+    }
+
+    private void asyncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        ++pendingAsyncCommitCounter;
+        if (offsets.isEmpty()) {
+            consumer.commitAsync(callback);
+        } else {
+            consumer.commitAsync(offsets, callback);
+        }
+    }
+
+    private void syncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty()) {
+            consumer.commitSync();
+        } else {
+            consumer.commitSync(offsets);
+        }
+    }
+
+    private class AsyncCommitCallback implements OffsetCommitCallback {
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            --pendingAsyncCommitCounter;
+            assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
+            if (exception != null) {
+                logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
+                forceSync = true;
+            }
+        }
+    }
 }
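One consequence of the new tryCommit flow: even while records are still pending, the policy now checks needRecommit() and re-commits the merged offsets, so partitions that have gone quiet keep a fresh committed offset. A rough, hypothetical test-style sketch of that path follows; it assumes a class in the same package (the policy is package-private), uses MockConsumer from kafka-clients, and the zero interval is only there to make the recommit due immediately.

package cn.leancloud.kafka.consumer;

import java.time.Duration;
import java.util.Set;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Illustrative only, not part of this commit.
final class AsyncCommitPolicyRecommitSketch {
    public static void main(String[] args) {
        // A zero recommit interval makes needRecommit() true right away.
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        AsyncCommitPolicy<String, String> policy = new AsyncCommitPolicy<>(consumer, Duration.ZERO, 10);

        // noPendingRecords == false: no partition is reported as fully committed,
        // but the recommit branch still issues a commit of offsetsForRecommit().
        Set<TopicPartition> done = policy.tryCommit(false);
        System.out.println(done.isEmpty() + ", pending async commits: " + policy.pendingAsyncCommitCount());
    }
}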

src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;

+import java.time.Duration;
 import java.util.Collections;
 import java.util.Set;
src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 46 additions & 46 deletions
@@ -89,7 +89,7 @@ private static void requireArgument(boolean expression, String template, Object.
     @Nullable
     private CommitPolicy<K, V> policy;
     @Nullable
-    private Duration forceWholeCommitInterval;
+    private Duration recommitInterval;

     private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
                                    ConsumerRecordHandler<K, V> consumerRecordHandler) {
@@ -193,52 +193,56 @@ public LcKafkaConsumerBuilder<K, V> maxPendingAsyncCommits(int maxPendingAsyncCo
     }

     /**
-     * The interval to commit all partitions and it's completed offsets to broker on a partial commit consumer.
+     * The interval to commit all partitions and it's completed offsets to broker on a non-automatic commit consumer.
      * <p>
-     * This configuration is only valid and is required on partial commit consumer build with
+     * This configuration is only valid and is required on a non-automatic commit consumer build with
+     * {@link LcKafkaConsumerBuilder#buildSync()}, {@link LcKafkaConsumerBuilder#buildAsync()},
      * {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
      * For these kind of consumers, usually they only commit offsets of a partition when there was records consumed from
      * that partition and all these consumed records was handled successfully. But we must periodically commit those
-     * subscribed partitions who have not have any records too. Otherwise, after commit offset log retention timeout,
-     * Kafka broker may forget where the current commit offset of these partition for the consumer are. Then, when the
-     * consumer crashed and recovered, if the consumer set "auto.offset.reset" configuration to "earliest", it may
-     * consume a already consumed record again. So please make sure that {@code forceWholeCommitIntervalInMillis}
-     * is within log retention time set on Kafka broker.
-     * <p>
-     * The default {@code forceWholeCommitInterval} is 1 hour.
+     * subscribed partitions who have had records but no new records for a long time too. Otherwise, after commit offset
+     * retention timeout, Kafka broker may forget where the current commit offset of these partition for the consumer
+     * are. Then, when the consumer crashed and recovered, if the consumer set <code>auto.offset.reset</code>
+     * configuration to <code>earliest</code>, it may consume a already consumed record again. So please make sure
+     * that {@code recommitIntervalInMillis} is within the limit set by <code>offsets.retention.minutes</code>
+     * on Kafka broker or even within 1/3 of that limit to tolerate some commit failures on async commit consumer.
+     * <p>
+     * The default {@code recommitInterval} is 1 hour.
      *
-     * @param forceWholeCommitIntervalInMillis the interval in millis seconds to do a whole commit
+     * @param recommitIntervalInMillis the interval in millis seconds to do a recommit
      * @return this
      */
-    public LcKafkaConsumerBuilder<K, V> forceWholeCommitIntervalInMillis(long forceWholeCommitIntervalInMillis) {
-        requireArgument(forceWholeCommitIntervalInMillis > 0,
-                "forceWholeCommitIntervalInMillis: %s (expected > 0)", forceWholeCommitIntervalInMillis);
+    public LcKafkaConsumerBuilder<K, V> recommitIntervalInMillis(long recommitIntervalInMillis) {
+        requireArgument(recommitIntervalInMillis > 0,
+                "recommitIntervalInMillis: %s (expected > 0)", recommitIntervalInMillis);

-        this.forceWholeCommitInterval = Duration.ofMillis(forceWholeCommitIntervalInMillis);
+        this.recommitInterval = Duration.ofMillis(recommitIntervalInMillis);
         return this;
     }

     /**
-     * The interval to commit all partitions and it's completed offsets to broker on a partial commit consumer.
+     * The interval to commit all partitions and it's completed offsets to broker on a non-automatic commit consumer.
      * <p>
-     * This configuration is only valid on partial commit consumer build with
+     * This configuration is only valid and is required on a non-automatic commit consumer build with
+     * {@link LcKafkaConsumerBuilder#buildSync()}, {@link LcKafkaConsumerBuilder#buildAsync()},
      * {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
      * For these kind of consumers, usually they only commit offsets of a partition when there was records consumed from
      * that partition and all these consumed records was handled successfully. But we must periodically commit those
-     * subscribed partitions who have not have any records too. Otherwise, after commit offset log retention timeout,
-     * Kafka broker may forget where the current commit offset of these partition for the consumer are. Then, when the
-     * consumer crashed and recovered, if the consumer set "auto.offset.reset" configuration to "earliest", it may
-     * consume a already consumed record again. So please make sure that {@code forceWholeCommitInterval}
-     * is within log retention time set on Kafka broker.
-     * <p>
-     * The default {@code forceWholeCommitInterval} is 1 hour.
+     * subscribed partitions who have had records but no new records for a long time too. Otherwise, after commit offset
+     * retention timeout, Kafka broker may forget where the current commit offset of these partition for the consumer
+     * are. Then, when the consumer crashed and recovered, if the consumer set <code>auto.offset.reset</code>
+     * configuration to <code>earliest</code>, it may consume a already consumed record again. So please make sure
+     * that {@code recommitInterval} is within the limit set by <code>offsets.retention.minutes</code> on
+     * Kafka broker or even within 1/3 of that limit to tolerate some commit failures on async commit consumer..
+     * <p>
+     * The default {@code recommitInterval} is 1 hour.
      *
-     * @param forceWholeCommitInterval the interval to do a whole commit
+     * @param recommitInterval the interval to do a recommit
      * @return this
      */
-    public LcKafkaConsumerBuilder<K, V> forceWholeCommitInterval(Duration forceWholeCommitInterval) {
-        requireNonNull(forceWholeCommitInterval, "forceWholeCommitInterval");
-        this.forceWholeCommitInterval = forceWholeCommitInterval;
+    public LcKafkaConsumerBuilder<K, V> recommitInterval(Duration recommitInterval) {
+        requireNonNull(recommitInterval, "recommitInterval");
+        this.recommitInterval = recommitInterval;
         return this;
     }

@@ -342,7 +346,7 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAuto() {
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
         consumer = buildConsumer(false);
-        policy = new SyncCommitPolicy<>(consumer);
+        policy = new SyncCommitPolicy<>(consumer, getRecommitInterval());
         return doBuild();
     }

@@ -367,15 +371,8 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
      * @return this
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
-        if (forceWholeCommitInterval == null) {
-            logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " +
-                    "interval of 1 hour will be used.");
-            forceWholeCommitInterval = Duration.ofHours(1);
-        }
-        assert forceWholeCommitInterval != null;
-
         consumer = buildConsumer(false);
-        policy = new PartialSyncCommitPolicy<>(consumer, forceWholeCommitInterval);
+        policy = new PartialSyncCommitPolicy<>(consumer, getRecommitInterval());
         return doBuild();
     }

@@ -406,7 +403,7 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
         consumer = buildConsumer(false);
-        policy = new AsyncCommitPolicy<>(consumer, maxPendingAsyncCommits);
+        policy = new AsyncCommitPolicy<>(consumer, getRecommitInterval(), maxPendingAsyncCommits);
         return doBuild();
     }

@@ -435,15 +432,8 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
      * @return this
      */
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialAsync() {
-        if (forceWholeCommitInterval == null) {
-            logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " +
-                    "interval of 30 seconds will be used.");
-            forceWholeCommitInterval = Duration.ofSeconds(30);
-        }
-        assert forceWholeCommitInterval != null;
-
         consumer = buildConsumer(false);
-        policy = new PartialAsyncCommitPolicy<>(consumer, forceWholeCommitInterval, maxPendingAsyncCommits);
+        policy = new PartialAsyncCommitPolicy<>(consumer, getRecommitInterval(), maxPendingAsyncCommits);
         return doBuild();
     }

@@ -494,6 +484,16 @@ private Consumer<K, V> buildConsumer(boolean autoCommit) {
         return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
     }

+    Duration getRecommitInterval() {
+        if (recommitInterval == null) {
+            logger.warn("Recommit interval is not set for a non-automatic commit consumer, the default " +
+                    "interval of 1 hour will be used.");
+            recommitInterval = Duration.ofHours(1);
+        }
+
+        return recommitInterval;
+    }
+
     private void checkConfigs(KafkaConfigsChecker[] checkers) {
         for (KafkaConfigsChecker check : checkers) {
             check.check(configs);