Skip to content

Commit f744094

Browse files
garyrussellartembilan
authored andcommitted
GH-1916: Fix RetryableTopic for SpEL and Prop PH
Resolves #1916 Previously, the `topics` property was used during reply topic configuration before it had been resolved for SpEL expressions and Property Placeholders. **cherry-pick to 2.7.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java Optimize ListenerScope setup. GH-1919: Fix TopicPartitionOffset Matching Resolves #1919 When looking for a retry configuration instance, matches only worked when `topics` is provided, not `topicPartition`. If there is no content in the `topics` property, pull the topic names from `topicPartition`s (if any).
1 parent be5cc8d commit f744094

File tree

4 files changed

+322
-29
lines changed

4 files changed

+322
-29
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,11 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
438438
for (KafkaListener classLevelListener : classLevelListeners) {
439439
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
440440
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
441-
processListener(endpoint, classLevelListener, bean, beanName);
441+
String beanRef = classLevelListener.beanRef();
442+
this.listenerScope.addListener(beanRef, bean);
443+
processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
444+
resolveTopicPartitions(classLevelListener));
445+
this.listenerScope.removeListener(beanRef);
442446
}
443447
}
444448

@@ -447,37 +451,49 @@ protected void processKafkaListener(KafkaListener kafkaListener, Method method,
447451
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
448452
endpoint.setMethod(methodToUse);
449453

450-
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint)) {
451-
processListener(endpoint, kafkaListener, bean, beanName);
454+
String beanRef = kafkaListener.beanRef();
455+
this.listenerScope.addListener(beanRef, bean);
456+
String[] topics = resolveTopics(kafkaListener);
457+
TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
458+
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
459+
processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
452460
}
461+
this.listenerScope.removeListener(beanRef);
453462
}
454463

455464
private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
456-
Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint) {
465+
Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics,
466+
TopicPartitionOffset[] tps) {
467+
468+
String[] retryableCandidates = topics;
469+
if (retryableCandidates.length == 0 && tps.length > 0) {
470+
retryableCandidates = Arrays.stream(tps)
471+
.map(tp -> tp.getTopic())
472+
.distinct()
473+
.collect(Collectors.toList())
474+
.toArray(new String[0]);
475+
}
457476

458477
RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
459478
this.resolver, this.expressionContext)
460-
.findRetryConfigurationFor(kafkaListener.topics(), methodToUse, bean);
479+
.findRetryConfigurationFor(retryableCandidates, methodToUse, bean);
461480

462481
if (retryTopicConfiguration == null) {
463-
this.logger.debug("No retry topic configuration found for topics " + Arrays.asList(kafkaListener.topics()));
482+
String[] candidates = retryableCandidates;
483+
this.logger.debug(() ->
484+
"No retry topic configuration found for topics " + Arrays.toString(candidates));
464485
return false;
465486
}
466487

467488
RetryTopicConfigurer.EndpointProcessor endpointProcessor = endpointToProcess ->
468-
this.processKafkaListenerAnnotationForRetryTopic(endpointToProcess, kafkaListener, bean);
469-
470-
String beanRef = kafkaListener.beanRef();
471-
this.listenerScope.addListener(beanRef, bean);
489+
this.processKafkaListenerAnnotationForRetryTopic(endpointToProcess, kafkaListener, bean, topics, tps);
472490

473491
KafkaListenerContainerFactory<?> factory =
474492
resolveContainerFactory(kafkaListener, resolve(kafkaListener.containerFactory()), beanName);
475493

476494
getRetryTopicConfigurer()
477495
.processMainAndRetryListeners(endpointProcessor, endpoint, retryTopicConfiguration,
478496
this.registrar, factory, this.defaultContainerFactoryBeanName);
479-
480-
this.listenerScope.removeListener(beanRef);
481497
return true;
482498
}
483499

@@ -539,39 +555,34 @@ private Method checkProxy(Method methodArg, Object bean) {
539555
}
540556

541557
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
542-
Object bean, String beanName) {
543-
544-
String beanRef = kafkaListener.beanRef();
545-
if (StringUtils.hasText(beanRef)) {
546-
this.listenerScope.addListener(beanRef, bean);
547-
}
558+
Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
548559

549-
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
560+
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
550561

551562
String containerFactory = resolve(kafkaListener.containerFactory());
552563
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);
553564

554565
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
555566

556567
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
557-
558-
if (StringUtils.hasText(beanRef)) {
559-
this.listenerScope.removeListener(beanRef);
560-
}
561568
}
562569

563-
private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
564-
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
570+
private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint,
571+
KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
572+
573+
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
565574
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
566575
}
567576

