Skip to content

Commit 820a125

Browse files
authored
GH-1554: Partition wildcard for initial offsets
Resolves #1554 When using manual assignment it was difficult to specify the initial offset for multiple partitions - especially when dynamically determined. Add a wildcard to indicate the offset should be applied to all partitions. **cherry-pick to 2.5.x** * Call onPartitionsAssigned() with manual assignment Resolves #1553 * Fix test class name.
1 parent 069924c commit 820a125

File tree

10 files changed

+284
-39
lines changed

10 files changed

+284
-39
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -555,22 +555,32 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
555555
for (String partition : partitions) {
556556
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
557557
}
558-
559-
for (PartitionOffset partitionOffset : partitionOffsets) {
560-
TopicPartitionOffset topicPartitionOffset =
561-
new TopicPartitionOffset((String) topic,
562-
resolvePartition(topic, partitionOffset),
563-
resolveInitialOffset(topic, partitionOffset),
564-
isRelative(topic, partitionOffset));
565-
if (!result.contains(topicPartitionOffset)) {
566-
result.add(topicPartitionOffset);
567-
}
568-
else {
569-
throw new IllegalArgumentException(
570-
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
571-
topicPartitionOffset));
558+
if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
559+
result.forEach(tpo -> {
560+
tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0]));
561+
tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0]));
562+
});
563+
}
564+
else {
565+
for (PartitionOffset partitionOffset : partitionOffsets) {
566+
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
567+
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
568+
TopicPartitionOffset topicPartitionOffset =
569+
new TopicPartitionOffset((String) topic,
570+
resolvePartition(topic, partitionOffset),
571+
resolveInitialOffset(topic, partitionOffset),
572+
isRelative(topic, partitionOffset));
573+
if (!result.contains(topicPartitionOffset)) {
574+
result.add(topicPartitionOffset);
575+
}
576+
else {
577+
throw new IllegalArgumentException(
578+
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
579+
topicPartitionOffset));
580+
}
572581
}
573582
}
583+
Assert.isTrue(result.size() > 0, () -> "At least one partition required for " + topic);
574584
return result;
575585
}
576586

spring-kafka/src/main/java/org/springframework/kafka/annotation/PartitionOffset.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,9 +31,10 @@
3131
public @interface PartitionOffset {
3232

3333
/**
34-
* The partition within the topic to listen on.
35-
* Property place holders and SpEL expressions are supported,
36-
* which must resolve to Integer (or String that can be parsed as Integer).
34+
* The partition within the topic to listen on. Property place holders and SpEL
35+
* expressions are supported, which must resolve to Integer (or String that can be
36+
* parsed as Integer). '*' indicates that the initial offset will be applied to all
37+
* partitions in the encompassing {@link TopicPartition}
3738
* @return partition within the topic.
3839
*/
3940
String partition();

spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,7 +50,8 @@
5050
String[] partitions() default {};
5151

5252
/**
53-
* The partitions with initial offsets within the topic.
53+
* The partitions with initial offsets within the topic. There must only be one
54+
* instance of {@link PartitionOffset} if its 'partition' property is '*'.
5455
* Partitions specified here can't be duplicated in the {@link #partitions()}.
5556
* @return the {@link PartitionOffset} array.
5657
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import java.nio.ByteBuffer;
2020
import java.time.Duration;
21+
import java.util.AbstractMap.SimpleEntry;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Collection;
2425
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.HashSet;
2728
import java.util.Iterator;
29+
import java.util.LinkedHashMap;
2830
import java.util.LinkedHashSet;
2931
import java.util.LinkedList;
3032
import java.util.List;
@@ -890,7 +892,7 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
890892
else {
891893
List<TopicPartitionOffset> topicPartitionsToAssign =
892894
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
893-
this.definedPartitions = new HashMap<>(topicPartitionsToAssign.size());
895+
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
894896
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
895897
this.definedPartitions.put(topicPartition.getTopicPartition(),
896898
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
@@ -2108,7 +2110,7 @@ private void initPartitionsIfNeeded() {
21082110
* called until we poll() the consumer. Users can use a ConsumerAwareRebalanceListener
21092111
* or a ConsumerSeekAware listener in that case.
21102112
*/
2111-
Map<TopicPartition, OffsetMetadata> partitions = new HashMap<>(this.definedPartitions);
2113+
Map<TopicPartition, OffsetMetadata> partitions = new LinkedHashMap<>(this.definedPartitions);
21122114
Set<TopicPartition> beginnings = partitions.entrySet().stream()
21132115
.filter(e -> SeekPosition.BEGINNING.equals(e.getValue().seekPosition))
21142116
.map(Entry::getKey)
@@ -2153,6 +2155,12 @@ else if (metadata.relativeToCurrent) {
21532155
}
21542156
}
21552157
}
2158+
if (this.consumerSeekAwareListener != null) {
2159+
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
2160+
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
2161+
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
2162+
this.seekCallback);
2163+
}
21562164
}
21572165

