Commit eb39c30

Merge pull request #13 from leancloud/fix/async-policy
fix bugs on async commit
2 parents ff21d1a + f5ccbe3 commit eb39c30

17 files changed, +177 -214 lines

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 3 additions & 3 deletions
@@ -26,23 +26,23 @@ abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
     }
 
     @Override
-    public void addPendingRecord(ConsumerRecord<K, V> record) {
+    public void markPendingRecord(ConsumerRecord<K, V> record) {
         topicOffsetHighWaterMark.merge(
                 new TopicPartition(record.topic(), record.partition()),
                 record.offset() + 1,
                 Math::max);
     }
 
     @Override
-    public void completeRecord(ConsumerRecord<K, V> record) {
+    public void markCompletedRecord(ConsumerRecord<K, V> record) {
         completedTopicOffsets.merge(
                 new TopicPartition(record.topic(), record.partition()),
                 new OffsetAndMetadata(record.offset() + 1L),
                 maxBy(comparing(OffsetAndMetadata::offset)));
     }
 
     @Override
-    public Set<TopicPartition> partialCommit() {
+    public Set<TopicPartition> syncPartialCommit() {
         consumer.commitSync(completedTopicOffsets);
         final Set<TopicPartition> partitions = checkCompletedPartitions();
         completedTopicOffsets.clear();
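
Both mark methods lean on Map.merge to keep per-partition offsets monotonic even when records complete out of order. A standalone sketch of that pattern (topic name and offsets are invented for illustration; requires kafka-clients on the classpath):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;

    class HighWaterMarkDemo {
        public static void main(String[] args) {
            // per-partition "next offset to commit", as topicOffsetHighWaterMark tracks it
            Map<TopicPartition, Long> highWaterMark = new HashMap<>();
            TopicPartition p0 = new TopicPartition("demo-topic", 0);

            // merge keeps the maximum even if a late record reports a smaller offset
            highWaterMark.merge(p0, 5L + 1, Math::max);
            highWaterMark.merge(p0, 3L + 1, Math::max); // out-of-order completion, ignored

            System.out.println(highWaterMark.get(p0)); // prints 6
        }
    }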

src/main/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicy.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 4 additions & 1 deletion
@@ -73,7 +73,10 @@ private void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
         } else {
             asyncCommit(offsets);
         }
-        // update next recommit time even if async commit failed, we tolerate this situation
+
+        // For this commit policy, whether we use syncCommit or asyncCommit, we always
+        // commit all assigned offsets, so we can safely update the recommit time here.
+        // We don't mind if the async commit request fails; we tolerate that situation.
         updateNextRecommitTime();
     }
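
The comment's claim can be compressed into a hedged sketch. The names below mirror the shapes in the diff but this is not the actual class; the real policy inherits its recommit bookkeeping from a base class, and the interval here is made up:

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: this policy always commits the full set of assigned offsets,
    // so resetting the recommit clock is safe even when an async request later fails.
    final class WholeOffsetsCommitSketch<K, V> {
        private final Consumer<K, V> consumer;
        private final OffsetCommitCallback callback;
        private final long recommitIntervalNanos = Duration.ofMinutes(1).toNanos(); // made-up interval
        private long nextRecommitNanos = System.nanoTime() + recommitIntervalNanos;

        WholeOffsetsCommitSketch(Consumer<K, V> consumer, OffsetCommitCallback callback) {
            this.consumer = consumer;
            this.callback = callback;
        }

        void commit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean sync) {
            if (sync) {
                consumer.commitSync(offsets);            // blocks until acknowledged
            } else {
                consumer.commitAsync(offsets, callback); // failure is tolerated; callback logs it
            }
            // offsets covers every assigned partition here, hence the unconditional reset
            nextRecommitNanos = System.nanoTime() + recommitIntervalNanos;
        }
    }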

src/main/java/cn/leancloud/kafka/consumer/CommitPolicy.java

Lines changed: 34 additions & 3 deletions
@@ -6,11 +6,42 @@
 import java.util.Set;
 
 interface CommitPolicy<K, V> {
-    void addPendingRecord(ConsumerRecord<K, V> record);
+    /**
+     * Mark a {@link ConsumerRecord} as pending before it is processed, so that {@link CommitPolicy}
+     * knows which and how many records still need to be processed. It is called by {@link Fetcher}
+     * whenever it fetches {@link ConsumerRecord}s from the broker.
+     *
+     * @param record the {@link ConsumerRecord} to process
+     */
+    void markPendingRecord(ConsumerRecord<K, V> record);
 
-    void completeRecord(ConsumerRecord<K, V> record);
+    /**
+     * Mark a {@link ConsumerRecord} as completed after it is processed, so that {@link CommitPolicy}
+     * knows which and how many records have been processed. It is called by {@link Fetcher} once it
+     * is sure that a {@code ConsumerRecord} was processed successfully.
+     *
+     * @param record the processed {@link ConsumerRecord}
+     */
+    void markCompletedRecord(ConsumerRecord<K, V> record);
 
+    /**
+     * Try to commit offsets for any {@link TopicPartition}s with processed {@link ConsumerRecord}s,
+     * based on the intrinsic policy of this {@link CommitPolicy}. This method is called whenever
+     * any {@link ConsumerRecord}s have been processed.
+     *
+     * @param noPendingRecords whether all pending records have been processed. Though
+     *                         {@link CommitPolicy} could compute this value by itself, we pass it
+     *                         in because {@link Fetcher} can compute it much more quickly
+     * @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
+     */
     Set<TopicPartition> tryCommit(boolean noPendingRecords);
 
-    Set<TopicPartition> partialCommit();
+    /**
+     * Synchronously do a dedicated partial commit which commits only those {@link ConsumerRecord}s
+     * that have been processed but not committed yet. Usually it is called when {@link LcKafkaConsumer}
+     * is about to shut down or when some partitions were revoked.
+     *
+     * @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
+     */
+    Set<TopicPartition> syncPartialCommit();
 }
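
To make the contract concrete, here is a hedged sketch of how a Fetcher-like caller would drive this interface. The driver loop, the process method, and the pendingCount field are simplified stand-ins, not the repository's actual Fetcher, which also tracks futures, timeouts, and pause/resume state:

    import java.time.Duration;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Simplified, synchronous driver for illustration; the real Fetcher hands
    // records to worker threads and completes them asynchronously.
    final class PolicyDriverSketch<K, V> {
        private final Consumer<K, V> consumer;
        private final CommitPolicy<K, V> policy;
        private int pendingCount;

        PolicyDriverSketch(Consumer<K, V> consumer, CommitPolicy<K, V> policy) {
            this.consumer = consumer;
            this.policy = policy;
        }

        void pollOnce(Duration timeout) {
            ConsumerRecords<K, V> records = consumer.poll(timeout);
            for (ConsumerRecord<K, V> record : records) {
                policy.markPendingRecord(record);   // before processing
                ++pendingCount;
                process(record);                    // application logic
                policy.markCompletedRecord(record); // after successful processing
                --pendingCount;
            }
            // the caller knows cheaply whether anything is still pending
            Set<TopicPartition> done = policy.tryCommit(pendingCount == 0);
            // "done" partitions could be resumed here if they had been paused
        }

        void shutdown() {
            policy.syncPartialCommit(); // commit processed-but-uncommitted offsets before exit
            consumer.close();
        }

        private void process(ConsumerRecord<K, V> record) { /* application logic */ }
    }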

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 7 additions & 10 deletions
@@ -25,10 +25,6 @@ static class TimeoutFuture<K, V> implements Future<ConsumerRecord<K, V>> {
     private final long timeoutAtNanos;
     private final Time time;
 
-    TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture) {
-        this(wrappedFuture, Long.MAX_VALUE);
-    }
-
     TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture, long timeoutInNanos) {
         this(wrappedFuture, timeoutInNanos, Time.SYSTEM);
     }
@@ -193,13 +189,13 @@ private void dispatchFetchedRecords(ConsumerRecords<K, V> records) {
                 return record;
             });
             pendingFutures.put(record, timeoutAwareFuture(future));
-            policy.addPendingRecord(record);
+            policy.markPendingRecord(record);
         }
     }
 
-    private TimeoutFuture<K, V> timeoutAwareFuture(Future<ConsumerRecord<K, V>> future) {
+    private Future<ConsumerRecord<K, V>> timeoutAwareFuture(Future<ConsumerRecord<K, V>> future) {
         if (unlimitedHandleRecordTime()) {
-            return new TimeoutFuture<>(future);
+            return future;
         } else {
             return new TimeoutFuture<>(future, handleRecordTimeoutNanos);
         }
@@ -219,7 +215,7 @@ assert record != null;
         assert !future.isCancelled();
         final Future<ConsumerRecord<K, V>> v = pendingFutures.remove(record);
         assert v != null;
-        policy.completeRecord(record);
+        policy.markCompletedRecord(record);
     }
 
     private void processTimeoutRecords() throws TimeoutException {
@@ -228,6 +224,7 @@ private void processTimeoutRecords() throws TimeoutException {
         }
 
         for (Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry : pendingFutures.entrySet()) {
+            // we can be sure that this cast always succeeds
            final TimeoutFuture<K, V> future = (TimeoutFuture<K, V>) entry.getValue();
            if (future.timeout()) {
                future.cancel(false);
@@ -262,7 +259,7 @@ private void gracefulShutdown(UnsubscribedStatus unsubscribedStatus) {
         long shutdownTimeout = 0L;
         try {
             shutdownTimeout = waitPendingFuturesDone();
-            policy.partialCommit();
+            policy.syncPartialCommit();
             pendingFutures.clear();
         } catch (Exception ex) {
             logger.error("Graceful shutdown got unexpected exception", ex);
@@ -285,7 +282,7 @@ private long waitPendingFuturesDone() {
                 assert remain >= 0;
                 final ConsumerRecord<K, V> record = future.get(remain, TimeUnit.MILLISECONDS);
                 assert record != null;
-                policy.completeRecord(record);
+                policy.markCompletedRecord(record);
             } catch (TimeoutException ex) {
                 future.cancel(false);
            } catch (InterruptedException ex) {
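
The change to timeoutAwareFuture is worth a note: instead of wrapping every future in a TimeoutFuture with Long.MAX_VALUE as a "no timeout" sentinel, the unbounded case now returns the raw future untouched, which is why the deleted single-argument constructor is no longer needed. A self-contained sketch of that wrap-only-when-bounded pattern (names here are illustrative, not the repository's):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hedged sketch: wrap a future with deadline bookkeeping only when a finite
    // timeout exists; the unbounded path returns the raw future as-is.
    final class TimeoutWrapDemo {
        static final class DeadlineFuture<T> implements Future<T> {
            private final Future<T> inner;
            private final long deadlineNanos;

            DeadlineFuture(Future<T> inner, long timeoutNanos) {
                this.inner = inner;
                this.deadlineNanos = System.nanoTime() + timeoutNanos;
            }

            boolean timedOut() { return System.nanoTime() >= deadlineNanos; }

            @Override public boolean cancel(boolean b) { return inner.cancel(b); }
            @Override public boolean isCancelled() { return inner.isCancelled(); }
            @Override public boolean isDone() { return inner.isDone(); }
            @Override public T get() throws InterruptedException, ExecutionException { return inner.get(); }
            @Override public T get(long t, TimeUnit u)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return inner.get(t, u);
            }
        }

        static <T> Future<T> timeoutAware(Future<T> future, long timeoutNanos) {
            // only bounded futures pay for the wrapper; callers that later downcast
            // to DeadlineFuture must first rule out the unbounded case, as Fetcher does
            return timeoutNanos == Long.MAX_VALUE ? future : new DeadlineFuture<>(future, timeoutNanos);
        }

        public static void main(String[] args) {
            Future<String> raw = CompletableFuture.completedFuture("record");
            System.out.println(timeoutAware(raw, Long.MAX_VALUE) == raw);            // true: not wrapped
            System.out.println(timeoutAware(raw, 1_000L) instanceof DeadlineFuture); // true: wrapped
        }
    }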

src/main/java/cn/leancloud/kafka/consumer/NoOpCommitPolicy.java

Lines changed: 3 additions & 3 deletions
@@ -15,12 +15,12 @@ static <K, V> NoOpCommitPolicy<K, V> getInstance() {
     }
 
     @Override
-    public void addPendingRecord(ConsumerRecord<K, V> record) {
+    public void markPendingRecord(ConsumerRecord<K, V> record) {
 
     }
 
     @Override
-    public void completeRecord(ConsumerRecord<K, V> record) {
+    public void markCompletedRecord(ConsumerRecord<K, V> record) {
 
     }
 
@@ -30,7 +30,7 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
     }
 
     @Override
-    public Set<TopicPartition> partialCommit() {
+    public Set<TopicPartition> syncPartialCommit() {
         return Collections.emptySet();
     }
 }

src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java

Lines changed: 31 additions & 10 deletions
@@ -10,39 +10,42 @@
 import java.time.Duration;
 import java.util.*;
 
-final class PartialAsyncCommitPolicy<K, V> extends AbstractPartialCommitPolicy<K, V> {
+final class PartialAsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
     private static final Logger logger = LoggerFactory.getLogger(PartialAsyncCommitPolicy.class);
 
     private final int maxPendingAsyncCommits;
     private final OffsetCommitCallback callback;
+    private final Map<TopicPartition, OffsetAndMetadata> pendingAsyncCommitOffset;
     private int pendingAsyncCommitCounter;
     private boolean forceSync;
 
     PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) {
         super(consumer, forceWholeCommitInterval);
         this.maxPendingAsyncCommits = maxPendingAsyncCommits;
         this.callback = new AsyncCommitCallback();
+        this.pendingAsyncCommitOffset = new HashMap<>();
     }
 
     @Override
     public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
-        final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForPartialCommit();
+        final boolean syncCommit = forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits;
+        final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForPartialCommit(syncCommit);
+
         if (offsets.isEmpty()) {
             return Collections.emptySet();
         } else {
             final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
-            if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
+            if (syncCommit) {
                 consumer.commitSync(offsets);
+                pendingAsyncCommitOffset.clear();
                 pendingAsyncCommitCounter = 0;
                 forceSync = false;
                 clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
             } else {
                 ++pendingAsyncCommitCounter;
                 consumer.commitAsync(offsets, callback);
+                pendingAsyncCommitOffset.putAll(offsets);
             }
-
-            // update next recommit time even if async commit failed, we tolerate this situation
-            updateNextRecommitTime();
             return partitions;
         }
     }
@@ -57,6 +60,23 @@ boolean forceSync() {
         return forceSync;
     }
 
+    private Map<TopicPartition, OffsetAndMetadata> offsetsForPartialCommit(boolean syncCommit) {
+        final Map<TopicPartition, OffsetAndMetadata> offsets;
+        if (needRecommit()) {
+            offsets = offsetsForRecommit();
+            // we tolerate commit failures when using async commit
+            updateNextRecommitTime();
+        } else if (syncCommit) {
+            offsets = completedTopicOffsets;
+        } else {
+            offsets = new HashMap<>(completedTopicOffsets);
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : pendingAsyncCommitOffset.entrySet()) {
+                offsets.remove(entry.getKey(), entry.getValue());
+            }
+        }
+        return offsets;
+    }
+
     private class AsyncCommitCallback implements OffsetCommitCallback {
         @Override
         public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
@@ -68,11 +88,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception
                 logger.warn("Failed to commit offset: " + offsets + " asynchronously", exception);
                 forceSync = true;
             } else {
-                final Map<TopicPartition, OffsetAndMetadata> completeOffsets =
-                        offsets == completedTopicOffsets ? new HashMap<>(offsets) : offsets;
-                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : completeOffsets.entrySet()) {
-                    completedTopicOffsets.remove(entry.getKey(), entry.getValue());
+                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                     topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset());
+                    // we don't clear pendingAsyncCommitOffset here because those entries are
+                    // cleared on the sync commit that occurs after every maxPendingAsyncCommits async commits
+                    // todo: maybe we don't need to clean completedTopicOffsets here either
+                    completedTopicOffsets.remove(entry.getKey(), entry.getValue());
                 }
             }
         }
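
A detail that makes this bookkeeping safe is the two-argument Map.remove(key, value): it removes an entry only if the key still maps to the exact value that was committed, so a newer completed offset for the same partition survives and gets committed later. A standalone illustration (topic name and offsets are made up):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class ConditionalRemoveDemo {
        public static void main(String[] args) {
            Map<TopicPartition, OffsetAndMetadata> completed = new HashMap<>();
            TopicPartition p0 = new TopicPartition("demo-topic", 0);

            OffsetAndMetadata inFlight = new OffsetAndMetadata(10L);
            completed.put(p0, inFlight); // offset 10 sent in an async commit

            // more records complete while the async commit is in flight
            completed.put(p0, new OffsetAndMetadata(15L));

            // the commit callback removes the entry only if it is still exactly
            // the in-flight value; here it is not, so offset 15 is preserved
            completed.remove(p0, inFlight);
            System.out.println(completed.get(p0).offset()); // prints 15
        }
    }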

src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java

Lines changed: 11 additions & 2 deletions
@@ -9,7 +9,7 @@
 import java.util.Map;
 import java.util.Set;
 
-final class PartialSyncCommitPolicy<K, V> extends AbstractPartialCommitPolicy<K, V> {
+final class PartialSyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
     PartialSyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval) {
         super(consumer, forceWholeCommitInterval);
     }
@@ -19,7 +19,6 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForPartialCommit();
         if (!offsets.isEmpty()) {
             consumer.commitSync(offsets);
-            updateNextRecommitTime();
         }
 
         if (completedTopicOffsets.isEmpty()) {
@@ -30,4 +29,14 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
             return partitions;
         }
     }
+
+    private Map<TopicPartition, OffsetAndMetadata> offsetsForPartialCommit() {
+        if (needRecommit()) {
+            final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForRecommit();
+            updateNextRecommitTime();
+            return offsets;
+        } else {
+            return completedTopicOffsets;
+        }
+    }
 }
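
The fix in both partial policies is to reset the recommit clock only when a whole recommit actually happens, not after every ordinary partial commit, which could otherwise postpone the forced whole commit indefinitely. A hedged sketch of that timing gate, with hypothetical field names standing in for AbstractRecommitAwareCommitPolicy's internals:

    import java.time.Duration;

    // Hypothetical timing gate, illustrating the intent of needRecommit() /
    // updateNextRecommitTime(); not the repository's actual base class.
    final class RecommitClockSketch {
        private final long intervalNanos;
        private long nextRecommitNanos;

        RecommitClockSketch(Duration forceWholeCommitInterval) {
            this.intervalNanos = forceWholeCommitInterval.toNanos();
            this.nextRecommitNanos = System.nanoTime() + intervalNanos;
        }

        boolean needRecommit() {
            return System.nanoTime() >= nextRecommitNanos;
        }

        // called only on the recommit branch; ordinary partial commits no longer
        // reset the clock, so a whole recommit happens at least once per interval
        void updateNextRecommitTime() {
            nextRecommitNanos = System.nanoTime() + intervalNanos;
        }
    }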

src/main/java/cn/leancloud/kafka/consumer/RebalanceListener.java

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
         pausedPartitions = consumer.paused();
         if (!pausedPartitions.isEmpty()) {
             pausedPartitions = new HashSet<>(pausedPartitions);
-            pausedPartitions.removeAll(policy.partialCommit());
+            pausedPartitions.removeAll(policy.syncPartialCommit());
         }
     }
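
For readers unfamiliar with the hook: onPartitionsRevoked fires before Kafka reassigns partitions, which is the last safe moment to commit processed offsets; partitions whose work is fully committed no longer need to stay paused. A minimal listener showing the same commit-on-revoke idea with the plain consumer API (illustrative only; the repository's RebalanceListener additionally restores paused partitions):

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    final class CommitOnRevokeListener implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer;
        private final Map<TopicPartition, OffsetAndMetadata> completedOffsets;

        CommitOnRevokeListener(Consumer<?, ?> consumer,
                               Map<TopicPartition, OffsetAndMetadata> completedOffsets) {
            this.consumer = consumer;
            this.completedOffsets = completedOffsets;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // last chance to commit before ownership moves to another consumer;
            // commitSync here avoids losing acknowledged work during the rebalance
            if (!completedOffsets.isEmpty()) {
                consumer.commitSync(completedOffsets);
                completedOffsets.clear();
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // nothing to do on assignment in this sketch
        }
    }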