568-
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
577+
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
578+
KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
579+
569580
endpoint.setBean(bean);
570581
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
571582
endpoint.setId(getEndpointId(kafkaListener));
572583
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
573-
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
574-
endpoint.setTopics(resolveTopics(kafkaListener));
584+
endpoint.setTopicPartitions(tps);
585+
endpoint.setTopics(topics);
575586
endpoint.setTopicPattern(resolvePattern(kafkaListener));
576587
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
577588
String group = kafkaListener.containerGroup();
@@ -1076,7 +1087,7 @@ public String convert(byte[] source) {
10761087

10771088
}
10781089

1079-
private static class ListenerScope implements Scope {
1090+
static class ListenerScope implements Scope {
10801091

10811092
private final Map<String, Object> listeners = new HashMap<>();
10821093

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint)
9595
if (topicPartitionsToAssign != null && topicPartitionsToAssign.length > 0) {
9696
topics = Arrays.stream(topicPartitionsToAssign)
9797
.map(TopicPartitionOffset::getTopic)
98+
.distinct()
9899
.collect(Collectors.toList());
99100
}
100101
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.apache.kafka.clients.admin.AdminClientConfig;
27+
import org.apache.kafka.clients.consumer.Consumer;
28+
import org.apache.kafka.common.PartitionInfo;
29+
import org.junit.jupiter.api.Test;
30+
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.kafka.annotation.EnableKafka;
35+
import org.springframework.kafka.annotation.KafkaListener;
36+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
37+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
38+
import org.springframework.kafka.core.ConsumerFactory;
39+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
40+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
41+
import org.springframework.kafka.core.KafkaAdmin;
42+
import org.springframework.kafka.core.KafkaTemplate;
43+
import org.springframework.kafka.core.ProducerFactory;
44+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
45+
import org.springframework.kafka.test.context.EmbeddedKafka;
46+
import org.springframework.kafka.test.utils.KafkaTestUtils;
47+
import org.springframework.test.annotation.DirtiesContext;
48+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
49+
50+
/**
51+
* @author Gary Russell
52+
* @since 2.7.7
53+
*
54+
*/
55+
@SpringJUnitConfig
56+
@DirtiesContext
57+
@EmbeddedKafka(topics = RetryTopicConfigurationIntegrationTests.TOPIC1, partitions = 1)
58+
class RetryTopicConfigurationIntegrationTests {
59+
60+
public static final String TOPIC1 = "RetryTopicConfigurationIntegrationTests.1";
61+
62+
@Test
63+
void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
64+
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config) throws InterruptedException {
65+
66+
Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
67+
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
68+
assertThat(topics.keySet()).contains("RetryTopicConfigurationIntegrationTests.1",
69+
"RetryTopicConfigurationIntegrationTests.1-dlt", "RetryTopicConfigurationIntegrationTests.1-retry-100",
70+
"RetryTopicConfigurationIntegrationTests.1-retry-110");
71+
template.send(TOPIC1, "foo");
72+
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
73+
}
74+
75+
@Configuration(proxyBeanMethods = false)
76+
@EnableKafka
77+
static class Config {
78+
79+
private final CountDownLatch latch = new CountDownLatch(1);
80+
81+
@KafkaListener(id = TOPIC1, topics = "#{'${some.prop:" + TOPIC1 + "}'}")
82+
void listen1(String in) {
83+
throw new RuntimeException("test");
84+
}
85+
86+
void dlt(String in) {
87+
this.latch.countDown();
88+
}
89+
90+
@Bean
91+
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaTemplate<Integer, String> template,
92+
ConsumerFactory<Integer, String> consumerFactory) {
93+
94+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
95+
new ConcurrentKafkaListenerContainerFactory<>();
96+
factory.setConsumerFactory(consumerFactory);
97+
factory.setReplyTemplate(template);
98+
return factory;
99+
}
100+
101+
@Bean
102+
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
103+
return new DefaultKafkaConsumerFactory<>(
104+
KafkaTestUtils.consumerProps("retryConfig", "false", embeddedKafka));
105+
}
106+
107+
@Bean
108+
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> producerFactory) {
109+
return new KafkaTemplate<>(producerFactory);
110+
}
111+
112+
@Bean
113+
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
114+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
115+
}
116+
117+
@Bean
118+
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
119+
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
120+
}
121+
122+
@Bean
123+
RetryTopicConfiguration retryTopicConfiguration1(KafkaTemplate<Integer, String> template) {
124+
return RetryTopicConfigurationBuilder.newInstance()
125+
.includeTopic(TOPIC1)
126+
.exponentialBackoff(100, 1.1, 110)
127+
.dltHandlerMethod(getClass(), "dlt")
128+
.create(template);
129+
}
130+
131+
}
132+
133+
}

0 commit comments

Comments
 (0)