Skip to content

Commit b3e3581

Browse files
committed
Polish "Add auto-configuration to Kafka Retry Topics"
See gh-29812
1 parent bf46d72 commit b3e3581

File tree

4 files changed

+91
-160
lines changed

4 files changed

+91
-160
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20+
import java.time.Duration;
2021

2122
import org.springframework.beans.factory.ObjectProvider;
2223
import org.springframework.boot.autoconfigure.AutoConfiguration;
2324
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
2425
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2526
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2627
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
2729
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
2830
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
2931
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -34,7 +36,6 @@
3436
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3537
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3638
import org.springframework.kafka.core.KafkaAdmin;
37-
import org.springframework.kafka.core.KafkaOperations;
3839
import org.springframework.kafka.core.KafkaTemplate;
3940
import org.springframework.kafka.core.ProducerFactory;
4041
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
@@ -146,21 +147,30 @@ public KafkaAdmin kafkaAdmin() {
146147

147148
@Bean
148149
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
149-
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
150+
@ConditionalOnSingleCandidate(KafkaTemplate.class)
151+
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?> kafkaTemplate) {
150152
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
151153
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
152154
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
153155
.doNotAutoCreateRetryTopics();
154156
setBackOffPolicy(builder, retryTopic);
155-
return builder.create(kafkaOperations);
157+
return builder.create(kafkaTemplate);
156158
}
157159

158160
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
159-
PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff);
160-
PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0)
161-
.toCall(() -> builder.customBackoff((SleepingBackOffPolicy<?>) BackOffPolicyBuilder.newBuilder()
162-
.delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis())
163-
.multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build()));
161+
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
162+
if (delay > 0) {
163+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
164+
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
165+
map.from(delay).to(backOffPolicy::delay);
166+
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
167+
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
168+
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
169+
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
170+
}
171+
else {
172+
builder.noBackoff();
173+
}
164174
}
165175

166176
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 47 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.boot.autoconfigure.kafka;
1818

1919
import java.io.IOException;
20-
import java.math.BigDecimal;
2120
import java.time.Duration;
2221
import java.time.temporal.ChronoUnit;
2322
import java.util.ArrayList;
@@ -42,7 +41,6 @@
4241
import org.springframework.core.io.Resource;
4342
import org.springframework.kafka.listener.ContainerProperties.AckMode;
4443
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
45-
import org.springframework.util.Assert;
4644
import org.springframework.util.CollectionUtils;
4745
import org.springframework.util.unit.DataSize;
4846

@@ -1341,96 +1339,105 @@ public void setOptions(Map<String, String> options) {
13411339

13421340
}
13431341

