Skip to content

Commit 092d099

Browse files
committed
(feat) whole commit periodically
1 parent 431c3cc commit 092d099

12 files changed

+273
-28
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
import org.apache.kafka.clients.consumer.Consumer;
4+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5+
import org.apache.kafka.common.TopicPartition;
6+
7+
import java.time.Duration;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
11+
abstract class AbstractPartialCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
12+
private final Duration forceWholeCommitInterval;
13+
private long nextWholeCommitNanos;
14+
15+
AbstractPartialCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval) {
16+
super(consumer);
17+
this.forceWholeCommitInterval = forceWholeCommitInterval;
18+
this.nextWholeCommitNanos = nextForceWholeCommitTime(forceWholeCommitInterval);
19+
}
20+
21+
Map<TopicPartition, OffsetAndMetadata> offsetsToPartialCommit() {
22+
if (needWholeCommit()) {
23+
final Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(completedTopicOffsets);
24+
for (TopicPartition partition : consumer.assignment()) {
25+
ret.putIfAbsent(partition, consumer.committed(partition));
26+
}
27+
nextWholeCommitNanos = nextForceWholeCommitTime(forceWholeCommitInterval);
28+
return ret;
29+
} else {
30+
return completedTopicOffsets;
31+
}
32+
}
33+
34+
private boolean needWholeCommit() {
35+
return System.nanoTime() >= nextWholeCommitNanos;
36+
}
37+
38+
private long nextForceWholeCommitTime(Duration forceWholeCommitInterval) {
39+
return System.nanoTime() + forceWholeCommitInterval.toNanos();
40+
}
41+
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void close() {
115115
workerPool.awaitTermination(1, TimeUnit.DAYS);
116116
}
117117
} catch (InterruptedException ex) {
118-
// ignore
118+
Thread.currentThread().interrupt();
119119
}
120120
}
121121

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import org.apache.kafka.clients.consumer.KafkaConsumer;
55
import org.apache.kafka.clients.consumer.MockConsumer;
66
import org.apache.kafka.common.serialization.Deserializer;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
79

810
import javax.annotation.Nullable;
911
import java.time.Duration;
@@ -22,6 +24,8 @@
2224
* @param <V> the type of value for records consumed from Kafka
2325
*/
2426
public final class LcKafkaConsumerBuilder<K, V> {
27+
private static final Logger logger = LoggerFactory.getLogger(LcKafkaConsumerBuilder.class);
28+
2529
/**
2630
* Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
2731
*
@@ -84,6 +88,8 @@ private static void requireArgument(boolean expression, String template, Object.
8488
private Deserializer<V> valueDeserializer;
8589
@Nullable
8690
private CommitPolicy<K, V> policy;
91+
@Nullable
92+
private Duration forceWholeCommitInterval;
8793

8894
private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
8995
ConsumerRecordHandler<K, V> consumerRecordHandler) {
@@ -109,14 +115,14 @@ private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
109115
* If 0, poll operation will return immediately with any records that are available currently in the buffer,
110116
* else returns empty.
111117
* <p>
112-
* Must not be negative.
118+
* Must not be negative. The default {@code pollTimeoutMillis} is 100.
113119
*
114-
* @param pollTimeoutMs the poll timeout in milliseconds
120+
* @param pollTimeoutMillis the poll timeout in milliseconds
115121
* @return this
116122
*/
117-
public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMs) {
118-
requireArgument(pollTimeoutMs >= 0, "pollTimeoutMillis: %s (expect >= 0)", pollTimeoutMs);
119-
this.pollTimeout = pollTimeoutMs;
123+
public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMillis) {
124+
requireArgument(pollTimeoutMillis >= 0, "pollTimeoutMillis: %s (expect >= 0)", pollTimeoutMillis);
125+
this.pollTimeout = pollTimeoutMillis;
120126
return this;
121127
}
122128

