Skip to content

Commit 9541107

Browse files
authored
Merge pull request #16 from leancloud/fix/partial-commit
fix partial commit
2 parents 28b22a8 + f65f24b commit 9541107

27 files changed

+1032
-598
lines changed

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 105 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,8 @@
77
import org.apache.kafka.common.errors.RetriableException;
88

99
import java.time.Duration;
10-
import java.util.HashMap;
11-
import java.util.HashSet;
12-
import java.util.Map;
13-
import java.util.Set;
10+
import java.util.*;
1411

15-
import static java.util.Comparator.comparing;
16-
import static java.util.function.BinaryOperator.maxBy;
1712
import static java.util.stream.Collectors.toSet;
1813

1914
abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
@@ -49,81 +44,118 @@ void onError(RetriableException e) {
4944
}
5045
}
5146

52-
final Map<TopicPartition, Long> topicOffsetHighWaterMark;
53-
final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets;
5447
protected final Consumer<K, V> consumer;
48+
private final Map<TopicPartition, Long> topicOffsetHighWaterMark;
49+
private final Map<TopicPartition, CompletedOffsets> completedOffsets;
5550
private final long syncCommitRetryIntervalMs;
5651
private final int maxAttemptsForEachSyncCommit;
5752

5853
AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
5954
this.consumer = consumer;
6055
this.topicOffsetHighWaterMark = new HashMap<>();
61-
this.completedTopicOffsets = new HashMap<>();
56+
this.completedOffsets = new HashMap<>();
6257
this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();
6358
this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
6459
}
6560

6661
@Override
6762
public void markPendingRecord(ConsumerRecord<K, V> record) {
63+
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
6864
topicOffsetHighWaterMark.merge(
69-
new TopicPartition(record.topic(), record.partition()),
65+
topicPartition,
7066
record.offset() + 1,
7167
Math::max);
68+
69+
final CompletedOffsets offset = completedOffsets.get(topicPartition);
70+
// please note that if offset exists, it could happen for record.offset() >= offset.nextOffsetToCommit()
71+
// when there're duplicate records which have lower offset than our next offset to commit consumed from broker
72+
if (offset == null) {
73+
completedOffsets.put(topicPartition, new CompletedOffsets(record.offset() - 1L));
74+
}
7275
}
7376

7477
@Override
7578
public void markCompletedRecord(ConsumerRecord<K, V> record) {
76-
completedTopicOffsets.merge(
77-
new TopicPartition(record.topic(), record.partition()),
78-
new OffsetAndMetadata(record.offset() + 1L),
79-
maxBy(comparing(OffsetAndMetadata::offset)));
79+
final CompletedOffsets offset = completedOffsets.get(new TopicPartition(record.topic(), record.partition()));
80+
// offset could be null, when the partition of the record was revoked before its processing was done
81+
if (offset != null) {
82+
offset.addCompleteOffset(record.offset());
83+
}
8084
}
8185

8286
@Override
83-
public Set<TopicPartition> syncPartialCommit() {
84-
commitSync(completedTopicOffsets);
85-
final Set<TopicPartition> partitions = checkCompletedPartitions();
86-
completedTopicOffsets.clear();
87-
for (TopicPartition p : partitions) {
88-
topicOffsetHighWaterMark.remove(p);
89-
}
90-
return partitions;
87+
public void revokePartitions(Collection<TopicPartition> partitions) {
88+
clearProcessingRecordStatesFor(partitions);
9189
}
9290

93-
Set<TopicPartition> getCompletedPartitions(boolean noPendingRecords) {
94-
final Set<TopicPartition> partitions;
95-
if (noPendingRecords) {
96-
assert checkCompletedPartitions().equals(topicOffsetHighWaterMark.keySet())
97-
: "expect: " + checkCompletedPartitions() + " actual: " + topicOffsetHighWaterMark.keySet();
98-
partitions = new HashSet<>(topicOffsetHighWaterMark.keySet());
99-
} else {
100-
partitions = checkCompletedPartitions();
91+
@Override
92+
public Set<TopicPartition> partialCommitSync() {
93+
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = completedTopicOffsetsToCommit();
94+
if (offsetsToCommit.isEmpty()) {
95+
return Collections.emptySet();
10196
}
102-
return partitions;
97+
commitSyncWithRetry(offsetsToCommit);
98+
updatePartialCommittedOffsets(offsetsToCommit);
99+
100+
return clearProcessingRecordStatesForCompletedPartitions(offsetsToCommit);
103101
}
104102

105-
void clearCachedCompletedPartitionsRecords(Set<TopicPartition> completedPartitions, boolean noPendingRecords) {
106-
completedTopicOffsets.clear();
107-
if (noPendingRecords) {
108-
topicOffsetHighWaterMark.clear();
109-
} else {
110-
for (TopicPartition p : completedPartitions) {
111-
topicOffsetHighWaterMark.remove(p);
112-
}
113-
}
103+
Set<TopicPartition> fullCommitSync() {
104+
commitSyncWithRetry();
105+
106+
final Set<TopicPartition> completePartitions = partitionsForAllRecordsStates();
107+
clearAllProcessingRecordStates();
108+
return completePartitions;
114109
}
115110

116111
@VisibleForTesting
117112
Map<TopicPartition, Long> topicOffsetHighWaterMark() {
118113
return topicOffsetHighWaterMark;
119114
}
120115

121-
@VisibleForTesting
122-
Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
123-
return completedTopicOffsets;
116+
Map<TopicPartition, OffsetAndMetadata> completedTopicOffsetsToCommit() {
117+
if (noCompletedOffsets()) {
118+
return Collections.emptyMap();
119+
}
120+
121+
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
122+
for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
123+
final CompletedOffsets offset = entry.getValue();
124+
if (offset.hasOffsetToCommit()) {
125+
offsets.put(entry.getKey(), offset.getOffsetToCommit());
126+
}
127+
}
128+
129+
return offsets;
130+
}
131+
132+
boolean noTopicOffsetsToCommit() {
133+
if (noCompletedOffsets()) {
134+
return true;
135+
}
136+
137+
for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
138+
final CompletedOffsets offset = entry.getValue();
139+
if (offset.hasOffsetToCommit()) {
140+
return false;
141+
}
142+
}
143+
144+
return true;
145+
}
146+
147+
void updatePartialCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
148+
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
149+
final CompletedOffsets offset = completedOffsets.get(entry.getKey());
150+
offset.updateCommittedOffset(entry.getValue().offset());
151+
}
152+
}
153+
154+
boolean noCompletedOffsets() {
155+
return completedOffsets.isEmpty();
124156
}
125157

