Skip to content

Commit d64394d

Browse files
authored
Merge pull request #12 from leancloud/feat/force-seek-to-beginning
support force seek offsets to beginning or end
2 parents 6324eff + 1caefba commit d64394d

File tree

9 files changed

+200
-17
lines changed

9 files changed

+200
-17
lines changed

.codecov.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ coverage:
22
status:
33
project:
44
default:
5-
threshold: 0.25
5+
threshold: 0.1%
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
import org.apache.kafka.clients.consumer.Consumer;
4+
import org.apache.kafka.common.TopicPartition;
5+
6+
import java.util.Collection;
7+
8+
/**
9+
* The destination for a consumer reset offset operation.
10+
*/
11+
public enum ConsumerSeekDestination {
12+
/**
13+
* Do nothing. Do not reset the offsets of the given consumer.
14+
*/
15+
NONE {
16+
@Override
17+
public void seek(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
18+
19+
}
20+
},
21+
/**
22+
* Reset offsets for all the given partitions of a given consumer to the beginning.
23+
*/
24+
BEGINNING {
25+
@Override
26+
public void seek(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
27+
consumer.seekToBeginning(partitions);
28+
}
29+
},
30+
/**
31+
* Reset offsets for all the given partitions of a given consumer to the end.
32+
*/
33+
END {
34+
@Override
35+
public void seek(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
36+
consumer.seekToEnd(partitions);
37+
}
38+
};
39+
40+
/**
41+
* Reset offsets for all the given partitions of a given consumer.
42+
*
43+
* @param consumer the consumer whose offsets are to be reset
44+
* @param partitions the partitions whose offsets are to be reset
45+
*/
46+
abstract public void seek(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
47+
}

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ boolean timeout() {
9393

9494
Fetcher(LcKafkaConsumerBuilder<K, V> consumerBuilder) {
9595
this(consumerBuilder.getConsumer(), consumerBuilder.getPollTimeout(), consumerBuilder.getConsumerRecordHandler(),
96-
consumerBuilder.getWorkerPool(), consumerBuilder.getPolicy(), consumerBuilder.gracefulShutdownTimeout(),
97-
consumerBuilder.handleRecordTimeout());
96+
consumerBuilder.getWorkerPool(), consumerBuilder.getPolicy(), consumerBuilder.getGracefulShutdownTimeout(),
97+
consumerBuilder.getHandleRecordTimeout());
9898
}
9999

100100
Fetcher(Consumer<K, V> consumer,
@@ -153,7 +153,7 @@ public void run() {
153153

154154
unsubscribedStatus = UnsubscribedStatus.ERROR;
155155
markClosed();
156-
logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", ex);
156+
logger.error("Fetcher quit with unexpected exception.", ex);
157157
break;
158158
}
159159
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ int code() {
5151
private final ExecutorService workerPool;
5252
private final CommitPolicy<K, V> policy;
5353
private final boolean shutdownWorkerPoolOnStop;
54+
private final ConsumerSeekDestination forceSeekTo;
5455
private volatile State state;
5556

5657
LcKafkaConsumer(LcKafkaConsumerBuilder<K, V> builder) {
@@ -60,6 +61,7 @@ int code() {
6061
this.shutdownWorkerPoolOnStop = builder.isShutdownWorkerPoolOnStop();
6162
this.policy = builder.getPolicy();
6263
this.fetcher = new Fetcher<>(builder);
64+
this.forceSeekTo = builder.getForceSeekTo();
6365
this.fetcherThread = new Thread(fetcher);
6466
}
6567

@@ -89,7 +91,7 @@ public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Collection<S
8991

9092
ensureInInit();
9193

92-
consumer.subscribe(topics, new RebalanceListener<>(consumer, policy));
94+
consumer.subscribe(topics, new RebalanceListener<>(consumer, policy, forceSeekTo));
9395

9496
fetcherThread.setName(fetcherThreadName(topics));
9597
fetcherThread.start();
@@ -118,7 +120,7 @@ public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Pattern patt
118120

119121
ensureInInit();
120122

121-
consumer.subscribe(pattern, new RebalanceListener<>(consumer, policy));
123+
consumer.subscribe(pattern, new RebalanceListener<>(consumer, policy, forceSeekTo));
122124

123125
fetcherThread.setName(fetcherThreadName(pattern));
124126
fetcherThread.start();
@@ -189,8 +191,11 @@ private CompletableFuture<UnsubscribedStatus> setupUnsubscribedFuture() {
189191
// returned CompletableFuture
190192
final CompletableFuture<UnsubscribedStatus> ret = new CompletableFuture<>();
191193
fetcher.unsubscribeStatusFuture().thenAccept(status -> {
192-
close();
193-
ret.complete(status);
194+
try {
195+
close();
196+
} finally {
197+
ret.complete(status);
198+
}
194199
});
195200
return ret;
196201
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ private static void requireArgument(boolean expression, String template, Object.
8484
private Duration handleRecordTimeout = Duration.ZERO;
8585
private Map<String, Object> configs;
8686
private ConsumerRecordHandler<K, V> consumerRecordHandler;
87+
private ConsumerSeekDestination forceSeekTo = ConsumerSeekDestination.NONE;
8788
@Nullable
8889
private Consumer<K, V> consumer;
8990
@Nullable
@@ -362,6 +363,25 @@ public LcKafkaConsumerBuilder<K, V> workerPool(ExecutorService workerPool, boole
362363
return this;
363364
}
364365

366+
/**
367+
* After subscribing to any topic, force seek the offsets of every assigned partition to the beginning or end, no matter
368+
* where the recorded offsets for the group of this consumer are. The difference between this configuration and
369+
* "auto.offset.reset" in the Kafka configuration is that "auto.offset.reset" only resets offsets to the beginning or end
370+
* when there are no recorded offsets or the previously recorded offsets were removed, while {@code forceSeekTo()} will
371+
* reset offsets even when there are recorded offsets.
372+
* <p>
373+
* Please note that the reset offset operation will take place only once, when a partition is assigned for the
374+
* first time. If a partition is revoked and then assigned again, the offset of this partition will not be reset again.
375+
*
376+
* @param destination where to seek to
377+
* @return this
378+
*/
379+
public LcKafkaConsumerBuilder<K, V> forceSeekTo(ConsumerSeekDestination destination) {
380+
requireNonNull(destination, "destination");
381+
this.forceSeekTo = destination;
382+
return this;
383+
}
384+
365385
/**
366386
* Build a consumer which commits offset automatically at fixed interval. It is both OK for with or without a
367387
* worker thread pool. But without a worker pool, please tune the {@code max.poll.interval.ms} in
@@ -530,11 +550,11 @@ Duration getPollTimeout() {
530550
return pollTimeout;
531551
}
532552

533-
Duration gracefulShutdownTimeout() {
553+
Duration getGracefulShutdownTimeout() {
534554
return gracefulShutdownTimeout;
535555
}
536556

537-
Duration handleRecordTimeout() {
557+
Duration getHandleRecordTimeout() {
538558
return handleRecordTimeout;
539559
}
540560

@@ -543,6 +563,10 @@ CommitPolicy<K, V> getPolicy() {
543563
return policy;
544564
}
545565

566+
ConsumerSeekDestination getForceSeekTo() {
567+
return forceSeekTo;
568+
}
569+
546570
private Consumer<K, V> buildConsumer(boolean autoCommit) {
547571
checkConfigs(BasicConsumerConfigs.values());
548572
ENABLE_AUTO_COMMIT.set(configs, Boolean.toString(autoCommit));

src/main/java/cn/leancloud/kafka/consumer/RebalanceListener.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,24 @@
1111
import java.util.HashSet;
1212
import java.util.Set;
1313

14+
import static cn.leancloud.kafka.consumer.ConsumerSeekDestination.NONE;
1415
import static java.util.stream.Collectors.toSet;
1516

1617
final class RebalanceListener<K, V> implements ConsumerRebalanceListener {
1718
private static final Logger logger = LoggerFactory.getLogger(RebalanceListener.class);
1819

1920
private final CommitPolicy<K, V> policy;
2021
private final Consumer<K, V> consumer;
22+
private final Set<TopicPartition> knownPartitions;
23+
private final ConsumerSeekDestination forceSeekTo;
2124
private Set<TopicPartition> pausedPartitions;
2225

23-
RebalanceListener(Consumer<K, V> consumer, CommitPolicy<K, V> policy) {
26+
RebalanceListener(Consumer<K, V> consumer, CommitPolicy<K, V> policy, ConsumerSeekDestination forceSeekTo) {
2427
this.policy = policy;
2528
this.consumer = consumer;
2629
this.pausedPartitions = Collections.emptySet();
30+
this.knownPartitions = new HashSet<>();
31+
this.forceSeekTo = forceSeekTo;
2732
}
2833

2934
@Override
@@ -37,6 +42,21 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
3742

3843
@Override
3944
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
45+
if (forceSeekTo != NONE) {
46+
final Set<TopicPartition> newPartitions = new HashSet<>();
47+
for (TopicPartition p : partitions) {
48+
if (!knownPartitions.contains(p)) {
49+
newPartitions.add(p);
50+
logger.info("Assigned new partition: {}, force seeking it's offset to {}", p, forceSeekTo);
51+
}
52+
}
53+
54+
if (!newPartitions.isEmpty()) {
55+
forceSeekTo.seek(consumer, newPartitions);
56+
knownPartitions.addAll(newPartitions);
57+
}
58+
}
59+
4060
if (!pausedPartitions.isEmpty()) {
4161
final Set<TopicPartition> partitionToPause = partitions
4262
.stream()

src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ public void testNegativeHandleRecordTimeout() {
115115
.hasMessage("handleRecordTimeout: PT-1S (expect positive or zero duration)");
116116
}
117117

118+
@Test
119+
public void testNullForceSeekDestination() {
120+
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
121+
.forceSeekTo(null))
122+
.isInstanceOf(NullPointerException.class)
123+
.hasMessage("destination");
124+
}
125+
118126
@Test
119127
public void testNegativeShutdownTimeoutMs() {
120128
assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
@@ -231,6 +239,7 @@ public void testSyncConsumer() {
231239
final LcKafkaConsumer<Object, Object> consumer = LcKafkaConsumerBuilder.newBuilder(configs, testingHandler)
232240
.mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
233241
.pollTimeout(Duration.ofMillis(1000))
242+
.forceSeekTo(ConsumerSeekDestination.END)
234243
.maxPendingAsyncCommits(100)
235244
.workerPool(workerPool, false)
236245
.recommitInterval(Duration.ofMinutes(20))

src/test/java/cn/leancloud/kafka/consumer/RebalanceListenerTest.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,27 @@
55
import org.junit.After;
66
import org.junit.Before;
77
import org.junit.Test;
8+
import org.mockito.ArgumentCaptor;
89

10+
import java.util.Collection;
911
import java.util.HashSet;
1012
import java.util.List;
1113
import java.util.stream.IntStream;
1214

1315
import static cn.leancloud.kafka.consumer.TestingUtils.toPartitions;
1416
import static java.util.stream.Collectors.toList;
17+
import static org.assertj.core.api.Assertions.assertThat;
1518
import static org.mockito.Mockito.*;
1619

1720
public class RebalanceListenerTest {
1821
private CommitPolicy<Object, Object> policy;
1922
private Consumer<Object, Object> consumer;
20-
private RebalanceListener listener;
23+
private RebalanceListener<Object, Object> listener;
2124

2225
@Before
2326
public void setUp() {
2427
policy = mock(CommitPolicy.class);
2528
consumer = mock(Consumer.class);
26-
listener = new RebalanceListener(consumer, policy);
2729
}
2830

2931
@After
@@ -33,6 +35,7 @@ public void tearDown() {
3335

3436
@Test
3537
public void testPauseNotFinishedPartitionsOnPartitionAssign() {
38+
listener = new RebalanceListener<>(consumer, policy, ConsumerSeekDestination.NONE);
3639
final List<TopicPartition> pausedPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
3740
final List<TopicPartition> partitionToResumeAfterCommit = toPartitions(IntStream.range(0, 20).boxed().collect(toList()));
3841
final List<TopicPartition> assignedPartitions = toPartitions(IntStream.range(10, 25).boxed().collect(toList()));
@@ -46,4 +49,82 @@ public void testPauseNotFinishedPartitionsOnPartitionAssign() {
4649

4750
verify(consumer, times(1)).pause(new HashSet<>(partitionStillNeedsToPause));
4851
}
52+
53+
@Test
54+
public void testForceSeekToBeginningForAllPartitions() {
55+
listener = new RebalanceListener<>(consumer, policy, ConsumerSeekDestination.BEGINNING);
56+
57+
final List<TopicPartition> assignedPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
58+
listener.onPartitionsAssigned(assignedPartitions);
59+
60+
verify(consumer, times(1)).seekToBeginning(argThat(partitions ->
61+
new HashSet<>(assignedPartitions).equals(new HashSet<>(partitions))
62+
));
63+
}
64+
65+
@Test
66+
public void testForceSeekToBeginningForNewPartitions() {
67+
listener = new RebalanceListener<>(consumer, policy, ConsumerSeekDestination.BEGINNING);
68+
69+
final List<TopicPartition> initialPartitions = toPartitions(IntStream.range(0, 15).boxed().collect(toList()));
70+
final List<TopicPartition> newAssignedPartitions = toPartitions(IntStream.range(10, 30).boxed().collect(toList()));
71+
72+
listener.onPartitionsAssigned(initialPartitions);
73+
listener.onPartitionsAssigned(newAssignedPartitions);
74+
75+
ArgumentCaptor<Collection<TopicPartition>> seekToBeginningArgs = ArgumentCaptor.forClass(Collection.class);
76+
77+
verify(consumer, times(2)).seekToBeginning(seekToBeginningArgs.capture());
78+
79+
assertThat(seekToBeginningArgs.getAllValues().get(0)).containsExactlyInAnyOrderElementsOf(initialPartitions);
80+
81+
newAssignedPartitions.removeAll(initialPartitions);
82+
assertThat(seekToBeginningArgs.getAllValues().get(1)).containsExactlyInAnyOrderElementsOf(newAssignedPartitions);
83+
}
84+
85+
@Test
86+
public void testForceSeekToBeginningWithoutNewPartitions() {
87+
listener = new RebalanceListener<>(consumer, policy, ConsumerSeekDestination.BEGINNING);
88+
89+
final List<TopicPartition> assignedPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
90+
listener.onPartitionsAssigned(assignedPartitions);
91+
listener.onPartitionsAssigned(assignedPartitions);
92+
listener.onPartitionsAssigned(assignedPartitions);
93+
94+
verify(consumer, times(1)).seekToBeginning(argThat(partitions ->
95+
new HashSet<>(assignedPartitions).equals(new HashSet<>(partitions))
96+
));
97+
}
98+
99+
@Test
100+
public void testForceSeekToEndForAllPartitions() {
101+
listener = new RebalanceListener<>(consumer, policy, ConsumerSeekDestination.END);
102+
103+
final List<TopicPartition> assignedPartitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList()));
104+
listener.onPartitionsAssigned(assignedPartitions);
105+
106+
verify(consumer, times(1)).seekToEnd(argThat(partitions ->
107+
new HashSet<>(assignedPartitions).equals(new HashSet<>(partitions))
108+
));
109+
}
110+
111+
@Test
112+
public void testForceSeekToEndForNewPartitions() {
113+
listener = new RebalanceListener<>(consumer, policy, ConsumerSeekDestination.END);
114+
115+
final List<TopicPartition> initialPartitions = toPartitions(IntStream.range(0, 15).boxed().collect(toList()));
116+
final List<TopicPartition> newAssignedPartitions = toPartitions(IntStream.range(10, 30).boxed().collect(toList()));
117+
118+
listener.onPartitionsAssigned(initialPartitions);
119+
listener.onPartitionsAssigned(newAssignedPartitions);
120+
121+
ArgumentCaptor<Collection<TopicPartition>> seekToEndArgs = ArgumentCaptor.forClass(Collection.class);
122+
123+
verify(consumer, times(2)).seekToEnd(seekToEndArgs.capture());
124+
125+
assertThat(seekToEndArgs.getAllValues().get(0)).containsExactlyInAnyOrderElementsOf(initialPartitions);
126+
127+
newAssignedPartitions.removeAll(initialPartitions);
128+
assertThat(seekToEndArgs.getAllValues().get(1)).containsExactlyInAnyOrderElementsOf(newAssignedPartitions);
129+
}
49130
}

src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package cn.leancloud.kafka.consumer.integration;
22

3-
import cn.leancloud.kafka.consumer.ConsumerRecordHandler;
4-
import cn.leancloud.kafka.consumer.LcKafkaConsumer;
5-
import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;
6-
import cn.leancloud.kafka.consumer.NamedThreadFactory;
3+
import cn.leancloud.kafka.consumer.*;
74
import org.apache.kafka.clients.consumer.ConsumerRecords;
85
import org.apache.kafka.clients.consumer.KafkaConsumer;
96
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

0 commit comments

Comments
 (0)