Skip to content

Commit 39276ea

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-2220: Fix TopicPartitionOffset for Retry Topics (#2223)
* GH-2220: Fix TopicPartitionOffset for Retry Topics Resolves #2220 * Register NewTopics for Retry Topics only once per topic. * Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic. * Move default partition to constant field. * Address review comments Make EndpointCustomizerFactory ctor public * Remove main endpoint customization * Remove failing @SuppressWarnings * Revert TPO hashCode changes Adjust tests Improve TPO and topics handling * Enable customizing main endpoint TPOs
1 parent f831072 commit 39276ea

File tree

8 files changed

+376
-18
lines changed

8 files changed

+376
-18
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
148148
----
149149
====
150150

151+
IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
152+
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.
153+
151154
==== Features
152155

153156
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -175,6 +175,16 @@ public long delay() {
175175
return this.delayMs;
176176
}
177177

178+
/**
179+
* Return the number of partitions the
180+
* retry topics should be created with.
181+
* @return the number of partitions.
182+
* @since 2.7.13
183+
*/
184+
public int numPartitions() {
185+
return this.numPartitions;
186+
}
187+
178188
@Override
179189
public boolean equals(Object o) {
180190
if (this == o) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Collection;
2222
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
2324

2425
import org.springframework.beans.factory.BeanFactory;
2526
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
@@ -40,6 +41,8 @@
4041
*/
4142
public class EndpointCustomizerFactory {
4243

44+
private static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT = 0;
45+
4346
private final DestinationTopic.Properties destinationProperties;
4447

4548
private final EndpointHandlerMethod beanMethod;
@@ -48,7 +51,7 @@ public class EndpointCustomizerFactory {
4851

4952
private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
5053

51-
EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
54+
public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
5255
BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
5356

5457
this.destinationProperties = destinationProperties;
@@ -69,7 +72,14 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
6972
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
7073
endpoint.setId(namesProvider.getEndpointId(endpoint));
7174
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
72-
endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
75+
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
76+
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
77+
endpoint.getTopicPartitionsToAssign()));
78+
}
79+
else {
80+
endpoint.setTopics(endpoint.getTopics().stream()
81+
.map(namesProvider::getTopicName).toArray(String[]::new));
82+
}
7383
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
7484
endpoint.setGroup(namesProvider.getGroup(endpoint));
7585
endpoint.setBean(bean);
@@ -78,6 +88,29 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
7888
};
7989
}
8090

91+
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
92+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
93+
TopicPartitionOffset[] topicPartitionOffsets) {
94+
return Stream.of(topicPartitionOffsets)
95+
.map(tpo -> properties.isMainEndpoint()
96+
? getTPOForMainTopic(namesProvider, tpo)
97+
: getTPOForRetryTopics(properties, namesProvider, tpo))
98+
.toArray(TopicPartitionOffset[]::new);
99+
}
100+
101+
private static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
102+
return new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
103+
tpo.getPartition() <= properties.numPartitions() ? tpo.getPartition() : DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT,
104+
(Long) null);
105+
}
106+
107+
private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
108+
TopicPartitionOffset newTpo = new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
109+
tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
110+
newTpo.setRelativeToCurrent(tpo.isRelativeToCurrent());
111+
return newTpo;
112+
}
113+
81114
protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTopics(
82115
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
83116
MethodKafkaListenerEndpoint<?, ?> endpoint) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,14 @@ private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfigur
339339
}
340340

