Skip to content

Commit d717864

Browse files
committed
(feat) clear progress based on assigned partitions in consumer
1 parent 4fac789 commit d717864

File tree

6 files changed

+34
-30
lines changed

6 files changed

+34
-30
lines changed

src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private Map<TopicPartition, OffsetAndMetadata> offsetsForRecommit() {
5858
for (TopicPartition partition : consumer.assignment()) {
5959
final OffsetAndMetadata offset = consumer.committed(partition);
6060
if (offset != null) {
61-
ret.putIfAbsent(partition, offset);
61+
ret.put(partition, offset);
6262
}
6363
}
6464

src/main/java/cn/leancloud/kafka/consumer/RebalanceListener.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ final class RebalanceListener<K, V> implements ConsumerRebalanceListener {
2222
private final ProcessRecordsProgress progress;
2323
private final Set<TopicPartition> knownPartitions;
2424
private final ConsumerSeekDestination forceSeekTo;
25-
private Set<TopicPartition> lastRevokedPartitions;
2625
private Set<TopicPartition> pausedPartitions;
2726

2827
RebalanceListener(Consumer<K, V> consumer, ProcessRecordsProgress progress, CommitPolicy policy, ConsumerSeekDestination forceSeekTo) {
@@ -32,12 +31,10 @@ final class RebalanceListener<K, V> implements ConsumerRebalanceListener {
3231
this.pausedPartitions = Collections.emptySet();
3332
this.knownPartitions = new HashSet<>();
3433
this.forceSeekTo = forceSeekTo;
35-
this.lastRevokedPartitions = new HashSet<>();
3634
}
3735

3836
@Override
3937
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
40-
lastRevokedPartitions = new HashSet<>(partitions);
4138
pausedPartitions = consumer.paused();
4239
if (!pausedPartitions.isEmpty()) {
4340
pausedPartitions = new HashSet<>(pausedPartitions);
@@ -55,14 +52,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
5552
pausePreviousPausedPartitions(partitions);
5653
}
5754

58-
// We can't use removeAll because of the bad performance of removeAll(#Collection) on HashSet
59-
for (TopicPartition p : partitions) {
60-
lastRevokedPartitions.remove(p);
61-
}
62-
63-
clearProgressForRevokedPartitions(lastRevokedPartitions);
64-
65-
lastRevokedPartitions.clear();
55+
clearProgressForRevokedPartitions();
6656
}
6757

6858
private void seekOnAssignedPartitions(Collection<TopicPartition> partitions) {
@@ -95,10 +85,12 @@ private void pausePreviousPausedPartitions(Collection<TopicPartition> partitions
9585
pausedPartitions = Collections.emptySet();
9686
}
9787

98-
private void clearProgressForRevokedPartitions(Collection<TopicPartition> revokedPartitions) {
99-
// revoke those partitions which was revoked and not reassigned
100-
if (!revokedPartitions.isEmpty()) {
101-
progress.clearFor(revokedPartitions);
88+
private void clearProgressForRevokedPartitions() {
89+
final Set<TopicPartition> partitionsWithProgress = progress.allPartitions();
90+
partitionsWithProgress.removeAll(consumer.assignment());
91+
92+
if (!partitionsWithProgress.isEmpty()) {
93+
progress.clearFor(partitionsWithProgress);
10294
}
10395
}
10496
}

src/test/java/cn/leancloud/kafka/consumer/RebalanceListenerTest.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,35 @@ public void testPauseNotFinishedPartitionsOnPartitionAssign() {
5353
verify(consumer, times(1)).pause(new HashSet<>(partitionStillNeedsToPause));
5454
}
5555

56+
@Test
57+
public void testPausedPartitionsWasAllRevoked() {
58+
listener = new RebalanceListener<>(consumer, progress, policy, ConsumerSeekDestination.NONE);
59+
final List<TopicPartition> pausedPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
60+
final List<TopicPartition> partitionToResumeAfterCommit = toPartitions(IntStream.range(0, 20).boxed().collect(toList()));
61+
final List<TopicPartition> assignedPartitions = toPartitions(IntStream.range(10, 20).boxed().collect(toList()));
62+
63+
when(consumer.paused()).thenReturn(new HashSet<>(pausedPartitions));
64+
when(policy.partialCommitSync(progress)).thenReturn(new HashSet<>(partitionToResumeAfterCommit));
65+
66+
listener.onPartitionsRevoked(pausedPartitions);
67+
listener.onPartitionsAssigned(assignedPartitions);
68+
69+
verify(consumer, never()).pause(any());
70+
}
71+
5672
@Test
5773
public void testRevokePartitions() {
74+
final Set<TopicPartition> allPartitions = new HashSet<>(toPartitions(IntStream.range(0, 30).boxed().collect(toList())));
75+
final Set<TopicPartition> reAssignedPartitions = new HashSet<>(toPartitions(IntStream.range(0, 20).boxed().collect(toList())));
76+
final Set<TopicPartition> revokedPartitions = new HashSet<>(toPartitions(IntStream.range(20, 30).boxed().collect(toList())));
5877
final ProcessRecordsProgress progress = mock(ProcessRecordsProgress.class);
78+
when(progress.allPartitions()).thenReturn(allPartitions);
79+
when(consumer.assignment()).thenReturn(reAssignedPartitions);
5980
listener = new RebalanceListener<>(consumer, progress, policy, ConsumerSeekDestination.NONE);
60-
final List<TopicPartition> allPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
61-
final List<TopicPartition> reAssignedPartitions = toPartitions(IntStream.range(0, 20).boxed().collect(toList()));
62-
final Set<TopicPartition> revokedPartitions = new HashSet<>(toPartitions(IntStream.range(20, 30).boxed().collect(toList())));
63-
64-
doAnswer(invocation -> {
65-
assertThat(revokedPartitions).containsExactlyInAnyOrderElementsOf(invocation.getArgument(0));
66-
return null;
67-
}).when(progress).clearFor(anySet());
6881

69-
listener.onPartitionsRevoked(allPartitions);
7082
listener.onPartitionsAssigned(reAssignedPartitions);
7183

72-
verify(progress, times(1)).clearFor(anyCollection());
84+
verify(progress, times(1)).clearFor(revokedPartitions);
7385
}
7486

7587
@Test

src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public LcKafkaConsumer<Integer, String> buildConsumer(String consumerName, TestS
318318
new StringDeserializer())
319319
.workerPool(workerPool, false)
320320
.recommitInterval(Duration.ofSeconds(1))
321-
.buildAsync();
321+
.buildPartialAsync();
322322
}
323323
}
324324

@@ -341,7 +341,7 @@ public LcKafkaConsumer<Integer, String> buildConsumer(String consumerName, TestS
341341
new IntegerDeserializer(),
342342
new StringDeserializer())
343343
.recommitInterval(Duration.ofSeconds(1))
344-
.buildAsync();
344+
.buildPartialAsync();
345345
}
346346
}
347347

src/test/java/cn/leancloud/kafka/consumer/integration/JoinLeaveGroupTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public void runTest(TestContext cxt, TestStatistics statistics) throws Exception
2626
lingerConsumer.subscribe(Collections.singletonList(cxt.topic()));
2727

2828
final List<LcKafkaConsumer<Integer, String>> postJoinConsumers = new ArrayList<>();
29-
for (int i = 0; i < 5; ++i) {
29+
for (int i = 0; i < 8; ++i) {
3030
final LcKafkaConsumer<Integer, String> consumer = cxt.factory().buildConsumer(name() + "-" + i, statistics);
3131
consumer.subscribe(Collections.singletonList(cxt.topic()));
3232
postJoinConsumers.add(consumer);

src/test/java/cn/leancloud/kafka/consumer/integration/TestRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class TestRunner implements Closeable {
1616

1717
TestRunner(String topic, ConsumerFactory factory) {
1818
this.factory = factory;
19-
this.producer = new TestingProducer(Duration.ofMillis(100), 2);
19+
this.producer = new TestingProducer(Duration.ofMillis(100), 4);
2020
this.context = new TestContext(topic, producer, factory);
2121
}
2222

0 commit comments

Comments
 (0)