21582166
private void logReset(TopicPartition topicPartition, long newOffset) {

spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -70,11 +70,11 @@ public enum SeekPosition {
7070

7171
private final TopicPartition topicPartition;
7272

73-
private final Long offset;
73+
private final SeekPosition position;
7474

75-
private final boolean relativeToCurrent;
75+
private Long offset;
7676

77-
private final SeekPosition position;
77+
private boolean relativeToCurrent;
7878

7979
/**
8080
* Construct an instance with no initial offset management.
@@ -171,10 +171,28 @@ public Long getOffset() {
171171
return this.offset;
172172
}
173173

174+
/**
175+
* Set the offset.
176+
* @param offset the offset.
177+
* @since 2.5.5
178+
*/
179+
public void setOffset(Long offset) {
180+
this.offset = offset;
181+
}
182+
174183
public boolean isRelativeToCurrent() {
175184
return this.relativeToCurrent;
176185
}
177186

187+
/**
188+
* Set whether the offset is relative to the current position.
189+
* @param relativeToCurrent true for relative to current.
190+
* @since 2.5.5
191+
*/
192+
public void setRelativeToCurrent(boolean relativeToCurrent) {
193+
this.relativeToCurrent = relativeToCurrent;
194+
}
195+
178196
public SeekPosition getPosition() {
179197
return this.position;
180198
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.BDDMockito.given;
22+
import static org.mockito.BDDMockito.willAnswer;
23+
import static org.mockito.Mockito.atLeastOnce;
24+
import static org.mockito.Mockito.inOrder;
25+
import static org.mockito.Mockito.mock;
26+
27+
import java.time.Duration;
28+
import java.util.Collection;
29+
import java.util.Collections;
30+
import java.util.Map;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import org.apache.kafka.clients.consumer.Consumer;
35+
import org.apache.kafka.clients.consumer.ConsumerRecords;
36+
import org.apache.kafka.common.TopicPartition;
37+
import org.junit.jupiter.api.Test;
38+
import org.mockito.InOrder;
39+
40+
import org.springframework.beans.factory.annotation.Autowired;
41+
import org.springframework.context.annotation.Bean;
42+
import org.springframework.context.annotation.Configuration;
43+
import org.springframework.kafka.annotation.EnableKafka;
44+
import org.springframework.kafka.annotation.KafkaListener;
45+
import org.springframework.kafka.annotation.PartitionOffset;
46+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
47+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
48+
import org.springframework.kafka.core.ConsumerFactory;
49+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
50+
import org.springframework.kafka.test.utils.KafkaTestUtils;
51+
import org.springframework.test.annotation.DirtiesContext;
52+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
53+
54+
/**
55+
* @author Gary Russell
56+
* @since 2.0.1
57+
*
58+
*/
59+
@SpringJUnitConfig
60+
@DirtiesContext
61+
public class ManualAssignmentInitialSeekTests {
62+
63+
@SuppressWarnings("rawtypes")
64+
@Autowired
65+
private Consumer consumer;
66+
67+
@Autowired
68+
private Config config;
69+
70+
@Autowired
71+
private KafkaListenerEndpointRegistry registry;
72+
73+
/*
74+
* Deliver 6 records from three partitions, fail on the second record second
75+
* partition, first attempt; verify partition 0,1 committed and a total of 7 records
76+
* handled after seek.
77+
*/
78+
@SuppressWarnings("unchecked")
79+
@Test
80+
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
81+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
82+
this.registry.stop();
83+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
84+
InOrder inOrder = inOrder(this.consumer);
85+
inOrder.verify(this.consumer).assign(any(Collection.class));
86+
inOrder.verify(this.consumer).seekToBeginning(any());
87+
inOrder.verify(this.consumer, atLeastOnce()).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
88+
assertThat(this.config.registerSeekCallbackCalled).isTrue();
89+
assertThat(this.config.partitionsAssignedCalled).isTrue();
90+
assertThat(this.config.assignments).hasSize(3);
91+
}
92+
93+
@Configuration
94+
@EnableKafka
95+
public static class Config extends AbstractConsumerSeekAware {
96+
97+
final CountDownLatch pollLatch = new CountDownLatch(1);
98+
99+
final CountDownLatch closeLatch = new CountDownLatch(1);
100+
101+
volatile boolean registerSeekCallbackCalled;
102+
103+
volatile boolean partitionsAssignedCalled;
104+
105+
volatile Map<TopicPartition, Long> assignments;
106+
107+
@KafkaListener(groupId = "grp",
108+
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
109+
partitions = "#{'0,1,2'.split(',')}",
110+
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
111+
public void foo(String in) {
112+
}
113+
114+
@SuppressWarnings({ "rawtypes" })
115+
@Bean
116+
public ConsumerFactory consumerFactory() {
117+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
118+
final Consumer consumer = consumer();
119+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
120+
.willReturn(consumer);
121+
return consumerFactory;
122+
}
123+
124+
@SuppressWarnings({ "rawtypes", "unchecked" })
125+
@Bean
126+
public Consumer consumer() {
127+
final Consumer consumer = mock(Consumer.class);
128+
willAnswer(i -> {
129+
this.pollLatch.countDown();
130+
try {
131+
Thread.sleep(50);
132+
}
133+
catch (InterruptedException e) {
134+
Thread.currentThread().interrupt();
135+
}
136+
return new ConsumerRecords(Collections.emptyMap());
137+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
138+
willAnswer(i -> {
139+
this.closeLatch.countDown();
140+
return null;
141+
}).given(consumer).close();
142+
return consumer;
143+
}
144+
145+
@SuppressWarnings({ "rawtypes", "unchecked" })
146+
@Bean
147+
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
148+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
149+
factory.setConsumerFactory(consumerFactory());
150+
factory.setErrorHandler(new SeekToCurrentErrorHandler());
151+
factory.getContainerProperties().setAckMode(AckMode.RECORD);
152+
factory.getContainerProperties().setDeliveryAttemptHeader(true);
153+
return factory;
154+
}
155+
156+
@Override
157+
public void registerSeekCallback(ConsumerSeekCallback callback) {
158+
super.registerSeekCallback(callback);
159+
this.registerSeekCallbackCalled = true;
160+
}
161+
162+
@Override
163+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
164+
super.onPartitionsAssigned(assignments, callback);
165+
this.partitionsAssignedCalled = true;
166+
this.assignments = assignments;
167+
callback.seekToBeginning(assignments.keySet());
168+
}
169+
170+
}
171+
172+
}

0 commit comments

Comments
 (0)