@@ -127,6 +133,7 @@ public LcKafkaConsumerBuilder<K, V> pollTimeoutMillis(long pollTimeoutMs) {
127133
* If 0, poll operation will return immediately with any records that are available currently in the buffer,
128134
* else returns empty.
129135
* <p>
136+
* The default {@code pollTimeout} is 100 milliseconds.
130137
*
131138
* @param pollTimeout the poll timeout duration
132139
* @return this
@@ -140,6 +147,8 @@ public LcKafkaConsumerBuilder<K, V> pollTimeout(Duration pollTimeout) {
140147
/**
141148
* Sets the amount of time to wait after calling {@link LcKafkaConsumer#close()} for
142149
* consumed records to handle before actually shutting down.
150+
* <p>
151+
* The default {@code gracefulShutdownTimeoutMillis} is 10_000.
143152
*
144153
* @param gracefulShutdownTimeoutMillis the graceful shutdown timeout in milliseconds
145154
* @return this
@@ -154,6 +163,8 @@ public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeoutMillis(long gracefulS
154163
/**
155164
* Sets the amount of time to wait after calling {@link LcKafkaConsumer#close()} for
156165
* consumed records to handle before actually shutting down.
166+
* <p>
167+
* The default {@code gracefulShutdownTimeout} is 10 seconds.
157168
*
158169
* @param gracefulShutdownTimeout the graceful shutdown timeout duration
159170
* @return this
@@ -168,6 +179,8 @@ public LcKafkaConsumerBuilder<K, V> gracefulShutdownTimeout(Duration gracefulShu
168179
* When using async consumer to commit offset asynchronously, this argument can force consumer to do a synchronous
169180
* commit after there's already this ({@code maxPendingAsyncCommits}) many async commits on the fly without
170181
* response from broker.
182+
* <p>
183+
* The default {@code maxPendingAsyncCommits} is 10.
171184
*
172185
* @param maxPendingAsyncCommits do a synchronous commit when pending async commits beyond this limit
173186
* @return this
@@ -179,6 +192,56 @@ public LcKafkaConsumerBuilder<K, V> maxPendingAsyncCommits(int maxPendingAsyncCo
179192
return this;
180193
}
181194

195+
/**
196+
* The interval to commit all partitions and it's completed offsets to broker on a partial commit consumer.
197+
* <p>
198+
* This configuration is only valid and is required on partial commit consumer build with
199+
* {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
200+
* For these kind of consumers, usually they only commit offsets of a partition when there was records consumed from
201+
* that partition and all these consumed records was handled successfully. But we must periodically commit those
202+
* subscribed partitions who have not have any records too. Otherwise, after commit offset log retention timeout,
203+
* Kafka broker may forget where the current commit offset of these partition for the consumer are. Then, when the
204+
* consumer crashed and recovered, if the consumer set "auto.offset.reset" configuration to "earliest", it may
205+
* consume a already consumed record again. So please make sure that {@code forceWholeCommitIntervalInMillis}
206+
* is within log retention time set on Kafka broker.
207+
* <p>
208+
* The default {@code forceWholeCommitInterval} is 1 hour.
209+
*
210+
* @param forceWholeCommitIntervalInMillis the interval in millis seconds to do a whole commit
211+
* @return this
212+
*/
213+
public LcKafkaConsumerBuilder<K, V> forceWholeCommitIntervalInMillis(long forceWholeCommitIntervalInMillis) {
214+
requireArgument(forceWholeCommitIntervalInMillis > 0,
215+
"forceWholeCommitIntervalInMillis: %s (expected > 0)", forceWholeCommitIntervalInMillis);
216+
217+
this.forceWholeCommitInterval = Duration.ofMillis(forceWholeCommitIntervalInMillis);
218+
return this;
219+
}
220+
221+
/**
222+
* The interval to commit all partitions and it's completed offsets to broker on a partial commit consumer.
223+
* <p>
224+
* This configuration is only valid on partial commit consumer build with
225+
* {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}.
226+
* For these kind of consumers, usually they only commit offsets of a partition when there was records consumed from
227+
* that partition and all these consumed records was handled successfully. But we must periodically commit those
228+
* subscribed partitions who have not have any records too. Otherwise, after commit offset log retention timeout,
229+
* Kafka broker may forget where the current commit offset of these partition for the consumer are. Then, when the
230+
* consumer crashed and recovered, if the consumer set "auto.offset.reset" configuration to "earliest", it may
231+
* consume a already consumed record again. So please make sure that {@code forceWholeCommitInterval}
232+
* is within log retention time set on Kafka broker.
233+
* <p>
234+
* The default {@code forceWholeCommitInterval} is 1 hour.
235+
*
236+
* @param forceWholeCommitInterval the interval to do a whole commit
237+
* @return this
238+
*/
239+
public LcKafkaConsumerBuilder<K, V> forceWholeCommitInterval(Duration forceWholeCommitInterval) {
240+
requireNonNull(forceWholeCommitInterval, "forceWholeCommitInterval");
241+
this.forceWholeCommitInterval = forceWholeCommitInterval;
242+
return this;
243+
}
244+
182245
/**
183246
* Internal testing usage only.
184247
* <p>
@@ -304,8 +367,15 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildSync() {
304367
* @return this
305368
*/
306369
public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialSync() {
370+
if (forceWholeCommitInterval == null) {
371+
logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " +
372+
"interval of 1 hour will be used.");
373+
forceWholeCommitInterval = Duration.ofHours(1);
374+
}
375+
assert forceWholeCommitInterval != null;
376+
307377
consumer = buildConsumer(false);
308-
policy = new PartialSyncCommitPolicy<>(consumer);
378+
policy = new PartialSyncCommitPolicy<>(consumer, forceWholeCommitInterval);
309379
return doBuild();
310380
}
311381

@@ -365,8 +435,15 @@ public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAsync() {
365435
* @return this
366436
*/
367437
public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildPartialAsync() {
438+
if (forceWholeCommitInterval == null) {
439+
logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " +
440+
"interval of 30 seconds will be used.");
441+
forceWholeCommitInterval = Duration.ofSeconds(30);
442+
}
443+
assert forceWholeCommitInterval != null;
444+
368445
consumer = buildConsumer(false);
369-
policy = new PartialAsyncCommitPolicy<>(consumer, maxPendingAsyncCommits);
446+
policy = new PartialAsyncCommitPolicy<>(consumer, forceWholeCommitInterval, maxPendingAsyncCommits);
370447
return doBuild();
371448
}
372449

src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@
66
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
88

9+
import java.time.Duration;
910
import java.util.*;
1011

11-
final class PartialAsyncCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
12+
final class PartialAsyncCommitPolicy<K, V> extends AbstractPartialCommitPolicy<K, V> {
1213
private static final Logger logger = LoggerFactory.getLogger(PartialAsyncCommitPolicy.class);
1314

1415
private final int maxPendingAsyncCommits;
1516
private int pendingAsyncCommitCounter;
1617
private boolean forceSync;
1718

18-
PartialAsyncCommitPolicy(Consumer<K, V> consumer, int maxPendingAsyncCommits) {
19-
super(consumer);
19+
PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) {
20+
super(consumer, forceWholeCommitInterval);
2021
this.maxPendingAsyncCommits = maxPendingAsyncCommits;
2122
}
2223

@@ -28,13 +29,13 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
2829

2930
final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
3031
if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
31-
consumer.commitSync(completedTopicOffsets);
32+
consumer.commitSync(offsetsToPartialCommit());
3233
pendingAsyncCommitCounter = 0;
3334
forceSync = false;
3435
clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
3536
} else {
3637
++pendingAsyncCommitCounter;
37-
consumer.commitAsync(completedTopicOffsets, (offsets, exception) -> {
38+
consumer.commitAsync(offsetsToPartialCommit(), (offsets, exception) -> {
3839
--pendingAsyncCommitCounter;
3940
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
4041
if (exception != null) {

src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
import org.apache.kafka.clients.consumer.Consumer;
44
import org.apache.kafka.common.TopicPartition;
55

6+
import java.time.Duration;
67
import java.util.Collections;
78
import java.util.Set;
89

9-
final class PartialSyncCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
10-
PartialSyncCommitPolicy(Consumer<K, V> consumer) {
11-
super(consumer);
10+
final class PartialSyncCommitPolicy<K, V> extends AbstractPartialCommitPolicy<K, V> {
11+
PartialSyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval) {
12+
super(consumer, forceWholeCommitInterval);
1213
}
1314

1415
@Override
@@ -17,7 +18,7 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
1718
return Collections.emptySet();
1819
}
1920

20-
consumer.commitSync(completedTopicOffsets);
21+
consumer.commitSync(offsetsToPartialCommit());
2122

2223
final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
2324
clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
import org.apache.kafka.clients.consumer.*;
4+
import org.apache.kafka.common.TopicPartition;
5+
import org.junit.After;
6+
import org.junit.Before;
7+
import org.junit.Test;
8+
9+
import java.time.Duration;
10+
import java.util.*;
11+
12+
import static cn.leancloud.kafka.consumer.TestingUtils.*;
13+
import static java.util.Comparator.comparing;
14+
import static java.util.stream.Collectors.toList;
15+
import static java.util.function.BinaryOperator.maxBy;
16+
import static java.util.stream.IntStream.range;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.awaitility.Awaitility.await;
19+
20+
public class AbstractPartialCommitPolicyTest {
21+
private static class TestingPartialCommitPolicy extends AbstractPartialCommitPolicy<Object, Object> {
22+
TestingPartialCommitPolicy(Consumer<Object, Object> consumer, Duration forceWholeCommitInterval) {
23+
super(consumer, forceWholeCommitInterval);
24+
}
25+
26+
@Override
27+
public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
28+
throw new UnsupportedOperationException();
29+
}
30+
}
31+
32+
private MockConsumer<Object, Object> consumer;
33+
private AbstractPartialCommitPolicy<Object, Object> policy;
34+
35+
@Before
36+
public void setUp() throws Exception {
37+
consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
38+
}
39+
40+
@After
41+
public void tearDown() throws Exception {
42+
consumer.close();
43+
}
44+
45+
@Test
46+
public void testPartialCommit() {
47+
policy = new TestingPartialCommitPolicy(consumer, Duration.ofHours(1));
48+
final List<TopicPartition> partitions = toPartitions(range(0, 30).boxed().collect(toList()));
49+
final List<ConsumerRecord<Object, Object>> pendingRecords = prepareConsumerRecords(partitions, 1, 10);
50+
final List<ConsumerRecord<Object, Object>> completedRecords =
51+
completeRecords(pendingRecords.subList(0, pendingRecords.size() / 2));
52+
53+
final Map<TopicPartition, OffsetAndMetadata> completeOffsets = buildCommitOffsets(completedRecords);
54+
55+
assertThat(policy.offsetsToPartialCommit()).isEqualTo(completeOffsets);
56+
}
57+
58+
@Test
59+
public void testWholeCommit() throws Exception {
60+
policy = new TestingPartialCommitPolicy(consumer, Duration.ofMillis(200));
61+
final Map<TopicPartition, OffsetAndMetadata> previousCommitOffsets =
62+
commitRecords(completeRecords(prepareConsumerRecords(toPartitions(range(0, 10).boxed().collect(toList())), 1, 10)));
63+
64+
policy.offsetsToPartialCommit().clear();
65+
policy.topicOffsetHighWaterMark().clear();
66+
final Map<TopicPartition, OffsetAndMetadata> newOffsetsToCommit =
67+
buildCommitOffsets(completeRecords(prepareConsumerRecords(toPartitions(range(10, 20).boxed().collect(toList())), 1, 10)));
68+
69+
70+
Thread.sleep(200);
71+
assertThat(policy.offsetsToPartialCommit())
72+
.hasSize(previousCommitOffsets.size() + newOffsetsToCommit.size())
73+
.containsAllEntriesOf(previousCommitOffsets)
74+
.containsAllEntriesOf(newOffsetsToCommit);
75+
}
76+
77+
private List<ConsumerRecord<Object, Object>> completeRecords(List<ConsumerRecord<Object, Object>> records) {
78+
for (ConsumerRecord<Object, Object> record : records) {
79+
policy.addPendingRecord(record);
80+
policy.completeRecord(record);
81+
}
82+
return records;
83+
}
84+
85+
private Map<TopicPartition, OffsetAndMetadata> commitRecords(List<ConsumerRecord<Object, Object>> records) {
86+
final Map<TopicPartition, OffsetAndMetadata> offsets = buildCommitOffsets(records);
87+
assignPartitions(consumer, offsets.keySet(), 0L);
88+
fireConsumerRecords(consumer, records);
89+
consumer.poll(0);
90+
consumer.commitSync(offsets);
91+
return offsets;
92+
}
93+
94+
private Map<TopicPartition, OffsetAndMetadata> buildCommitOffsets(List<ConsumerRecord<Object, Object>> records) {
95+
final Map<TopicPartition, OffsetAndMetadata> completeOffsets = new HashMap<>();
96+
for (ConsumerRecord<Object, Object> record : records) {
97+
completeOffsets.merge(new TopicPartition(testingTopic, record.partition()),
98+
new OffsetAndMetadata(record.offset() + 1),
99+
maxBy(comparing(OffsetAndMetadata::offset)));
100+
}
101+
return completeOffsets;
102+
}
103+
}

0 commit comments

Comments
 (0)