Commit 278498b

(feat) subscribe to pattern
1 parent 2f17f21 commit 278498b

2 files changed: +146 -11 lines

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 59 additions & 5 deletions
@@ -7,8 +7,13 @@
 import java.io.Closeable;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import static java.lang.Integer.min;
+import static java.util.Objects.requireNonNull;
 
 /**
  * {@code LcKafkaConsumer} is a wrapper over {@link Consumer}. It will use {@link Consumer} to consume
@@ -63,21 +68,50 @@ int code() {
      *
      * @param topics the topics to consume.
      * @throws IllegalStateException if this {@code LcKafkaConsumer} has closed or subscribed to some topics
-     * @throws IllegalArgumentException if the input {@code topics} is empty
+     * @throws NullPointerException if the input {@code topics} is null
+     * @throws IllegalArgumentException if the input {@code topics} is empty or contains null or empty topic
      */
     public synchronized void subscribe(Collection<String> topics) {
+        requireNonNull(topics, "topics");
+
         if (topics.isEmpty()) {
             throw new IllegalArgumentException("subscribe empty topics");
         }
 
-        if (subscribed() || closed()) {
-            throw new IllegalStateException("client is in " + state + " state. expect: " + State.INIT);
+        for (String topic : topics) {
+            if (topic == null || topic.trim().isEmpty())
+                throw new IllegalArgumentException("topic collection to subscribe to cannot contain null or empty topic");
         }
 
+        ensureInInit();
+
         consumer.subscribe(topics, new RebalanceListener<>(consumer, policy));
 
-        final String firstTopic = topics.iterator().next();
-        fetcherThread.setName("kafka-fetcher-for-" + firstTopic + (topics.size() > 1 ? "..." : ""));
+        fetcherThread.setName(fetcherThreadName(topics));
+        fetcherThread.start();
+
+        state = State.SUBSCRIBED;
+    }
+
+    /**
+     * Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against all topics existing at the time of check.
+     * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
+     * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
+     *
+     * @param pattern the {@link Pattern} to subscribe to
+     * @throws IllegalStateException if this {@code LcKafkaConsumer} has closed or subscribed to some topics
+     * @throws NullPointerException if the input {@code pattern} is null
+     * @throws IllegalArgumentException if the input {@code pattern} is empty
+     */
+    public synchronized void subscribe(Pattern pattern) {
+        requireNonNull(pattern, "pattern");
+
+        ensureInInit();
+
+        consumer.subscribe(pattern, new RebalanceListener<>(consumer, policy));
+
+        fetcherThread.setName(fetcherThreadName(pattern));
         fetcherThread.start();
 
         state = State.SUBSCRIBED;
@@ -130,4 +164,24 @@ boolean closed() {
     CommitPolicy<K, V> policy() {
         return policy;
     }
+
+    private void ensureInInit() {
+        if (subscribed() || closed()) {
+            throw new IllegalStateException("client is in " + state + " state. expect: " + State.INIT);
+        }
+    }
+
+    private String fetcherThreadName(Collection<String> topics) {
+        final String firstTopic = topics.iterator().next();
+        String postfix = firstTopic.substring(0, min(10, firstTopic.length()));
+        postfix += topics.size() > 1 || firstTopic.length() > 10 ? "..." : "";
+        return "kafka-fetcher-for-" + postfix;
+    }
+
+    private String fetcherThreadName(Pattern pattern) {
+        final String patternInString = pattern.toString();
+        String postfix = patternInString.substring(0, min(10, patternInString.length()));
+        postfix += patternInString.length() > 10 ? "..." : "";
+        return "kafka-fetcher-for-" + postfix;
+    }
 }
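For context, a minimal usage sketch of the new pattern-based subscription follows. It is not part of the commit: the configuration values, the topic regex, and the assumption that the builder's handler argument is a single-method interface over the polled record are illustrative, not taken from this diff.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import cn.leancloud.kafka.consumer.LcKafkaConsumer;
import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;

public class PatternSubscribeSketch {
    public static void main(String[] args) {
        // Placeholder consumer configuration; adjust to the target cluster.
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-subscribe-example");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Lower metadata.max.age.ms so the pattern is re-checked against the
        // cluster's topic list more often, as the javadoc above describes.
        configs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");

        // Assumption: the second builder argument is a single-method record handler,
        // so a lambda over the polled ConsumerRecord is accepted here.
        LcKafkaConsumer<String, String> consumer =
                LcKafkaConsumerBuilder.<String, String>newBuilder(configs,
                        msg -> System.out.println(msg.value()))
                        .buildSync();

        // Subscribe to every current and future topic whose name matches the regex.
        consumer.subscribe(Pattern.compile("orders-.*"));
    }
}

The testSubscribePattern case below exercises the same subscribe(Pattern) path against a MockConsumer instead of a real broker.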

src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerTest.java

Lines changed: 87 additions & 6 deletions
@@ -2,16 +2,23 @@
 
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
-
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static cn.leancloud.kafka.consumer.TestingUtils.assignPartitions;
+import static cn.leancloud.kafka.consumer.TestingUtils.testingTopic;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -41,6 +48,17 @@ public void tearDown() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testSubscribeNullTopics() {
+        consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
+                .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
+                .buildSync();
+        Collection<String> topics = null;
+        assertThatThrownBy(() -> consumer.subscribe(topics))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("topics");
+    }
+
     @Test
     public void testSubscribeWithEmptyTopics() {
         consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
@@ -52,7 +70,38 @@ public void testSubscribeWithEmptyTopics() {
     }
 
     @Test
-    public void testSubscribe() {
+    public void testSubscribeContainsEmptyTopics() {
+        consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
+                .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
+                .buildSync();
+        assertThatThrownBy(() -> consumer.subscribe(Arrays.asList("Topic", "")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("topic collection to subscribe to cannot contain null or empty topic");
+    }
+
+    @Test
+    public void testSubscribeNullPattern() {
+        consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
+                .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
+                .buildSync();
+        Pattern pattern = null;
+        assertThatThrownBy(() -> consumer.subscribe(pattern))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("pattern");
+    }
+
+    @Test
+    public void testSubscribeContainsNullTopic() {
+        consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
+                .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
+                .buildSync();
+        assertThatThrownBy(() -> consumer.subscribe(Arrays.asList("Topic", null)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("topic collection to subscribe to cannot contain null or empty topic");
+    }
+
+    @Test
+    public void testSubscribeTopics() {
         final MockConsumer<Object, Object> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
         consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
                 .mockKafkaConsumer(kafkaConsumer)
@@ -63,6 +112,23 @@ public void testSubscribe() {
         assertThat(consumer.subscribed()).isTrue();
     }
 
+    @Test
+    public void testSubscribePattern() {
+        final MockConsumer<Object, Object> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+        consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
+                .mockKafkaConsumer(kafkaConsumer)
+                .buildSync();
+
+        final List<Integer> partitions = IntStream.range(0, 30).boxed().collect(toList());
+        kafkaConsumer.updateEndOffsets(generateEndOffsets(partitions, 0));
+        kafkaConsumer.updatePartitions(testingTopic, generatePartitionInfos(partitions));
+        final Pattern pattern = Pattern.compile("Test.*");
+        consumer.subscribe(pattern);
+
+        assertThat(kafkaConsumer.subscription()).containsExactlyElementsOf(testingTopics);
+        assertThat(consumer.subscribed()).isTrue();
+    }
+
     @Test
     public void testSubscribedTwice() {
         consumer = LcKafkaConsumerBuilder.newBuilder(configs, consumerRecordHandler)
@@ -108,4 +174,19 @@ public void testGracefulShutdownWithoutShutdownWorkerPool() throws Exception {
         assertThat(consumer.closed()).isTrue();
     }
 
+    private List<PartitionInfo> generatePartitionInfos(List<Integer> partitions) {
+        return partitions
+                .stream()
+                .map(p -> new PartitionInfo(testingTopic, p, null, null, null))
+                .collect(toList());
+    }
+
+    private Map<TopicPartition, Long> generateEndOffsets(List<Integer> partitions, long endOffset) {
+        return partitions
+                .stream()
+                .map(p -> new TopicPartition(testingTopic, p))
+                .collect(toMap(Function.identity(), (p) -> endOffset));
+
+
+    }
 }
