Skip to content

Commit 5da4913

Browse files
committed
Adapt Kafka auto-configuration to core retry semantics
This commit adapts the auto-configuration of retry topics to Spring Kafka moving away from Spring Retry. The random property has been removed in favor of the new jitter value and default values have been made more explicit. Closes gh-47125
1 parent e13e1a6 commit 5da4913

File tree

4 files changed

+53
-40
lines changed

4 files changed

+53
-40
lines changed

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.time.Duration;
2121
import java.util.Map;
22+
import java.util.function.Predicate;
2223

2324
import org.apache.kafka.clients.CommonClientConfigs;
2425
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -47,6 +48,7 @@
4748
import org.springframework.context.annotation.Bean;
4849
import org.springframework.context.annotation.Import;
4950
import org.springframework.context.annotation.ImportRuntimeHints;
51+
import org.springframework.core.retry.RetryPolicy;
5052
import org.springframework.kafka.core.ConsumerFactory;
5153
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5254
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -62,8 +64,6 @@
6264
import org.springframework.kafka.transaction.KafkaTransactionManager;
6365
import org.springframework.util.StringUtils;
6466
import org.springframework.util.backoff.BackOff;
65-
import org.springframework.util.backoff.ExponentialBackOff;
66-
import org.springframework.util.backoff.FixedBackOff;
6767

6868
/**
6969
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -193,13 +193,13 @@ KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) {
193193
@ConditionalOnSingleCandidate(KafkaTemplate.class)
194194
RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?> kafkaTemplate) {
195195
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
196-
return RetryTopicConfigurationBuilder.newInstance()
196+
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
197197
.maxAttempts(retryTopic.getAttempts())
198198
.useSingleTopicForSameIntervals()
199199
.suffixTopicsWithIndexValues()
200200
.doNotAutoCreateRetryTopics()
201-
.customBackoff(getBackOffPolicy(retryTopic.getBackoff()))
202-
.create(kafkaTemplate);
201+
.customBackoff(getBackOff(retryTopic.getBackoff()));
202+
return builder.create(kafkaTemplate);
203203
}
204204

205205
private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties,
@@ -226,28 +226,14 @@ private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
226226
applySslBundle(properties, admin.getSslBundle());
227227
}
228228

229-
private BackOff getBackOffPolicy(Backoff properties) {
230-
Duration delay = properties.getDelay();
231-
Duration maxDelay = properties.getMaxDelay();
232-
if (delay == null || Duration.ZERO.equals(delay)) {
233-
return new FixedBackOff(0);
234-
}
235-
if (properties.getMultiplier() > 0 || (maxDelay != null && maxDelay.toMillis() > delay.toMillis())) {
236-
long jitter = 0;
237-
if (properties.isRandom() && maxDelay != null) {
238-
jitter = (maxDelay.toMillis() - delay.toMillis()) / 2;
239-
}
240-
ExponentialBackOff backOff = new ExponentialBackOff();
241-
backOff.setInitialInterval(delay.toMillis() + jitter);
242-
backOff.setJitter(jitter);
243-
backOff.setMultiplier(properties.getMultiplier());
244-
if (maxDelay != null && maxDelay.toMillis() > delay.toMillis()) {
245-
backOff.setMaxInterval(properties.getMaxDelay().toMillis());
246-
}
247-
return backOff;
248-
}
249-
return new FixedBackOff(delay.toMillis());
250-
229+
static BackOff getBackOff(Backoff retryTopicBackoff) {
230+
PropertyMapper map = PropertyMapper.get();
231+
RetryPolicy.Builder builder = RetryPolicy.builder().maxAttempts(Long.MAX_VALUE);
232+
map.from(retryTopicBackoff.getDelay()).to(builder::delay);
233+
map.from(retryTopicBackoff.getMaxDelay()).when(Predicate.not(Duration::isZero)).to(builder::maxDelay);
234+
map.from(retryTopicBackoff.getMultiplier()).to(builder::multiplier);
235+
map.from(retryTopicBackoff.getJitter()).when((Predicate.not(Duration::isZero))).to(builder::jitter);
236+
return builder.build().getBackOff();
251237
}
252238

253239
static void applySslBundle(Map<String, Object> properties, @Nullable SslBundle sslBundle) {

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,26 +1595,31 @@ public Backoff getBackoff() {
15951595
public static class Backoff {
15961596

15971597
/**
1598-
* Canonical backoff period. Used as an initial value in the exponential
1599-
* case, and as a minimum value in the uniform case.
1598+
* Base delay after the initial invocation. Can be combined with a
1599+
* "multiplier" to use an exponential back off strategy.
16001600
*/
16011601
private Duration delay = Duration.ofSeconds(1);
16021602

