Skip to content

Commit 76e197f

Browse files
committed
Merge pull request #29812 from tomazfernandes
* pr/29812: Polish "Add auto-configuration to Kafka Retry Topics" Add auto-configuration to Kafka Retry Topics Closes gh-29812
2 parents de17878 + b3e3581 commit 76e197f

File tree

4 files changed

+278
-1
lines changed

4 files changed

+278
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20+
import java.time.Duration;
2021

2122
import org.springframework.beans.factory.ObjectProvider;
2223
import org.springframework.boot.autoconfigure.AutoConfiguration;
2324
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
2425
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2526
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2627
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
2729
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
30+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
2831
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2932
import org.springframework.boot.context.properties.PropertyMapper;
3033
import org.springframework.context.annotation.Bean;
@@ -35,11 +38,15 @@
3538
import org.springframework.kafka.core.KafkaAdmin;
3639
import org.springframework.kafka.core.KafkaTemplate;
3740
import org.springframework.kafka.core.ProducerFactory;
41+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
42+
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3843
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
3944
import org.springframework.kafka.support.LoggingProducerListener;
4045
import org.springframework.kafka.support.ProducerListener;
4146
import org.springframework.kafka.support.converter.RecordMessageConverter;
4247
import org.springframework.kafka.transaction.KafkaTransactionManager;
48+
import org.springframework.retry.backoff.BackOffPolicyBuilder;
49+
import org.springframework.retry.backoff.SleepingBackOffPolicy;
4350

4451
/**
4552
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -48,6 +55,7 @@
4855
* @author Stephane Nicoll
4956
* @author Eddú Meléndez
5057
* @author Nakul Mishra
58+
* @author Tomaz Fernandes
5159
* @since 1.5.0
5260
*/
5361
@AutoConfiguration
@@ -137,4 +145,32 @@ public KafkaAdmin kafkaAdmin() {
137145
return kafkaAdmin;
138146
}
139147

