Skip to content

Commit b00ff54

Browse files
authored
Merge pull request #22 from leancloud/feat/no-commit-while-rebalance
no commit while partition rebalancing
2 parents 30e441e + 837d91b commit b00ff54

File tree

10 files changed

+136
-8
lines changed

10 files changed

+136
-8
lines changed

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ void onError(RetriableException e) {
4646
protected final Consumer<K, V> consumer;
4747
private final long syncCommitRetryIntervalMs;
4848
private final int maxAttemptsForEachSyncCommit;
49+
private boolean commitPuased;
4950

5051
AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
5152
this.consumer = consumer;
@@ -65,6 +66,21 @@ public Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress) {
6566
return progress.clearCompletedPartitions(offsetsToCommit);
6667
}
6768

69+
@Override
70+
public void pauseCommit() {
71+
commitPuased = true;
72+
}
73+
74+
@Override
75+
public void resumeCommit() {
76+
commitPuased = false;
77+
}
78+
79+
@Override
80+
public boolean commitPaused() {
81+
return commitPuased;
82+
}
83+
6884
Set<TopicPartition> fullCommitSync(ProcessRecordsProgress progress) {
6985
commitSyncWithRetry();
7086

src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.apache.kafka.common.TopicPartition;
66

77
import java.time.Duration;
8+
import java.util.Collections;
89
import java.util.HashMap;
910
import java.util.Map;
1011
import java.util.Set;
@@ -24,6 +25,10 @@ abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPol
2425

2526
@Override
2627
public final Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress) {
28+
if (commitPaused()) {
29+
return Collections.emptySet();
30+
}
31+
2732
if (needRecommit()) {
2833
commitSyncWithRetry(offsetsForRecommit());
2934
updateNextRecommitTime();

src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ final class AutoCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
1616

1717
@Override
1818
public Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress) {
19-
if (progress.noOffsetsToCommit()) {
19+
if (commitPaused() || progress.noOffsetsToCommit()) {
2020
return emptySet();
2121
}
2222

src/main/java/cn/leancloud/kafka/consumer/CommitPolicy.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import org.apache.kafka.clients.consumer.ConsumerRecord;
44
import org.apache.kafka.common.TopicPartition;
55

6-
import java.util.Collection;
76
import java.util.Set;
87

98
interface CommitPolicy {
@@ -27,4 +26,25 @@ interface CommitPolicy {
2726
* @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
2827
*/
2928
Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress);
29+
30+
/**
31+
* Pause commit until {@link #resumeCommit()} is called. Next {@link #tryCommit(boolean, ProcessRecordsProgress)}
32+
* will return empty Set without commit anything.
33+
* <p>
34+
* Please note that this method has no effect on {@link #partialCommitSync(ProcessRecordsProgress)} which will
35+
* even this method was called.
36+
*/
37+
void pauseCommit();
38+
39+
/**
40+
* Resume commit and next {@link #tryCommit(boolean, ProcessRecordsProgress)} will do the commitment stuff.
41+
*/
42+
void resumeCommit();
43+
44+
/**
45+
* Check weather commit was paused.
46+
*
47+
* @return true when {@link #pauseCommit()} has been called
48+
*/
49+
boolean commitPaused();
3050
}

src/main/java/cn/leancloud/kafka/consumer/NoOpCommitPolicy.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,18 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsPro
2222
public Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress) {
2323
return emptySet();
2424
}
25+
26+
@Override
27+
public void pauseCommit() {
28+
}
29+
30+
@Override
31+
public void resumeCommit() {
32+
33+
}
34+
35+
@Override
36+
public boolean commitPaused() {
37+
return false;
38+
}
2539
}

src/main/java/cn/leancloud/kafka/consumer/RebalanceListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
4040
pausedPartitions = new HashSet<>(pausedPartitions);
4141
pausedPartitions.removeAll(policy.partialCommitSync(progress));
4242
}
43+
policy.pauseCommit();
4344
}
4445

4546
@Override
@@ -48,11 +49,14 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
4849
seekOnAssignedPartitions(partitions);
4950
}
5051

52+
clearProgressForRevokedPartitions();
53+
54+
policy.resumeCommit();
55+
final Set<TopicPartition> resumedPartitions = policy.partialCommitSync(progress);
5156
if (!pausedPartitions.isEmpty()) {
57+
pausedPartitions.removeAll(resumedPartitions);
5258
pausePreviousPausedPartitions(partitions);
5359
}
54-
55-
clearProgressForRevokedPartitions();
5660
}
5761