341341
protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfiguration.TopicCreation config) {
342-
topics.forEach(topic ->
343-
((DefaultListableBeanFactory) this.beanFactory)
344-
.registerSingleton(topic + "-topicRegistrationBean",
345-
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()))
342+
topics.forEach(topic -> {
343+
DefaultListableBeanFactory bf = ((DefaultListableBeanFactory) this.beanFactory);
344+
String beanName = topic + "-topicRegistrationBean";
345+
if (!bf.containsBean(beanName)) {
346+
bf.registerSingleton(beanName,
347+
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
348+
}
349+
}
346350
);
347351
}
348352

spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
21+
22+
import java.lang.reflect.Method;
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.function.Predicate;
26+
27+
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.extension.ExtendWith;
29+
import org.mockito.Mock;
30+
import org.mockito.junit.jupiter.MockitoExtension;
31+
32+
import org.springframework.beans.factory.BeanFactory;
33+
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
34+
import org.springframework.kafka.support.TopicPartitionOffset;
35+
36+
/**
37+
* @author Tomaz Fernandes
38+
* @since 2.8.5
39+
*/
40+
@ExtendWith(MockitoExtension.class)
41+
class EndpointCustomizerFactoryTests {
42+
43+
@Mock
44+
private DestinationTopic.Properties properties;
45+
46+
@Mock
47+
private EndpointHandlerMethod beanMethod;
48+
49+
@Mock
50+
private BeanFactory beanFactory;
51+
52+
@Mock
53+
private RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
54+
55+
@Mock
56+
private MethodKafkaListenerEndpoint<?, ?> endpoint;
57+
58+
private final String[] topics = {"myTopic1", "myTopic2"};
59+
60+
private final Method method = EndpointCustomizerFactory.class.getDeclaredMethods()[0];
61+
62+
@Test
63+
void shouldNotCustomizeEndpointForMainTopicWithTopics() {
64+
65+
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
66+
given(endpoint.getTopics()).willReturn(Arrays.asList(topics));
67+
given(properties.suffix()).willReturn("");
68+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
69+
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
70+
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
71+
72+
EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
73+
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
74+
75+
List<EndpointCustomizer.TopicNamesHolder> holders =
76+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);
77+
78+
assertThat(holders).hasSize(2).element(0)
79+
.matches(assertMainTopic(0));
80+
assertThat(holders).element(1)
81+
.matches(assertMainTopic(1));
82+
83+
}
84+
85+
@Test
86+
void shouldNotCustomizeEndpointForMainTopicWithTPO() {
87+
88+
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
89+
given(properties.isMainEndpoint()).willReturn(true);
90+
given(properties.suffix()).willReturn("");
91+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
92+
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
93+
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
94+
95+
String testString = "testString";
96+
MethodKafkaListenerEndpoint<Object, Object> endpointTPO = new MethodKafkaListenerEndpoint<>();
97+
endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
98+
new TopicPartitionOffset(topics[1], 1, 1L));
99+
endpointTPO.setMethod(this.method);
100+
endpointTPO.setId(testString);
101+
endpointTPO.setClientIdPrefix(testString);
102+
endpointTPO.setGroup(testString);
103+
104+
EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
105+
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
106+
107+
List<EndpointCustomizer.TopicNamesHolder> holders =
108+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);
109+
110+
assertThat(holders).hasSize(2).element(0)
111+
.matches(assertMainTopic(0));
112+
assertThat(holders).element(1)
113+
.matches(assertMainTopic(1));
114+
115+
assertThat(endpointTPO.getTopics())
116+
.isEmpty();
117+
118+
TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
119+
assertThat(topicPartitionsToAssign).hasSize(2);
120+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
121+
new TopicPartitionOffset(topics[0], 0, 0L))).isTrue();
122+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
123+
new TopicPartitionOffset(topics[1], 1, 1L))).isTrue();
124+
125+
}
126+
127+
private Predicate<EndpointCustomizer.TopicNamesHolder> assertMainTopic(int index) {
128+
return holder -> holder.getCustomizedTopic().equals(topics[index])
129+
&& holder.getMainTopic().equals(topics[index]);
130+
}
131+
132+
@Test
133+
void shouldCustomizeEndpointForRetryTopic() {
134+
135+
MethodKafkaListenerEndpoint<Object, Object> endpoint = new MethodKafkaListenerEndpoint<>();
136+
String testString = "testString";
137+
endpoint.setTopics(this.topics);
138+
endpoint.setMethod(this.method);
139+
endpoint.setId(testString);
140+
endpoint.setClientIdPrefix(testString);
141+
endpoint.setGroup(testString);
142+
143+
MethodKafkaListenerEndpoint<Object, Object> endpointTPO = new MethodKafkaListenerEndpoint<>();
144+
endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
145+
new TopicPartitionOffset(topics[1], 1, 1L));
146+
endpointTPO.setMethod(this.method);
147+
endpointTPO.setId(testString);
148+
endpointTPO.setClientIdPrefix(testString);
149+
endpointTPO.setGroup(testString);
150+
151+
String suffix = "-retry";
152+
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
153+
given(properties.isMainEndpoint()).willReturn(false);
154+
given(properties.suffix()).willReturn(suffix);
155+
given(properties.numPartitions()).willReturn(2);
156+
157+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
158+
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
159+
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
160+
161+
EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
162+
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
163+
164+
List<EndpointCustomizer.TopicNamesHolder> holders =
165+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);
166+
167+
String topic1WithSuffix = topics[0] + suffix;
168+
String topic2WithSuffix = topics[1] + suffix;
169+
assertThat(holders).hasSize(2).element(0)
170+
.matches(holder -> holder.getMainTopic().equals(topics[0])
171+
&& holder.getCustomizedTopic().equals(topic1WithSuffix));
172+
assertThat(holders).hasSize(2).element(1)
173+
.matches(holder -> holder.getMainTopic().equals(topics[1])
174+
&& holder.getCustomizedTopic().equals(topic2WithSuffix));
175+
176+
String testStringSuffix = testString + suffix;
177+
178+
assertThat(endpoint.getTopics())
179+
.contains(topic1WithSuffix, topic2WithSuffix);
180+
assertThat(endpoint.getId())
181+
.isEqualTo(testStringSuffix);
182+
assertThat(endpoint.getClientIdPrefix())
183+
.isEqualTo(testStringSuffix);
184+
assertThat(endpoint.getGroup())
185+
.isEqualTo(testStringSuffix);
186+
assertThat(endpoint.getTopicPartitionsToAssign()).isEmpty();
187+
188+
List<EndpointCustomizer.TopicNamesHolder> holdersTPO =
189+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);
190+
191+
assertThat(holdersTPO).hasSize(2).element(0)
192+
.matches(holder -> holder.getMainTopic().equals(topics[0])
193+
&& holder.getCustomizedTopic().equals(topic1WithSuffix));
194+
assertThat(holdersTPO).hasSize(2).element(1)
195+
.matches(holder -> holder.getMainTopic().equals(topics[1])
196+
&& holder.getCustomizedTopic().equals(topic2WithSuffix));
197+
198+
assertThat(endpointTPO.getTopics())
199+
.isEmpty();
200+
201+
TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
202+
assertThat(topicPartitionsToAssign).hasSize(2);
203+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
204+
new TopicPartitionOffset(topic1WithSuffix, 0, (Long) null))).isTrue();
205+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
206+
new TopicPartitionOffset(topic2WithSuffix, 1, (Long) null))).isTrue();
207+
208+
assertThat(endpointTPO.getId())
209+
.isEqualTo(testStringSuffix);
210+
assertThat(endpointTPO.getClientIdPrefix())
211+
.isEqualTo(testStringSuffix);
212+
assertThat(endpointTPO.getGroup())
213+
.isEqualTo(testStringSuffix);
214+
}
215+
216+
private boolean equalsTopicPartitionOffset(TopicPartitionOffset tpo1, TopicPartitionOffset tpo2) {
217+
return tpo1.getTopicPartition().equals(tpo2.getTopicPartition()) &&
218+
((tpo1.getOffset() == null && tpo2.getOffset() == null) ||
219+
(tpo1.getOffset() != null && tpo1.getOffset().equals(tpo2.getOffset())));
220+
221+
}
222+
}

0 commit comments

Comments
 (0)