Skip to content

Commit bf46d72

Browse files
tomazfernandessnicoll
authored andcommitted
Add auto-configuration to Kafka Retry Topics
See gh-29812
1 parent de17878 commit bf46d72

File tree

4 files changed

+346
-0
lines changed

4 files changed

+346
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2626
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2727
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
28+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
2829
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2930
import org.springframework.boot.context.properties.PropertyMapper;
3031
import org.springframework.context.annotation.Bean;
@@ -33,13 +34,18 @@
3334
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3435
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3536
import org.springframework.kafka.core.KafkaAdmin;
37+
import org.springframework.kafka.core.KafkaOperations;
3638
import org.springframework.kafka.core.KafkaTemplate;
3739
import org.springframework.kafka.core.ProducerFactory;
40+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
41+
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3842
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
3943
import org.springframework.kafka.support.LoggingProducerListener;
4044
import org.springframework.kafka.support.ProducerListener;
4145
import org.springframework.kafka.support.converter.RecordMessageConverter;
4246
import org.springframework.kafka.transaction.KafkaTransactionManager;
47+
import org.springframework.retry.backoff.BackOffPolicyBuilder;
48+
import org.springframework.retry.backoff.SleepingBackOffPolicy;
4349

4450
/**
4551
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -48,6 +54,7 @@
4854
* @author Stephane Nicoll
4955
* @author Eddú Meléndez
5056
* @author Nakul Mishra
57+
* @author Tomaz Fernandes
5158
* @since 1.5.0
5259
*/
5360
@AutoConfiguration
@@ -137,4 +144,23 @@ public KafkaAdmin kafkaAdmin() {
137144
return kafkaAdmin;
138145
}
139146

147+
@Bean
148+
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
149+
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
150+
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
151+
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
152+
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
153+
.doNotAutoCreateRetryTopics();
154+
setBackOffPolicy(builder, retryTopic);
155+
return builder.create(kafkaOperations);
156+
}
157+
158+
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
159+
PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff);
160+
PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0)
161+
.toCall(() -> builder.customBackoff((SleepingBackOffPolicy<?>) BackOffPolicyBuilder.newBuilder()
162+
.delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis())
163+
.multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build()));
164+
}
165+
140166
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20+
import java.math.BigDecimal;
2021
import java.time.Duration;
2122
import java.time.temporal.ChronoUnit;
2223
import java.util.ArrayList;
@@ -41,6 +42,7 @@
4142
import org.springframework.core.io.Resource;
4243
import org.springframework.kafka.listener.ContainerProperties.AckMode;
4344
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
45+
import org.springframework.util.Assert;
4446
import org.springframework.util.CollectionUtils;
4547
import org.springframework.util.unit.DataSize;
4648