16031603
/**
1604-
* Multiplier to use for generating the next backoff delay.
1604+
* Multiplier for a delay for the next retry attempt, applied to the
1605+
* previous delay, starting with the initial delay as well as to the
1606+
* applicable jitter for each attempt. Fixed delay by default.
16051607
*/
1606-
private double multiplier = 0.0;
1608+
private double multiplier = 1.0;
16071609

16081610
/**
1609-
* Maximum wait between retries. If less than the delay then the default
1610-
* of 30 seconds is applied.
1611+
* Maximum delay for any retry attempt, limiting how far jitter and the
1612+
* multiplier can increase the delay.
16111613
*/
1612-
private Duration maxDelay = Duration.ZERO;
1614+
private Duration maxDelay = Duration.ofSeconds(30);
16131615

16141616
/**
1615-
* Whether to have the backoff delays.
1617+
* Jitter value for the base retry attempt, randomly subtracted or added
1618+
* to the calculated delay, resulting in a value between 'delay - jitter'
1619+
* and 'delay + jitter' but never below the base delay or above the max
1620+
* delay.
16161621
*/
1617-
private boolean random = false;
1622+
private Duration jitter = Duration.ZERO;
16181623

16191624
public Duration getDelay() {
16201625
return this.delay;
@@ -1640,12 +1645,12 @@ public void setMaxDelay(Duration maxDelay) {
16401645
this.maxDelay = maxDelay;
16411646
}
16421647

1643-
public boolean isRandom() {
1644-
return this.random;
1648+
public Duration getJitter() {
1649+
return this.jitter;
16451650
}
16461651

1647-
public void setRandom(boolean random) {
1648-
this.random = random;
1652+
public void setJitter(Duration jitter) {
1653+
this.jitter = jitter;
16491654
}
16501655

16511656
}

module/spring-boot-kafka/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@
118118
"level": "error"
119119
}
120120
},
121+
{
122+
"name": "spring.kafka.retry.topic.backoff.random",
123+
"type": "java.lang.Boolean",
124+
"deprecation": {
125+
"replacement": "spring.kafka.retry.topic.backoff.jitter",
126+
"level": "error"
127+
}
128+
},
121129
{
122130
"name": "spring.kafka.retry.topic.delay",
123131
"type": "java.time.Duration",

module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.aot.hint.predicate.RuntimeHintsPredicates;
5151
import org.springframework.boot.autoconfigure.AutoConfigurations;
5252
import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration;
53+
import org.springframework.boot.kafka.autoconfigure.KafkaProperties.Retry;
5354
import org.springframework.boot.ssl.SslBundle;
5455
import org.springframework.boot.ssl.SslStoreBundle;
5556
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
@@ -96,6 +97,7 @@
9697
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
9798
import org.springframework.kafka.transaction.KafkaTransactionManager;
9899
import org.springframework.test.util.ReflectionTestUtils;
100+
import org.springframework.util.backoff.ExponentialBackOff;
99101

100102
import static org.assertj.core.api.Assertions.assertThat;
101103
import static org.assertj.core.api.Assertions.entry;
@@ -568,6 +570,18 @@ void streamsWithCustomKafkaConfiguration() {
568570
});
569571
}
570572

573+
@Test
574+
void getBackOffWithDefaultsMatchesMapping() {
575+
Retry.Topic.Backoff properties = new Retry.Topic.Backoff();
576+
assertThat(KafkaAutoConfiguration.getBackOff(properties)).isInstanceOfSatisfying(ExponentialBackOff.class,
577+
(backOff) -> {
578+
assertThat(backOff.getInitialInterval()).isEqualTo(properties.getDelay().toMillis());
579+
assertThat(backOff.getMultiplier()).isEqualTo(properties.getMultiplier());
580+
assertThat(backOff.getMaxInterval()).isEqualTo(properties.getMaxDelay().toMillis());
581+
assertThat(backOff.getJitter()).isEqualTo(properties.getJitter().toMillis());
582+
});
583+
}
584+
571585
@Test
572586
void retryTopicConfigurationIsNotEnabledByDefault() {
573587
this.contextRunner

0 commit comments

Comments
 (0)