Commit cd1fc47

(feat) no op commit policy
1 parent 7263c77 commit cd1fc47

8 files changed, +86 -102 lines changed

src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java

Lines changed: 31 additions & 7 deletions
@@ -6,6 +6,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

@@ -51,13 +52,27 @@ public Set<TopicPartition> partialCommit() {
         return partitions;
     }
 
-    Set<TopicPartition> checkCompletedPartitions() {
-        return completedTopicOffsets
-                .entrySet()
-                .stream()
-                .filter(entry -> topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))
-                .map(Map.Entry::getKey)
-                .collect(toSet());
+    Set<TopicPartition> getCompletedPartitions(boolean noPendingRecords) {
+        final Set<TopicPartition> partitions;
+        if (noPendingRecords) {
+            assert checkCompletedPartitions().equals(topicOffsetHighWaterMark.keySet())
+                    : "expect: " + checkCompletedPartitions() + " actual: " + topicOffsetHighWaterMark.keySet();
+            partitions = new HashSet<>(topicOffsetHighWaterMark.keySet());
+        } else {
+            partitions = checkCompletedPartitions();
+        }
+        return partitions;
+    }
+
+    void clearCachedCompletedPartitionsRecords(Set<TopicPartition> completedPartitions, boolean noPendingRecords) {
+        completedTopicOffsets.clear();
+        if (noPendingRecords) {
+            topicOffsetHighWaterMark.clear();
+        } else {
+            for (TopicPartition p : completedPartitions) {
+                topicOffsetHighWaterMark.remove(p);
+            }
+        }
     }
 
     Map<TopicPartition, Long> topicOffsetHighWaterMark() {

@@ -68,6 +83,15 @@ Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
         return completedTopicOffsets;
     }
 
+    private Set<TopicPartition> checkCompletedPartitions() {
+        return completedTopicOffsets
+                .entrySet()
+                .stream()
+                .filter(entry -> topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))
+                .map(Map.Entry::getKey)
+                .collect(toSet());
+    }
+
     private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
         final Long offsetHighWaterMark = topicOffsetHighWaterMark.get(topicPartition);
         if (offsetHighWaterMark != null) {
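
The net effect of this refactor: partition bookkeeping that the partial-commit policies previously duplicated now lives in the base class as two shared helpers. getCompletedPartitions picks the partitions whose records are all processed (with a key-set fast path when nothing is pending), and clearCachedCompletedPartitionsRecords drops their cached state. Below is a minimal standalone sketch of the bookkeeping these helpers operate on; the field semantics are inferred from this diff, the `>=` comparison is an assumption (topicOffsetMeetHighWaterMark is truncated above), and the topic names and offsets are made up.

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class CompletedPartitionsSketch {
    public static void main(String[] args) {
        // Per partition: the highest offset handed out for processing (the "high water mark").
        Map<TopicPartition, Long> topicOffsetHighWaterMark = new HashMap<>();
        // Per partition: the offset that is currently safe to commit.
        Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets = new HashMap<>();

        TopicPartition p0 = new TopicPartition("orders", 0);
        TopicPartition p1 = new TopicPartition("orders", 1);
        topicOffsetHighWaterMark.put(p0, 10L);
        topicOffsetHighWaterMark.put(p1, 20L);
        completedTopicOffsets.put(p0, new OffsetAndMetadata(10L)); // reached its high water mark
        completedTopicOffsets.put(p1, new OffsetAndMetadata(15L)); // records still in flight

        // Slow path (checkCompletedPartitions): keep only partitions whose committable
        // offset has caught up with the high water mark -- here just orders-0.
        Set<TopicPartition> done = completedTopicOffsets.entrySet().stream()
                .filter(e -> {
                    Long hwm = topicOffsetHighWaterMark.get(e.getKey());
                    return hwm != null && e.getValue().offset() >= hwm; // ">=" assumed
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
        System.out.println(done); // [orders-0]

        // Fast path (noPendingRecords == true): nothing is in flight, so every tracked
        // partition must be complete and the key set can be copied without the scan.
        // The assert in getCompletedPartitions verifies the two paths agree.
    }
}
```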

src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,8 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         }
 
         final Set<TopicPartition> partitions = new HashSet<>(completedTopicOffsets.keySet());
+        // it's OK to clear these collections here: we will not leave any completed offset
+        // uncommitted even if this async commit fails, because a failed async commit is followed by a sync commit
         completedTopicOffsets.clear();
         topicOffsetHighWaterMark.clear();
         return partitions;
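
The clearing here is safe for the reason the new comment gives: even if this async commit fails, a sync commit follows, so no completed offset is permanently lost. The exact AsyncCommitPolicy wiring is outside this hunk; the forceSync fallback sketched after the PartialAsyncCommitPolicy section below shows one way such an escalation can be wired.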

src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java

Lines changed: 10 additions & 22 deletions

@@ -1,36 +1,24 @@
 package cn.leancloud.kafka.consumer;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collections;
 import java.util.Set;
 
-final class AutoCommitPolicy<K, V> implements CommitPolicy<K, V> {
-    private static final AutoCommitPolicy INSTANCE = new AutoCommitPolicy();
-
-    @SuppressWarnings("unchecked")
-    static <K, V> AutoCommitPolicy<K, V> getInstance() {
-        return (AutoCommitPolicy<K, V>) INSTANCE;
-    }
-
-    @Override
-    public void addPendingRecord(ConsumerRecord<K, V> record) {
-
-    }
-
-    @Override
-    public void completeRecord(ConsumerRecord<K, V> record) {
-
+class AutoCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
+    AutoCommitPolicy(Consumer<K, V> consumer) {
+        super(consumer);
     }
 
     @Override
     public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
-        return Collections.emptySet();
-    }
+        if (completedTopicOffsets.isEmpty()) {
+            return Collections.emptySet();
+        }
 
-    @Override
-    public Set<TopicPartition> partialCommit() {
-        return Collections.emptySet();
+        final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
+        clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
+        return partitions;
     }
 }
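
Note the role swap here: the old AutoCommitPolicy was a stateless no-op, and that behavior now lives in the new NoOpCommitPolicy (added below). The reworked AutoCommitPolicy inherits the AbstractCommitPolicy bookkeeping, so its tryCommit reports which partitions have been fully processed, while the actual offset committing is still left to the Kafka client's auto-commit mechanism (buildAuto requires auto.commit.interval.ms, per the builder and tests below).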

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java

Lines changed: 3 additions & 20 deletions
@@ -10,16 +10,11 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
 
 import static cn.leancloud.kafka.consumer.BasicConsumerConfigs.ENABLE_AUTO_COMMIT;
 import static java.util.Objects.requireNonNull;
 
 public final class LcKafkaConsumerBuilder<K, V> {
-    private static final ThreadFactory threadFactory = new NamedThreadFactory("lc-kafka-consumer-task-worker-pool-");
-
     /**
      * Create a {@code LcKafkaConsumerBuilder} used to build {@link LcKafkaConsumer}.
      *

@@ -70,6 +65,8 @@ private static void requireArgument(boolean expression, String template, Object.
     private long pollTimeout = 100;
     private int maxPendingAsyncCommits = 10;
     private long gracefulShutdownMillis = 10_000;
+    private ExecutorService workerPool = ImmediateExecutorService.INSTANCE;
+    private boolean shutdownWorkerPoolOnStop = false;
     private Map<String, Object> configs;
     private MessageHandler<K, V> messageHandler;
     @Nullable

@@ -80,9 +77,6 @@ private static void requireArgument(boolean expression, String template, Object.
     private Deserializer<V> valueDeserializer;
     @Nullable
     private CommitPolicy<K, V> policy;
-    @Nullable
-    private ExecutorService workerPool;
-    private boolean shutdownWorkerPoolOnStop;
 
     private LcKafkaConsumerBuilder(Map<String, Object> kafkaConsumerConfigs,
                                    MessageHandler<K, V> messageHandler) {

@@ -239,12 +233,7 @@ public LcKafkaConsumerBuilder<K, V> workerPool(ExecutorService workerPool, boole
     public <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> buildAuto() {
         checkConfigs(AutoCommitConsumerConfigs.values());
         consumer = buildConsumer(true);
-        policy = AutoCommitPolicy.getInstance();
-        if (workerPool != null && shutdownWorkerPoolOnStop) {
-            throw new IllegalArgumentException("auto commit consumer don't need a worker pool");
-        }
-        workerPool = ImmediateExecutorService.INSTANCE;
-        shutdownWorkerPoolOnStop = false;
+        policy = workerPool == ImmediateExecutorService.INSTANCE ? NoOpCommitPolicy.getInstance() : new AutoCommitPolicy<>(consumer);
         return doBuild();
     }

@@ -282,7 +271,6 @@ MessageHandler<K, V> getMessageHandler() {
     }
 
     ExecutorService getWorkerPool() {
-        assert workerPool != null;
         return workerPool;
     }

@@ -327,11 +315,6 @@ private void checkConfigs(KafkaConfigsChecker[] checkers) {
     }
 
     private <K1 extends K, V1 extends V> LcKafkaConsumer<K1, V1> doBuild() {
-        if (workerPool == null) {
-            workerPool = Executors.newCachedThreadPool(threadFactory);
-            shutdownWorkerPoolOnStop = true;
-        }
-
         @SuppressWarnings("unchecked")
         final LcKafkaConsumer<K1, V1> c = (LcKafkaConsumer<K1, V1>) new LcKafkaConsumer<>(this);
         return c;
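
Taken together, these builder changes make ImmediateExecutorService the default worker pool and select the commit policy from it, instead of rejecting a caller-supplied pool outright. A hedged usage sketch follows; the handler and the two deserializers are stand-ins assumed to be in scope, and the config values mirror the tests rather than production advice (newBuilder and workerPool follow the signatures visible in this diff and the test file).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;

// assumed in scope: MessageHandler<String, String> handler,
// Deserializer<String> keyDeserializer, valueDeserializer
Map<String, Object> configs = new HashMap<>();
configs.put("auto.commit.interval.ms", "1000");
configs.put("max.poll.records", "10");
configs.put("max.poll.interval.ms", "1000");
configs.put("auto.offset.reset", "latest");

// Default worker pool (ImmediateExecutorService.INSTANCE): buildAuto() now picks
// NoOpCommitPolicy, since records are presumably handled on the polling thread
// and the Kafka client's auto commit needs no extra bookkeeping.
LcKafkaConsumer<String, String> plain = LcKafkaConsumerBuilder
        .newBuilder(configs, handler, keyDeserializer, valueDeserializer)
        .buildAuto();

// Caller-supplied pool: handling happens off the polling thread, so buildAuto()
// picks the reworked AutoCommitPolicy to keep tracking completed partitions.
LcKafkaConsumer<String, String> pooled = LcKafkaConsumerBuilder
        .newBuilder(configs, handler, keyDeserializer, valueDeserializer)
        .workerPool(Executors.newCachedThreadPool(), true)
        .buildAuto();
```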

src/main/java/cn/leancloud/kafka/consumer/NoOpCommitPolicy.java

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+package cn.leancloud.kafka.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Set;
+
+final class NoOpCommitPolicy<K, V> implements CommitPolicy<K, V> {
+    private static final NoOpCommitPolicy INSTANCE = new NoOpCommitPolicy();
+
+    @SuppressWarnings("unchecked")
+    static <K, V> NoOpCommitPolicy<K, V> getInstance() {
+        return (NoOpCommitPolicy<K, V>) INSTANCE;
+    }
+
+    @Override
+    public void addPendingRecord(ConsumerRecord<K, V> record) {
+
+    }
+
+    @Override
+    public void completeRecord(ConsumerRecord<K, V> record) {
+
+    }
+
+    @Override
+    public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<TopicPartition> partialCommit() {
+        return Collections.emptySet();
+    }
+}

src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java

Lines changed: 1 addition & 20 deletions
@@ -31,14 +31,7 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
             consumer.commitSync(completedTopicOffsets);
             pendingAsyncCommitCounter = 0;
             forceSync = false;
-            completedTopicOffsets.clear();
-            if (noPendingRecords) {
-                topicOffsetHighWaterMark.clear();
-            } else {
-                for (TopicPartition p : partitions) {
-                    topicOffsetHighWaterMark.remove(p);
-                }
-            }
+            clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
         } else {
             ++pendingAsyncCommitCounter;
             consumer.commitAsync(completedTopicOffsets, (offsets, exception) -> {

@@ -61,16 +54,4 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
         }
         return partitions;
     }
-
-    private Set<TopicPartition> getCompletedPartitions(boolean noPendingRecords) {
-        final Set<TopicPartition> partitions;
-        if (noPendingRecords) {
-            assert checkCompletedPartitions().equals(topicOffsetHighWaterMark.keySet())
-                    : "expect: " + checkCompletedPartitions() + " actual: " + topicOffsetHighWaterMark.keySet();
-            partitions = new HashSet<>(topicOffsetHighWaterMark.keySet());
-        } else {
-            partitions = checkCompletedPartitions();
-        }
-        return partitions;
-    }
 }
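
The guard that picks between the two branches sits just above the first hunk and is not part of the diff. Below is a hedged, self-contained sketch of the cadence, reconstructed from the fields that are visible (pendingAsyncCommitCounter, forceSync, and the builder's maxPendingAsyncCommits, default 10); the guard condition and callback details are assumptions, not the commit's code. This is also the kind of fallback the AsyncCommitPolicy comment above relies on.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

// Hypothetical reconstruction: every async failure (forceSync) or every
// maxPendingAsyncCommits-th commit falls back to a blocking sync commit,
// which also resets the pending counter.
final class PartialAsyncCadenceSketch<K, V> {
    private final Consumer<K, V> consumer;
    private final int maxPendingAsyncCommits;
    private int pendingAsyncCommitCounter;
    private boolean forceSync;

    PartialAsyncCadenceSketch(Consumer<K, V> consumer, int maxPendingAsyncCommits) {
        this.consumer = consumer;
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
    }

    void commit(Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets) {
        if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
            consumer.commitSync(completedTopicOffsets); // blocking, always safe
            pendingAsyncCommitCounter = 0;
            forceSync = false;
        } else {
            ++pendingAsyncCommitCounter;
            consumer.commitAsync(completedTopicOffsets, (offsets, exception) -> {
                if (exception != null) {
                    forceSync = true; // escalate: the next commit() will be synchronous
                }
            });
        }
    }
}
```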

src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java

Lines changed: 2 additions & 15 deletions
@@ -4,7 +4,6 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 final class PartialSyncCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {

@@ -20,20 +19,8 @@ public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
 
         consumer.commitSync(completedTopicOffsets);
 
-        final Set<TopicPartition> partitions;
-        if (noPendingRecords) {
-            assert checkCompletedPartitions().equals(topicOffsetHighWaterMark.keySet())
-                    : "expect: " + checkCompletedPartitions() + " actual: " + topicOffsetHighWaterMark.keySet();
-            partitions = new HashSet<>(topicOffsetHighWaterMark.keySet());
-            completedTopicOffsets.clear();
-            topicOffsetHighWaterMark.clear();
-        } else {
-            partitions = checkCompletedPartitions();
-            completedTopicOffsets.clear();
-            for (TopicPartition p : partitions) {
-                topicOffsetHighWaterMark.remove(p);
-            }
-        }
+        final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
+        clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
         return partitions;
     }
 }
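
A worked pass using the numbers from the sketch under AbstractCommitPolicy: commitSync(completedTopicOffsets) sends both completed offsets (orders-0 at 10, orders-1 at 15), the returned set contains only orders-0 (the partition that reached its high water mark), and clearCachedCompletedPartitionsRecords empties completedTopicOffsets but keeps orders-1's high water mark, so the records still in flight can complete that partition on a later pass.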

src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java

Lines changed: 1 addition & 18 deletions
@@ -165,22 +165,6 @@ public void testAutoConsumerWithoutAutoCommitInterval() {
                 .hasMessageContaining("expect \"auto.commit.interval.ms\"");
     }
 
-    @Test
-    public void testAutoConsumerWithShouldShutdownWorkerPool() {
-        configs.put("max.poll.records", "10");
-        configs.put("max.poll.interval.ms", "1000");
-        configs.put("auto.commit.interval.ms", "1000");
-        configs.put("auto.offset.reset", "latest");
-        assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer)
-                .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
-                .pollTimeoutMillis(1000)
-                .maxPendingAsyncCommits(100)
-                .workerPool(workerPool, true)
-                .buildAuto())
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("auto commit consumer don't need a worker pool");
-    }
-
     @Test
     public void testAutoConsumer() {
         configs.put("max.poll.records", "10");

@@ -191,11 +175,10 @@ public void testAutoConsumer() {
                 .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
                 .pollTimeoutMillis(1000)
                 .maxPendingAsyncCommits(100)
-                .workerPool(workerPool, false)
                 .buildAuto();
 
         assertThat(consumer).isNotNull();
-        assertThat(consumer.policy()).isInstanceOf(AutoCommitPolicy.class);
+        assertThat(consumer.policy()).isInstanceOf(NoOpCommitPolicy.class);
         consumer.close();
     }