@@ -54,6 +56,7 @@
5456
* @author Stephane Nicoll
5557
* @author Artem Bilan
5658
* @author Nakul Mishra
59+
* @author Tomaz Fernandes
5760
* @since 1.5.0
5861
*/
5962
@ConfigurationProperties(prefix = "spring.kafka")
@@ -94,6 +97,8 @@ public class KafkaProperties {
9497

9598
private final Security security = new Security();
9699

100+
private final Retry retry = new Retry();
101+
97102
public List<String> getBootstrapServers() {
98103
return this.bootstrapServers;
99104
}
@@ -150,6 +155,10 @@ public Security getSecurity() {
150155
return this.security;
151156
}
152157

158+
public Retry getRetry() {
159+
return this.retry;
160+
}
161+
153162
private Map<String, Object> buildCommonProperties() {
154163
Map<String, Object> properties = new HashMap<>();
155164
if (this.bootstrapServers != null) {
@@ -1332,6 +1341,140 @@ public void setOptions(Map<String, String> options) {
13321341

13331342
}
13341343

1344+
public static class Retry {
1345+
1346+
private Topic topic = new Topic();
1347+
1348+
public Topic getTopic() {
1349+
return this.topic.validate();
1350+
}
1351+
1352+
public void setTopic(Topic topic) {
1353+
this.topic = topic;
1354+
}
1355+
1356+
/**
1357+
* Properties for non-blocking, topic-based retries.
1358+
*/
1359+
public static class Topic {
1360+
1361+
private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic.";
1362+
1363+
private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX
1364+
+ "%s should be greater than or equal to %s. Provided value was %s.";
1365+
1366+
/**
1367+
* Whether to enable topic-based retries auto-configuration.
1368+
*/
1369+
private Boolean enabled;
1370+
1371+
/**
1372+
* The total number of processing attempts made before sending the message to
1373+
* the DLT.
1374+
*/
1375+
private Integer attempts = 3;
1376+
1377+
/**
1378+
* A canonical backoff period. Used as an initial value in the exponential
1379+
* case, and as a minimum value in the uniform case.
1380+
*/
1381+
private Duration delay = Duration.ofSeconds(1);
1382+
1383+
/**
1384+
* If positive, then used as a multiplier for generating the next delay for
1385+
* backoff.
1386+
*/
1387+
private Double multiplier = 0.0;
1388+
1389+
/**
1390+
* The maximum wait between retries. If less than the delay then the default
1391+
* of 30 seconds is applied.
1392+
*/
1393+
private Duration maxDelay = Duration.ZERO;
1394+
1395+
/**
1396+
* In the exponential case, set this to true to have the backoff delays
1397+
* randomized.
1398+
*/
1399+
private Boolean randomBackOff = false;
1400+
1401+
public Boolean getEnabled() {
1402+
return this.enabled;
1403+
}
1404+
1405+
public void setEnabled(Boolean enabled) {
1406+
this.enabled = enabled;
1407+
}
1408+
1409+
public Integer getAttempts() {
1410+
return this.attempts;
1411+
}
1412+
1413+
public void setAttempts(Integer attempts) {
1414+
this.attempts = attempts;
1415+
}
1416+
1417+
public Duration getDelay() {
1418+
return this.delay;
1419+
}
1420+
1421+
public Long getDelayMillis() {
1422+
return (this.delay != null) ? this.delay.toMillis() : null;
1423+
}
1424+
1425+
public void setDelay(Duration delay) {
1426+
this.delay = delay;
1427+
}
1428+
1429+
public Double getMultiplier() {
1430+
return this.multiplier;
1431+
}
1432+
1433+
public void setMultiplier(Double multiplier) {
1434+
this.multiplier = multiplier;
1435+
}
1436+
1437+
public Duration getMaxDelay() {
1438+
return this.maxDelay;
1439+
}
1440+
1441+
public void setMaxDelay(Duration maxDelay) {
1442+
this.maxDelay = maxDelay;
1443+
}
1444+
1445+
public Long getMaxDelayMillis() {
1446+
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
1447+
}
1448+
1449+
public Boolean isRandomBackOff() {
1450+
return this.randomBackOff;
1451+
}
1452+
1453+
public void setRandomBackOff(Boolean randomBackOff) {
1454+
this.randomBackOff = randomBackOff;
1455+
}
1456+
1457+
private Topic validate() {
1458+
validateProperty("attempts", this.attempts, 1);
1459+
validateProperty("delay", this.getDelayMillis(), 0);
1460+
validateProperty("multiplier", this.multiplier, 0);
1461+
validateProperty("maxDelay", this.getMaxDelayMillis(), 0);
1462+
Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(),
1463+
"Property " + RETRY_TOPIC_PROPERTIES_PREFIX
1464+
+ "randomBackOff should not be true with non-exponential back offs.");
1465+
return this;
1466+
}
1467+
1468+
private static void validateProperty(String propertyName, Number providedValue, int minValue) {
1469+
Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null.");
1470+
Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0,
1471+
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
1472+
}
1473+
1474+
}
1475+
1476+
}
1477+
13351478
public static class Security {
13361479

13371480
/**

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.boot.autoconfigure.kafka;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.concurrent.CountDownLatch;
2022
import java.util.concurrent.TimeUnit;
2123
import java.util.regex.Pattern;
@@ -41,6 +43,8 @@
4143
import org.springframework.kafka.config.TopicBuilder;
4244
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4345
import org.springframework.kafka.core.KafkaTemplate;
46+
import org.springframework.kafka.retrytopic.DestinationTopic;
47+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
4448
import org.springframework.kafka.support.KafkaHeaders;
4549
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
4650
import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -53,12 +57,14 @@
5357
*
5458
* @author Gary Russell
5559
* @author Stephane Nicoll
60+
* @author Tomaz Fernandes
5661
*/
5762
@DisabledOnOs(OS.WINDOWS)
5863
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
5964
class KafkaAutoConfigurationIntegrationTests {
6065

6166
static final String TEST_TOPIC = "testTopic";
67+
static final String TEST_RETRY_TOPIC = "testRetryTopic";
6268

6369
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
6470

@@ -89,6 +95,27 @@ void testEndToEnd() throws Exception {
8995
producer.close();
9096
}
9197

98+
@SuppressWarnings("unchecked")
99+
@Test
100+
void testEndToEndWithRetryTopics() throws Exception {
101+
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
102+
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true",
103+
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
104+
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms",
105+
"spring.kafka.consumer.auto-offset-reset=earliest");
106+
RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class);
107+
assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay)
108+
.containsExactly(0L, 100L, 200L, 300L, 300L, 0L);
109+
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
110+
template.send(TEST_RETRY_TOPIC, "foo", "bar");
111+
RetryListener listener = this.context.getBean(RetryListener.class);
112+
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
113+
assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo",
114+
"bar");
115+
assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic",
116+
"testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3");
117+
}
118+
92119
@Test
93120
void testStreams() {
94121
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
@@ -121,6 +148,11 @@ Listener listener() {
121148
return new Listener();
122149
}
123150

151+
@Bean
152+
RetryListener retryListener() {
153+
return new RetryListener();
154+
}
155+
124156
@Bean
125157
NewTopic adminCreated() {
126158
return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build();
@@ -157,4 +189,38 @@ void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
157189

158190
}
159191

192+
static class RetryListener {
193+
194+
private final CountDownLatch latch = new CountDownLatch(5);
195+
196+
private final List<String> topics = new ArrayList<>();
197+
198+
private volatile String received;
199+
200+
private volatile String key;
201+
202+
@KafkaListener(topics = TEST_RETRY_TOPIC)
203+
void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
204+
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
205+
this.received = foo;
206+
this.key = key;
207+
this.topics.add(topic);
208+
this.latch.countDown();
209+
throw new RuntimeException("Test exception");
210+
}
211+
212+
private List<String> getTopics() {
213+
return this.topics;
214+
}
215+
216+
private String getReceived() {
217+
return this.received;
218+
}
219+
220+
private String getKey() {
221+
return this.key;
222+
}
223+
224+
}
225+
160226
}

0 commit comments

Comments
 (0)