
Commit 0f43bf2

Manual Assignment: Support Partition Ranges, Lists
* Polishing; parse internally; no need for SpEL; fix single range `0-5`.
* Avoid second `stream()` collection.
1 parent 74feb40 commit 0f43bf2
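
For context, this is the usage the commit enables (a minimal sketch; the listener id and topic name are illustrative and mirror the example added to the reference docs below):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;

public class RangeListener {

    // Assigns partitions 0-5, 7, and 10-15 of "topic1"; ranges are inclusive.
    @KafkaListener(id = "pp", autoStartup = "false",
            topicPartitions = @TopicPartition(topic = "topic1",
                    partitions = "0-5, 7, 10-15"))
    public void process(String in) {
        // handle the record
    }
}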

File tree

4 files changed: +96 -8 lines

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 37 additions & 1 deletion
@@ -34,6 +34,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import org.apache.commons.logging.LogFactory;
@@ -680,7 +682,9 @@ private void resolvePartitionAsInteger(String topic, Object resolvedValue,
         else if (resolvedValue instanceof String) {
             Assert.state(StringUtils.hasText((String) resolvedValue),
                     () -> "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
-            result.add(new TopicPartitionOffset(topic, Integer.valueOf((String) resolvedValue)));
+            result.addAll(parsePartitions((String) resolvedValue)
+                    .map(part -> new TopicPartitionOffset(topic, part))
+                    .collect(Collectors.toList()));
         }
         else if (resolvedValue instanceof Integer[]) {
             for (Integer partition : (Integer[]) resolvedValue) {
@@ -787,6 +791,38 @@ private <T> Collection<T> getBeansOfType(Class<T> type) {
         }
     }

+    /**
+     * Parse a list of partitions into a {@link List}. Example: "0-5,10-15".
+     * @param partsString the comma-delimited list of partitions/ranges.
+     * @return the stream of partition numbers, sorted and de-duplicated.
+     * @since 2.6.4
+     */
+    private Stream<Integer> parsePartitions(String partsString) {
+        String[] partsStrings = partsString.split(",");
+        if (partsStrings.length == 1 && !partsStrings[0].contains("-")) {
+            return Stream.of(Integer.parseInt(partsStrings[0].trim()));
+        }
+        List<Integer> parts = new ArrayList<>();
+        for (String part : partsStrings) {
+            if (part.contains("-")) {
+                String[] startEnd = part.split("-");
+                Assert.state(startEnd.length == 2, "Only one hyphen allowed for a range of partitions: " + part);
+                int start = Integer.parseInt(startEnd[0].trim());
+                int end = Integer.parseInt(startEnd[1].trim());
+                Assert.state(end >= start, "Invalid range: " + part);
+                for (int i = start; i <= end; i++) {
+                    parts.add(i);
+                }
+            }
+            else {
+                parsePartitions(part).forEach(p -> parts.add(p));
+            }
+        }
+        return parts.stream()
+                .sorted()
+                .distinct();
+    }
+
     /**
      * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
      * instance to use. Useful if the factory to use is determined once the endpoints
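
To make the parsing concrete, here is a standalone sketch that mirrors the logic added above (class and method names are illustrative; the real parsePartitions is a private helper of KafkaListenerAnnotationBeanPostProcessor and additionally validates ranges with Assert.state):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class PartitionRangeSketch {

    // Simplified mirror of the committed parsePartitions() logic.
    static Stream<Integer> parsePartitions(String partsString) {
        String[] partsStrings = partsString.split(",");
        // A single plain value such as "3" needs no range expansion.
        if (partsStrings.length == 1 && !partsStrings[0].contains("-")) {
            return Stream.of(Integer.parseInt(partsStrings[0].trim()));
        }
        List<Integer> parts = new ArrayList<>();
        for (String part : partsStrings) {
            if (part.contains("-")) {
                String[] startEnd = part.split("-");
                int start = Integer.parseInt(startEnd[0].trim());
                int end = Integer.parseInt(startEnd[1].trim());
                for (int i = start; i <= end; i++) {
                    parts.add(i); // ranges are inclusive
                }
            }
            else {
                parts.add(Integer.parseInt(part.trim())); // single partition
            }
        }
        return parts.stream().sorted().distinct(); // sorted and de-duplicated
    }

    public static void main(String[] args) {
        // Prints: 0 1 2 3 4 5 7 10 11 12 13 14 15
        parsePartitions("0-5, 7, 10-15").forEach(p -> System.out.print(p + " "));
        System.out.println();
    }
}

Running the sketch with the string used in the new test ("0-5, 7, 10-15") yields the same sorted, de-duplicated sequence the parsePartitions test below asserts.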

spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java

Lines changed: 6 additions & 6 deletions
@@ -40,12 +40,12 @@
     String topic();

     /**
-     * The partitions within the topic.
-     * Partitions specified here can't be duplicated in {@link #partitionOffsets()}.
-     * @return the partitions within the topic. Property place
-     * holders and SpEL expressions are supported, which must
-     * resolve to Integers (or Strings that can be parsed as
-     * Integers).
+     * The partitions within the topic. Partitions specified here can't be duplicated in
+     * {@link #partitionOffsets()}. Each string can contain a comma-delimited list of
+     * partitions, or ranges of partitions (e.g. {@code 0-5, 7, 10-15}).
+     * @return the partitions within the topic. Property place holders and SpEL
+     * expressions are supported, which must resolve to Integers (or Strings that can be
+     * parsed as Integers).
      */
     String[] partitions() default {};

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java

Lines changed: 37 additions & 1 deletion
@@ -24,12 +24,17 @@
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;

+import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -42,11 +47,13 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
 import org.springframework.kafka.annotation.PartitionOffset;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.listener.ContainerProperties.AckMode;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.test.annotation.DirtiesContext;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -77,7 +84,7 @@ public class ManualAssignmentInitialSeekTests {
      */
     @SuppressWarnings("unchecked")
     @Test
-    public void discardRemainingRecordsFromPollAndSeek() throws Exception {
+    void discardRemainingRecordsFromPollAndSeek() throws Exception {
         assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
         this.registry.stop();
         assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -90,6 +97,29 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
         assertThat(this.config.assignments).hasSize(3);
     }

+    @Test
+    void parsePartitions() {
+        TopicPartitionOffset[] topicPartitions = registry.getListenerContainer("pp")
+                .getContainerProperties()
+                .getTopicPartitions();
+        List<Integer> collected = Arrays.stream(topicPartitions).map(tp -> tp.getPartition())
+                .collect(Collectors.toList());
+        assertThat(collected).containsExactly(0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    void parseUnitTests() throws Exception {
+        Method parser = KafkaListenerAnnotationBeanPostProcessor.class.getDeclaredMethod("parsePartitions",
+                String.class);
+        parser.setAccessible(true);
+        KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor();
+        assertThat((Stream<Integer>) parser.invoke(bpp, "0-2")).containsExactly(0, 1, 2);
+        assertThat((Stream<Integer>) parser.invoke(bpp, " 0-2 , 5")).containsExactly(0, 1, 2, 5);
+        assertThat((Stream<Integer>) parser.invoke(bpp, "0-2,5-6")).containsExactly(0, 1, 2, 5, 6);
+        assertThat((Stream<Integer>) parser.invoke(bpp, "5-6,0-2,0-2")).containsExactly(0, 1, 2, 5, 6);
+    }
+
     @Configuration
     @EnableKafka
     public static class Config extends AbstractConsumerSeekAware {
@@ -111,6 +141,12 @@ public static class Config extends AbstractConsumerSeekAware {
         public void foo(String in) {
         }

+        @KafkaListener(id = "pp", autoStartup = "false",
+                topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
+                        partitions = "0-5, 7, 10-15"))
+        public void bar(String in) {
+        }
+
         @SuppressWarnings({ "rawtypes" })
         @Bean
         public ConsumerFactory consumerFactory() {

src/reference/asciidoc/kafka.adoc

Lines changed: 16 additions & 0 deletions
@@ -1304,6 +1304,22 @@ There must only be one `@PartitionOffset` with the wildcard in each `@TopicParti
 In addition, when the listener implements `ConsumerSeekAware`, `onPartitionsAssigned` is now called, even when using manual assignment.
 This allows, for example, any arbitrary seek operations at that time.

+Starting with version 2.6.4, you can specify a comma-delimited list of partitions, or partition ranges:
+
+====
+[source, java]
+----
+@KafkaListener(id = "pp", autoStartup = "false",
+        topicPartitions = @TopicPartition(topic = "topic1",
+                partitions = "0-5, 7, 10-15"))
+public void process(String in) {
+    ...
+}
+----
+====
+
+The range is inclusive; the example above will assign partitions `0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15`.
+
 ====== Manual Acknowledgment

 When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