1344-
public static class Retry {
1342+
public static class Security {
13451343

1346-
private Topic topic = new Topic();
1344+
/**
1345+
* Security protocol used to communicate with brokers.
1346+
*/
1347+
private String protocol;
13471348

1348-
public Topic getTopic() {
1349-
return this.topic.validate();
1349+
public String getProtocol() {
1350+
return this.protocol;
1351+
}
1352+
1353+
public void setProtocol(String protocol) {
1354+
this.protocol = protocol;
1355+
}
1356+
1357+
public Map<String, Object> buildProperties() {
1358+
Properties properties = new Properties();
1359+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
1360+
map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
1361+
return properties;
13501362
}
13511363

1352-
public void setTopic(Topic topic) {
1353-
this.topic = topic;
1364+
}
1365+
1366+
public static class Retry {
1367+
1368+
private final Topic topic = new Topic();
1369+
1370+
public Topic getTopic() {
1371+
return this.topic;
13541372
}
13551373

13561374
/**
13571375
* Properties for non-blocking, topic-based retries.
13581376
*/
13591377
public static class Topic {
13601378

1361-
private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic.";
1362-
1363-
private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX
1364-
+ "%s should be greater than or equal to %s. Provided value was %s.";
1365-
13661379
/**
1367-
* Whether to enable topic-based retries auto-configuration.
1380+
* Whether to enable topic-based non-blocking retries.
13681381
*/
1369-
private Boolean enabled;
1382+
private boolean enabled;
13701383

13711384
/**
1372-
* The total number of processing attempts made before sending the message to
1373-
* the DLT.
1385+
* Total number of processing attempts made before sending the message to the
1386+
* DLT.
13741387
*/
1375-
private Integer attempts = 3;
1388+
private int attempts = 3;
13761389

13771390
/**
1378-
* A canonical backoff period. Used as an initial value in the exponential
1379-
* case, and as a minimum value in the uniform case.
1391+
* Canonical backoff period. Used as an initial value in the exponential case,
1392+
* and as a minimum value in the uniform case.
13801393
*/
13811394
private Duration delay = Duration.ofSeconds(1);
13821395

13831396
/**
1384-
* If positive, then used as a multiplier for generating the next delay for
1385-
* backoff.
1397+
* Multiplier to use for generating the next backoff delay.
13861398
*/
1387-
private Double multiplier = 0.0;
1399+
private double multiplier = 0.0;
13881400

13891401
/**
1390-
* The maximum wait between retries. If less than the delay then the default
1391-
* of 30 seconds is applied.
1402+
* Maximum wait between retries. If less than the delay then the default of 30
1403+
* seconds is applied.
13921404
*/
13931405
private Duration maxDelay = Duration.ZERO;
13941406

13951407
/**
1396-
* In the exponential case, set this to true to have the backoff delays
1397-
* randomized.
1408+
* Whether to have the backoff delays.
13981409
*/
1399-
private Boolean randomBackOff = false;
1410+
private boolean randomBackOff = false;
14001411

1401-
public Boolean getEnabled() {
1412+
public boolean isEnabled() {
14021413
return this.enabled;
14031414
}
14041415

1405-
public void setEnabled(Boolean enabled) {
1416+
public void setEnabled(boolean enabled) {
14061417
this.enabled = enabled;
14071418
}
14081419

1409-
public Integer getAttempts() {
1420+
public int getAttempts() {
14101421
return this.attempts;
14111422
}
14121423

1413-
public void setAttempts(Integer attempts) {
1424+
public void setAttempts(int attempts) {
14141425
this.attempts = attempts;
14151426
}
14161427

14171428
public Duration getDelay() {
14181429
return this.delay;
14191430
}
14201431

1421-
public Long getDelayMillis() {
1422-
return (this.delay != null) ? this.delay.toMillis() : null;
1423-
}
1424-
14251432
public void setDelay(Duration delay) {
14261433
this.delay = delay;
14271434
}
14281435

1429-
public Double getMultiplier() {
1436+
public double getMultiplier() {
14301437
return this.multiplier;
14311438
}
14321439

1433-
public void setMultiplier(Double multiplier) {
1440+
public void setMultiplier(double multiplier) {
14341441
this.multiplier = multiplier;
14351442
}
14361443

@@ -1442,59 +1449,14 @@ public void setMaxDelay(Duration maxDelay) {
14421449
this.maxDelay = maxDelay;
14431450
}
14441451

1445-
public Long getMaxDelayMillis() {
1446-
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
1447-
}
1448-
1449-
public Boolean isRandomBackOff() {
1452+
public boolean isRandomBackOff() {
14501453
return this.randomBackOff;
14511454
}
14521455

1453-
public void setRandomBackOff(Boolean randomBackOff) {
1456+
public void setRandomBackOff(boolean randomBackOff) {
14541457
this.randomBackOff = randomBackOff;
14551458
}
14561459

1457-
private Topic validate() {
1458-
validateProperty("attempts", this.attempts, 1);
1459-
validateProperty("delay", this.getDelayMillis(), 0);
1460-
validateProperty("multiplier", this.multiplier, 0);
1461-
validateProperty("maxDelay", this.getMaxDelayMillis(), 0);
1462-
Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(),
1463-
"Property " + RETRY_TOPIC_PROPERTIES_PREFIX
1464-
+ "randomBackOff should not be true with non-exponential back offs.");
1465-
return this;
1466-
}
1467-
1468-
private static void validateProperty(String propertyName, Number providedValue, int minValue) {
1469-
Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null.");
1470-
Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0,
1471-
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
1472-
}
1473-
1474-
}
1475-
1476-
}
1477-
1478-
public static class Security {
1479-
1480-
/**
1481-
* Security protocol used to communicate with brokers.
1482-
*/
1483-
private String protocol;
1484-
1485-
public String getProtocol() {
1486-
return this.protocol;
1487-
}
1488-
1489-
public void setProtocol(String protocol) {
1490-
this.protocol = protocol;
1491-
}
1492-
1493-
public Map<String, Object> buildProperties() {
1494-
Properties properties = new Properties();
1495-
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
1496-
map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
1497-
return properties;
14981460
}
14991461

15001462
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2021 the original author or authors.
2+
* Copyright 2012-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)