126-
void commitSync() {
158+
void commitSyncWithRetry() {
127159
final RetryContext context = context();
128160
do {
129161
try {
@@ -135,7 +167,7 @@ void commitSync() {
135167
} while (true);
136168
}
137169

138-
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
170+
void commitSyncWithRetry(Map<TopicPartition, OffsetAndMetadata> offsets) {
139171
final RetryContext context = context();
140172
do {
141173
try {
@@ -147,8 +179,34 @@ void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
147179
} while (true);
148180
}
149181

150-
private Set<TopicPartition> checkCompletedPartitions() {
151-
return completedTopicOffsets
182+
Set<TopicPartition> partitionsForAllRecordsStates() {
183+
return new HashSet<>(topicOffsetHighWaterMark.keySet());
184+
}
185+
186+
void clearAllProcessingRecordStates() {
187+
topicOffsetHighWaterMark.clear();
188+
completedOffsets.clear();
189+
}
190+
191+
Set<TopicPartition> clearProcessingRecordStatesForCompletedPartitions(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
192+
final Set<TopicPartition> partitions = partitionsToSafeResume(committedOffsets);
193+
clearProcessingRecordStatesFor(partitions);
194+
return partitions;
195+
}
196+
197+
void clearProcessingRecordStatesFor(Collection<TopicPartition> partitions) {
198+
for (TopicPartition p : partitions) {
199+
topicOffsetHighWaterMark.remove(p);
200+
completedOffsets.remove(p);
201+
}
202+
}
203+
204+
Set<TopicPartition> partitionsToSafeResume() {
205+
return partitionsToSafeResume(completedTopicOffsetsToCommit());
206+
}
207+
208+
Set<TopicPartition> partitionsToSafeResume(Map<TopicPartition, OffsetAndMetadata> completedOffsets) {
209+
return completedOffsets
152210
.entrySet()
153211
.stream()
154212
.filter(entry -> topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))

src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.time.Duration;
88
import java.util.HashMap;
99
import java.util.Map;
10+
import java.util.Set;
1011

1112
abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
1213
private final Duration recommitInterval;
@@ -21,34 +22,46 @@ abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPol
2122
updateNextRecommitTime(System.nanoTime());
2223
}
2324

24-
Map<TopicPartition, OffsetAndMetadata> offsetsForRecommit() {
25-
assert needRecommit() : "current nanos: " + System.nanoTime() + " nextRecommitNanos:" + nextRecommitNanos;
26-
27-
final Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(completedTopicOffsets);
28-
for (TopicPartition partition : consumer.assignment()) {
29-
final OffsetAndMetadata offset = consumer.committed(partition);
30-
if (offset != null) {
31-
ret.putIfAbsent(partition, offset);
32-
}
25+
@Override
26+
public final Set<TopicPartition> tryCommit(boolean noPendingRecords) {
27+
if (needRecommit()) {
28+
commitSyncWithRetry(offsetsForRecommit());
29+
updateNextRecommitTime();
3330
}
34-
35-
return ret;
31+
return tryCommit0(noPendingRecords);
3632
}
3733

38-
boolean needRecommit() {
39-
return System.nanoTime() >= nextRecommitNanos;
40-
}
34+
abstract Set<TopicPartition> tryCommit0(boolean noPendingRecords);
4135

4236
void updateNextRecommitTime() {
4337
updateNextRecommitTime(System.nanoTime());
4438
}
4539

40+
@VisibleForTesting
41+
void updateNextRecommitTime(long currentNanos) {
42+
nextRecommitNanos = currentNanos + recommitInterval.toNanos();
43+
}
44+
4645
@VisibleForTesting
4746
long nextRecommitNanos() {
4847
return nextRecommitNanos;
4948
}
5049

51-
private void updateNextRecommitTime(long currentNanos) {
52-
nextRecommitNanos = currentNanos + recommitInterval.toNanos();
50+
private boolean needRecommit() {
51+
return System.nanoTime() >= nextRecommitNanos;
52+
}
53+
54+
private Map<TopicPartition, OffsetAndMetadata> offsetsForRecommit() {
55+
assert needRecommit() : "current nanos: " + System.nanoTime() + " nextRecommitNanos:" + nextRecommitNanos;
56+
57+
final Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>();
58+
for (TopicPartition partition : consumer.assignment()) {
59+
final OffsetAndMetadata offset = consumer.committed(partition);
60+
if (offset != null) {
61+
ret.putIfAbsent(partition, offset);
62+
}
63+
}
64+
65+
return ret;
5366
}
5467
}

src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 18 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
import org.slf4j.LoggerFactory;
99

1010
import java.time.Duration;
11-
import java.util.Collections;
12-
import java.util.HashSet;
1311
import java.util.Map;
1412
import java.util.Set;
1513

14+
import static java.util.Collections.emptySet;
15+
1616
final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
1717
private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);
1818

@@ -32,21 +32,21 @@ final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K,
3232
}
3333

3434
@Override
35-
public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
36-
if (!noPendingRecords || completedTopicOffsets.isEmpty()) {
37-
if (needRecommit()) {
38-
commit(offsetsForRecommit());
39-
}
40-
return Collections.emptySet();
35+
Set<TopicPartition> tryCommit0(boolean noPendingRecords) {
36+
// with forceSync mark it means a previous async commit was failed, so
37+
// we do a sync commit no matter if there's any pending records or completed offsets
38+
if (!forceSync && (!noPendingRecords || noTopicOffsetsToCommit())) {
39+
return emptySet();
4140
}
4241

42+
final Set<TopicPartition> partitions = partitionsForAllRecordsStates();
4343
commit();
4444

45-
final Set<TopicPartition> partitions = new HashSet<>(completedTopicOffsets.keySet());
46-
// it's OK to clear these collections here and we will not left any complete offset without commit even
47-
// when this async commit failed because if the async commit failed we will do a sync commit after all
48-
completedTopicOffsets.clear();
49-
topicOffsetHighWaterMark.clear();
45+
// for our commit policy, no matter syncCommit or asyncCommit we are using, we always
46+
// commit all assigned offsets, so we can update recommit time here safely. And
47+
// we don't mind that if the async commit request failed, we tolerate this situation
48+
updateNextRecommitTime();
49+
5050
return partitions;
5151
}
5252

@@ -60,44 +60,15 @@ boolean forceSync() {
6060
return forceSync;
6161
}
6262

63-
@VisibleForTesting
64-
void setForceSync(boolean forceSync) {
65-
this.forceSync = forceSync;
66-
}
67-
6863
private void commit() {
69-
commit(Collections.emptyMap());
70-
}
71-
72-
private void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
7364
if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
74-
syncCommit(offsets);
65+
commitSyncWithRetry();
7566
pendingAsyncCommitCounter = 0;
7667
forceSync = false;
68+
clearAllProcessingRecordStates();
7769
} else {
78-
asyncCommit(offsets);
79-
}
80-
81-
// for our commit policy, no matter syncCommit or asyncCommit we use, we always
82-
// commit all assigned offsets, so we can update recommit time here safely. And
83-
// we don't mind that if the async commit request failed, we tolerate this situation
84-
updateNextRecommitTime();
85-
}
86-
87-
private void asyncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
88-
++pendingAsyncCommitCounter;
89-
if (offsets.isEmpty()) {
70+
++pendingAsyncCommitCounter;
9071
consumer.commitAsync(callback);
91-
} else {
92-
consumer.commitAsync(offsets, callback);
93-
}
94-
}
95-
96-
private void syncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
97-
if (offsets.isEmpty()) {
98-
commitSync();
99-
} else {
100-
commitSync(offsets);
10172
}
10273
}
10374

@@ -109,6 +80,8 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception
10980
if (exception != null) {
11081
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
11182
forceSync = true;
83+
} else {
84+
clearProcessingRecordStatesForCompletedPartitions(offsets);
11285
}
11386
}
11487
}

0 commit comments

Comments
 (0)