Skip to content

Commit 9b5366e

Browse files
authored
Merge pull request #21 from leancloud/fix/consumer-pause-partition-forever
fix partition may pause forever
2 parents 796557f + 2f9a7c1 commit 9b5366e

File tree

6 files changed

+143
-17
lines changed

6 files changed

+143
-17
lines changed

src/main/java/cn/leancloud/kafka/consumer/CompletedOffsets.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,15 @@
66

77
class CompletedOffsets {
88
private final PriorityQueue<Long> outOfOrderQueue;
9-
private long committedOffset;
9+
private long completedOffset;
1010
private long nextOffsetToCommit;
1111

12-
CompletedOffsets(long lastCommittedOffset) {
13-
this.committedOffset = lastCommittedOffset;
14-
this.nextOffsetToCommit = lastCommittedOffset + 1;
12+
CompletedOffsets(long lastCompletedOffset) {
13+
this.completedOffset = lastCompletedOffset;
14+
this.nextOffsetToCommit = lastCompletedOffset + 1;
1515
this.outOfOrderQueue = new PriorityQueue<>();
1616
}
1717

18-
long nextOffsetToCommit() {
19-
return nextOffsetToCommit;
20-
}
21-
2218
void addCompleteOffset(long offset) {
2319
if (offset == nextOffsetToCommit) {
2420
++nextOffsetToCommit;
@@ -33,15 +29,21 @@ void addCompleteOffset(long offset) {
3329
}
3430

3531
boolean hasOffsetToCommit() {
36-
return committedOffset < nextOffsetToCommit - 1;
32+
return completedOffset < nextOffsetToCommit - 1;
3733
}
3834

3935
OffsetAndMetadata getOffsetToCommit() {
36+
assert hasOffsetToCommit();
4037
return new OffsetAndMetadata(nextOffsetToCommit);
4138
}
4239

40+
/**
41+
* Update committed offset, completed offset = committed offset - 1
42+
* @param committedOffset the offset that committed successfully
43+
*/
4344
void updateCommittedOffset(long committedOffset) {
44-
assert committedOffset > this.committedOffset : "old:" + this.committedOffset + " new:" + committedOffset;
45-
this.committedOffset = committedOffset;
45+
assert committedOffset > this.completedOffset : "old:" + this.completedOffset + " new:" + committedOffset;
46+
assert committedOffset <= nextOffsetToCommit : "completedOffset:" + committedOffset + " nextOffsetToCommit:" + nextOffsetToCommit;
47+
this.completedOffset = committedOffset - 1;
4648
}
4749
}

src/main/java/cn/leancloud/kafka/consumer/ProcessRecordsProgress.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ boolean noOffsetsToCommit() {
141141
void updateCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
142142
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
143143
final CompletedOffsets offset = completedOffsets.get(entry.getKey());
144-
offset.updateCommittedOffset(entry.getValue().offset());
144+
if (offset != null) {
145+
offset.updateCommittedOffset(entry.getValue().offset());
146+
}
145147
}
146148
}
147149

@@ -169,7 +171,10 @@ private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, Offs
169171
if (offsetHighWaterMark != null) {
170172
return offset.offset() >= offsetHighWaterMark;
171173
}
172-
// maybe this partition revoked before a msg of this partition was processed
174+
175+
assert !completedOffsets.containsKey(topicPartition) : "partition:" + topicPartition + " completedOffsets:" + completedOffsets;
176+
177+
// topicOffsetHighWaterMark for topicPartition may have been cleared due to like a sync whole commit
173178
return true;
174179
}
175180
}