5862
private void seekOnAssignedPartitions(Collection<TopicPartition> partitions) {

src/test/java/cn/leancloud/kafka/consumer/AbstractCommitPolicyTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ public void testInterruptOnRetry() {
180180
assertThat(Thread.interrupted());
181181
}
182182

183+
@Test
184+
public void testPauseResume() {
185+
assertThat(policy.commitPaused()).isFalse();
186+
policy.pauseCommit();
187+
assertThat(policy.commitPaused()).isTrue();
188+
policy.resumeCommit();
189+
assertThat(policy.commitPaused()).isFalse();
190+
}
191+
183192
private TopicPartition partition(int partition) {
184193
return new TopicPartition(testingTopic, partition);
185194
}

src/test/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicyTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,26 @@ public void testRecommit() {
6060
verify(consumer, times(1)).commitSync(previousCommitOffsets);
6161
assertThat(policy.nextRecommitNanos()).isGreaterThan(now + Duration.ofMillis(200).toNanos());
6262
}
63+
64+
@Test
65+
public void testNoRecommitWhenPaused() {
66+
final Consumer<Object, Object> consumer = mock(Consumer.class);
67+
final List<ConsumerRecord<Object, Object>> prevRecords = prepareConsumerRecords(toPartitions(IntStream.range(0, 10).boxed().collect(toList())), 1, 10);
68+
final Map<TopicPartition, OffsetAndMetadata> previousCommitOffsets = buildCommitOffsets(prevRecords);
69+
final long now = System.nanoTime();
70+
71+
when(consumer.assignment()).thenReturn(new HashSet<>(partitions));
72+
when(consumer.committed(any())).thenAnswer(invocation ->
73+
previousCommitOffsets.get(invocation.getArgument(0))
74+
);
75+
76+
policy = new TestingPolicy(consumer, Duration.ZERO, 3, Duration.ofMillis(200));
77+
policy.pauseCommit();
78+
79+
policy.updateNextRecommitTime(now - Duration.ofMillis(200).toNanos());
80+
assertThat(policy.tryCommit(true, new ProcessRecordsProgress())).isEmpty();
81+
82+
verify(consumer, never()).commitSync(any());
83+
assertThat(policy.nextRecommitNanos()).isEqualTo(now);
84+
}
6385
}

src/test/java/cn/leancloud/kafka/consumer/AutoCommitPolicyTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,24 @@ public void tearDown() {
3737
consumer.close();
3838
}
3939

40+
@Test
41+
public void testPauseResumeCommit() {
42+
addCompleteRecordsInPolicy(progress, pendingRecords);
43+
44+
policy.pauseCommit();
45+
assertThat(policy.tryCommit(true, progress)).isEmpty();
46+
assertThat(progress.noCompletedRecords()).isFalse();
47+
assertThat(progress.noPendingRecords()).isFalse();
48+
49+
policy.resumeCommit();
50+
assertThat(policy.tryCommit(true, progress))
51+
.hasSize(partitions.size())
52+
.containsExactlyInAnyOrderElementsOf(partitions);
53+
54+
assertThat(progress.noCompletedRecords()).isTrue();
55+
assertThat(progress.noPendingRecords()).isTrue();
56+
}
57+
4058
@Test
4159
public void testOnlyConsumedRecords() {
4260
assertThat(policy.tryCommit(true, progress)).isEmpty();

src/test/java/cn/leancloud/kafka/consumer/RebalanceListenerTest.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
import org.junit.Test;
88
import org.mockito.ArgumentCaptor;
99

10-
import java.util.Collection;
11-
import java.util.HashSet;
12-
import java.util.List;
13-
import java.util.Set;
10+
import java.util.*;
1411
import java.util.stream.IntStream;
1512

1613
import static cn.leancloud.kafka.consumer.TestingUtils.toPartitions;
@@ -84,6 +81,29 @@ public void testRevokePartitions() {
8481
verify(progress, times(1)).clearFor(revokedPartitions);
8582
}
8683

84+
@Test
85+
public void testPauseResumeCommit() {
86+
listener = new RebalanceListener<>(consumer, progress, policy, ConsumerSeekDestination.NONE);
87+
final List<TopicPartition> pausedPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
88+
final List<TopicPartition> partitionToResumeAfterCommit = toPartitions(IntStream.range(0, 20).boxed().collect(toList()));
89+
final List<TopicPartition> assignedPartitions = toPartitions(IntStream.range(10, 25).boxed().collect(toList()));
90+
final List<TopicPartition> partitionStillNeedsToPause = toPartitions(IntStream.range(20, 25).boxed().collect(toList()));
91+
92+
when(consumer.paused()).thenReturn(new HashSet<>(pausedPartitions));
93+
when(policy.partialCommitSync(progress))
94+
// Return empty Set on the first call
95+
.thenReturn(Collections.emptySet())
96+
// Return resumed partitions on the second call
97+
.thenReturn(new HashSet<>(partitionToResumeAfterCommit));
98+
99+
listener.onPartitionsRevoked(pausedPartitions);
100+
verify(policy, times(1)).pauseCommit();
101+
listener.onPartitionsAssigned(assignedPartitions);
102+
verify(policy, times(1)).resumeCommit();
103+
104+
verify(consumer, times(1)).pause(new HashSet<>(partitionStillNeedsToPause));
105+
}
106+
87107
@Test
88108
public void testForceSeekToBeginningForAllPartitions() {
89109
listener = new RebalanceListener<>(consumer, progress, policy, ConsumerSeekDestination.BEGINNING);

0 commit comments

Comments
 (0)