148+
@Bean
149+
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
150+
@ConditionalOnSingleCandidate(KafkaTemplate.class)
151+
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?> kafkaTemplate) {
152+
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
153+
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
154+
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
155+
.doNotAutoCreateRetryTopics();
156+
setBackOffPolicy(builder, retryTopic);
157+
return builder.create(kafkaTemplate);
158+
}
159+
160+
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
161+
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
162+
if (delay > 0) {
163+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
164+
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
165+
map.from(delay).to(backOffPolicy::delay);
166+
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
167+
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
168+
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
169+
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
170+
}
171+
else {
172+
builder.noBackoff();
173+
}
174+
}
175+
140176
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* @author Stephane Nicoll
5555
* @author Artem Bilan
5656
* @author Nakul Mishra
57+
* @author Tomaz Fernandes
5758
* @since 1.5.0
5859
*/
5960
@ConfigurationProperties(prefix = "spring.kafka")
@@ -94,6 +95,8 @@ public class KafkaProperties {
9495

9596
private final Security security = new Security();
9697

98+
private final Retry retry = new Retry();
99+
97100
public List<String> getBootstrapServers() {
98101
return this.bootstrapServers;
99102
}
@@ -150,6 +153,10 @@ public Security getSecurity() {
150153
return this.security;
151154
}
152155

156+
public Retry getRetry() {
157+
return this.retry;
158+
}
159+
153160
private Map<String, Object> buildCommonProperties() {
154161
Map<String, Object> properties = new HashMap<>();
155162
if (this.bootstrapServers != null) {
@@ -1356,6 +1363,104 @@ public Map<String, Object> buildProperties() {
13561363

13571364
}
13581365

1366+
public static class Retry {
1367+
1368+
private final Topic topic = new Topic();
1369+
1370+
public Topic getTopic() {
1371+
return this.topic;
1372+
}
1373+
1374+
/**
1375+
* Properties for non-blocking, topic-based retries.
1376+
*/
1377+
public static class Topic {
1378+
1379+
/**
1380+
* Whether to enable topic-based non-blocking retries.
1381+
*/
1382+
private boolean enabled;
1383+
1384+
/**
1385+
* Total number of processing attempts made before sending the message to the
1386+
* DLT.
1387+
*/
1388+
private int attempts = 3;
1389+
1390+
/**
1391+
* Canonical backoff period. Used as an initial value in the exponential case,
1392+
* and as a minimum value in the uniform case.
1393+
*/
1394+
private Duration delay = Duration.ofSeconds(1);
1395+
1396+
/**
1397+
* Multiplier to use for generating the next backoff delay.
1398+
*/
1399+
private double multiplier = 0.0;
1400+
1401+
/**
1402+
* Maximum wait between retries. If less than the delay then the default of 30
1403+
* seconds is applied.
1404+
*/
1405+
private Duration maxDelay = Duration.ZERO;
1406+
1407+
/**
1408+
* Whether to have the backoff delays.
1409+
*/
1410+
private boolean randomBackOff = false;
1411+
1412+
public boolean isEnabled() {
1413+
return this.enabled;
1414+
}
1415+
1416+
public void setEnabled(boolean enabled) {
1417+
this.enabled = enabled;
1418+
}
1419+
1420+
public int getAttempts() {
1421+
return this.attempts;
1422+
}
1423+
1424+
public void setAttempts(int attempts) {
1425+
this.attempts = attempts;
1426+
}
1427+
1428+
public Duration getDelay() {
1429+
return this.delay;
1430+
}
1431+
1432+
public void setDelay(Duration delay) {
1433+
this.delay = delay;
1434+
}
1435+
1436+
public double getMultiplier() {
1437+
return this.multiplier;
1438+
}
1439+
1440+
public void setMultiplier(double multiplier) {
1441+
this.multiplier = multiplier;
1442+
}
1443+
1444+
public Duration getMaxDelay() {
1445+
return this.maxDelay;
1446+
}
1447+
1448+
public void setMaxDelay(Duration maxDelay) {
1449+
this.maxDelay = maxDelay;
1450+
}
1451+
1452+
public boolean isRandomBackOff() {
1453+
return this.randomBackOff;
1454+
}
1455+
1456+
public void setRandomBackOff(boolean randomBackOff) {
1457+
this.randomBackOff = randomBackOff;
1458+
}
1459+
1460+
}
1461+
1462+
}
1463+
13591464
public static class Cleanup {
13601465

13611466
/**

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2021 the original author or authors.
2+
* Copyright 2012-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.boot.autoconfigure.kafka;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.concurrent.CountDownLatch;
2022
import java.util.concurrent.TimeUnit;
2123
import java.util.regex.Pattern;
@@ -41,6 +43,8 @@
4143
import org.springframework.kafka.config.TopicBuilder;
4244
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4345
import org.springframework.kafka.core.KafkaTemplate;
46+
import org.springframework.kafka.retrytopic.DestinationTopic;
47+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
4448
import org.springframework.kafka.support.KafkaHeaders;
4549
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
4650
import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -53,12 +57,14 @@
5357
*
5458
* @author Gary Russell
5559
* @author Stephane Nicoll
60+
* @author Tomaz Fernandes
5661
*/
5762
@DisabledOnOs(OS.WINDOWS)
5863
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
5964
class KafkaAutoConfigurationIntegrationTests {
6065

6166
static final String TEST_TOPIC = "testTopic";
67+
static final String TEST_RETRY_TOPIC = "testRetryTopic";
6268

6369
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
6470

@@ -89,6 +95,27 @@ void testEndToEnd() throws Exception {
8995
producer.close();
9096
}
9197

98+
@SuppressWarnings("unchecked")
99+
@Test
100+
void testEndToEndWithRetryTopics() throws Exception {
101+
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
102+
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true",
103+
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
104+
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms",
105+
"spring.kafka.consumer.auto-offset-reset=earliest");
106+
RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class);
107+
assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay)
108+
.containsExactly(0L, 100L, 200L, 300L, 300L, 0L);
109+
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
110+
template.send(TEST_RETRY_TOPIC, "foo", "bar");
111+
RetryListener listener = this.context.getBean(RetryListener.class);
112+
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
113+
assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo",
114+
"bar");
115+
assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic",
116+
"testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3");
117+
}
118+
92119
@Test
93120
void testStreams() {
94121
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
@@ -121,6 +148,11 @@ Listener listener() {
121148
return new Listener();
122149
}
123150

151+
@Bean
152+
RetryListener retryListener() {
153+
return new RetryListener();
154+
}
155+
124156
@Bean
125157
NewTopic adminCreated() {
126158
return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build();
@@ -157,4 +189,38 @@ void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
157189

158190
}
159191

192+
static class RetryListener {
193+
194+
private final CountDownLatch latch = new CountDownLatch(5);
195+
196+
private final List<String> topics = new ArrayList<>();
197+
198+
private volatile String received;
199+
200+
private volatile String key;
201+
202+
@KafkaListener(topics = TEST_RETRY_TOPIC)
203+
void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
204+
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
205+
this.received = foo;
206+
this.key = key;
207+
this.topics.add(topic);
208+
this.latch.countDown();
209+
throw new RuntimeException("Test exception");
210+
}
211+
212+
private List<String> getTopics() {
213+
return this.topics;
214+
}
215+
216+
private String getReceived() {
217+
return this.received;
218+
}
219+
220+
private String getKey() {
221+
return this.key;
222+
}
223+
224+
}
225+
160226
}

0 commit comments

Comments
 (0)