src/test/java/cn/leancloud/kafka/consumer/CompletedOffsetsTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ public void testUpdateCommittedOffset() {
5959
offsets.addCompleteOffset(102);
6060
offsets.addCompleteOffset(103);
6161

62-
offsets.updateCommittedOffset(103);
62+
assertThat(offsets.hasOffsetToCommit()).isTrue();
63+
assertThat(offsets.getOffsetToCommit().offset()).isEqualTo(104);
64+
offsets.updateCommittedOffset(offsets.getOffsetToCommit().offset());
6365
assertThat(offsets.hasOffsetToCommit()).isFalse();
64-
assertThat(offsets.getOffsetToCommit()).isEqualTo(new OffsetAndMetadata(104));
6566
}
6667
}

src/test/java/cn/leancloud/kafka/consumer/ProcessRecordsProgressTest.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.junit.Test;
88

99
import java.util.ArrayList;
10+
import java.util.Collections;
1011
import java.util.List;
1112
import java.util.stream.IntStream;
1213

@@ -85,6 +86,31 @@ public void testAddSeveralCompleteRecord() {
8586
.isEqualTo(new OffsetAndMetadata(1002L));
8687
}
8788

89+
@Test
90+
public void testClearAll() {
91+
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
92+
records.add(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
93+
records.add(new ConsumerRecord<>(testingTopic, 102, 1002, defaultKey, defaultMsg));
94+
records.add(new ConsumerRecord<>(testingTopic, 103, 1003, defaultKey, defaultMsg));
95+
records.add(new ConsumerRecord<>(testingTopic, 101, 1004, defaultKey, defaultMsg));
96+
97+
for (ConsumerRecord<Object, Object> record : records) {
98+
progress.markPendingRecord(record);
99+
progress.markCompletedRecord(record);
100+
}
101+
102+
assertThat(progress.noCompletedRecords()).isFalse();
103+
assertThat(progress.noPendingRecords()).isFalse();
104+
progress.clearAll();
105+
assertThat(progress.noCompletedRecords()).isTrue();
106+
assertThat(progress.noPendingRecords()).isTrue();
107+
}
108+
109+
@Test
110+
public void testCompletedOffsetsToCommitOnEmptyCompletedRecords() {
111+
assertThat(progress.completedOffsetsToCommit()).isEmpty();
112+
}
113+
88114
@Test
89115
public void testRevokePartitions() {
90116
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
@@ -106,6 +132,98 @@ public void testRevokePartitions() {
106132
.containsValue(new OffsetAndMetadata(1004L));
107133
}
108134

135+
@Test
136+
public void testNoOffsetsToCommit() {
137+
assertThat(progress.noOffsetsToCommit()).isTrue();
138+
}
139+
140+
@Test
141+
public void testNoOffsetsToCommit2() {
142+
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
143+
records.add(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
144+
records.add(new ConsumerRecord<>(testingTopic, 102, 1002, defaultKey, defaultMsg));
145+
records.add(new ConsumerRecord<>(testingTopic, 103, 1003, defaultKey, defaultMsg));
146+
records.add(new ConsumerRecord<>(testingTopic, 101, 1004, defaultKey, defaultMsg));
147+
148+
for (ConsumerRecord<Object, Object> record : records) {
149+
progress.markPendingRecord(record);
150+
}
151+
152+
assertThat(progress.noOffsetsToCommit()).isTrue();
153+
}
154+
155+
@Test
156+
public void testNoOffsetsToCommit3() {
157+
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
158+
records.add(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
159+
records.add(new ConsumerRecord<>(testingTopic, 102, 1002, defaultKey, defaultMsg));
160+
records.add(new ConsumerRecord<>(testingTopic, 103, 1003, defaultKey, defaultMsg));
161+
records.add(new ConsumerRecord<>(testingTopic, 101, 1004, defaultKey, defaultMsg));
162+
163+
for (ConsumerRecord<Object, Object> record : records) {
164+
progress.markPendingRecord(record);
165+
}
166+
167+
progress.markCompletedRecord(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
168+
assertThat(progress.noOffsetsToCommit()).isFalse();
169+
}
170+
171+
@Test
172+
public void testUpdateCommittedOffsets() {
173+
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
174+
records.add(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
175+
records.add(new ConsumerRecord<>(testingTopic, 102, 1002, defaultKey, defaultMsg));
176+
records.add(new ConsumerRecord<>(testingTopic, 103, 1003, defaultKey, defaultMsg));
177+
178+
for (ConsumerRecord<Object, Object> record : records) {
179+
progress.markPendingRecord(record);
180+
progress.markCompletedRecord(record);
181+
}
182+
183+
assertThat(progress.completedOffsetsToCommit()).hasSize(3);
184+
progress.updateCommittedOffsets(buildCommitOffsets(records));
185+
assertThat(progress.completedOffsetsToCommit()).isEmpty();
186+
}
187+
188+
@Test
189+
public void testUpdateCommittedOffsetsWithoutProgress() {
190+
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
191+
records.add(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
192+
records.add(new ConsumerRecord<>(testingTopic, 102, 1002, defaultKey, defaultMsg));
193+
records.add(new ConsumerRecord<>(testingTopic, 103, 1003, defaultKey, defaultMsg));
194+
195+
for (ConsumerRecord<Object, Object> record : records) {
196+
progress.markPendingRecord(record);
197+
progress.markCompletedRecord(record);
198+
}
199+
200+
progress.clearFor(Collections.singletonList(new TopicPartition(testingTopic, 101)));
201+
assertThat(progress.completedOffsetsToCommit()).hasSize(2);
202+
progress.updateCommittedOffsets(buildCommitOffsets(records));
203+
assertThat(progress.completedOffsetsToCommit()).isEmpty();
204+
}
205+
206+
@Test
207+
public void testCompletedPartitionsWithoutProgress() {
208+
final List<ConsumerRecord<Object, Object>> records = new ArrayList<>();
209+
records.add(new ConsumerRecord<>(testingTopic, 101, 1001, defaultKey, defaultMsg));
210+
records.add(new ConsumerRecord<>(testingTopic, 102, 1002, defaultKey, defaultMsg));
211+
records.add(new ConsumerRecord<>(testingTopic, 103, 1003, defaultKey, defaultMsg));
212+
213+
for (ConsumerRecord<Object, Object> record : records) {
214+
progress.markPendingRecord(record);
215+
progress.markCompletedRecord(record);
216+
}
217+
218+
progress.markPendingRecord(new ConsumerRecord<>(testingTopic, 102, 1003, defaultKey, defaultMsg));
219+
220+
progress.clearFor(Collections.singletonList(new TopicPartition(testingTopic, 101)));
221+
assertThat(progress.completedPartitions(buildCommitOffsets(records)))
222+
.hasSize(2)
223+
.contains(new TopicPartition(testingTopic, 101),
224+
new TopicPartition(testingTopic, 103));
225+
}
226+
109227
private TopicPartition partition(int partition) {
110228
return new TopicPartition(testingTopic, partition);
111229
}

src/test/java/cn/leancloud/kafka/consumer/integration/JoinLeaveGroupTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void runTest(TestContext cxt, TestStatistics statistics) throws Exception
4545
statistics.recordTotalSent(totalSent);
4646
await().atMost(10, SECONDS)
4747
.pollInterval(1, SECONDS)
48-
.until(() -> statistics.getReceiveRecordsCount() == totalSent);
48+
.until(() -> statistics.getReceiveRecordsCount() == statistics.getTotalSentCount());
4949
} finally {
5050
lingerConsumer.close();
5151
}

src/test/java/cn/leancloud/kafka/consumer/integration/TestRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class TestRunner implements Closeable {
1616

1717
TestRunner(String topic, ConsumerFactory factory) {
1818
this.factory = factory;
19-
this.producer = new TestingProducer(Duration.ofMillis(100), 4);
19+
this.producer = new TestingProducer(Duration.ofMillis(100), 2);
2020
this.context = new TestContext(topic, producer, factory);
2121
}
2222

0 commit comments

Comments
 (0)