Skip to content

Commit 9a64395

Browse files
Wzy19930507Zhiyang.Wang1garyrussell
authored
GH-2654: Incorrect retry topic suffix (#2876)
* GH-2654: Incorrect retry topic suffix Resolves #2654 * fix incorrect retry topic suffix when attempts is 2 * optimization method `isDelayWithReusedTopic` * add retry topic integration tests when reuse retry topic fix java doc in RetryTopic * fix javadoc in `@RetryTopic` * fix javadoc in `RetryTopicConfigurationProvider` * fix javadoc in `RetryableTopic` * add a note to the `whats-new.adoc` * Doc polishing. --------- Co-authored-by: Zhiyang.Wang1 <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent 8686714 commit 9a64395

File tree

4 files changed

+121
-5
lines changed

4 files changed

+121
-5
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,9 @@ See xref:kafka/container-factory.adoc[Container Factory] for more information.
3838
You can now add a `Validator` to this deserializer; if the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
3939
This allows the original raw data to be passed to the error handler.
4040
See xref:kafka/serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeserializer`] for more information.
41+
42+
[[x31-retryable]]
43+
=== Retryable Topics
44+
Change suffix `-retry-5000` to `-retry` when `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)`.
45+
If you want to keep suffix `-retry-5000`, use `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")`.
46+
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@
4343
* annotation is available.
4444
*
4545
* If beans are found in the container there's a check to determine whether or not the
46-
* provided topics topics should be handled by any of such instances.
46+
* provided topics should be handled by any of such instances.
4747
*
4848
* If the annotation is provided, a
4949
* {@link org.springframework.kafka.annotation.DltHandler} annotated method is looked up.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private List<DestinationTopic.Properties> createPropertiesForFixedDelaySingleTop
128128
}
129129

130130
private boolean isSingleTopicFixedDelay() {
131-
return isFixedDelay() && isSingleTopicSameIntervalTopicReuseStrategy();
131+
return (this.backOffValues.size() == 1 || isFixedDelay()) && isSingleTopicSameIntervalTopicReuseStrategy();
132132
}
133133

134134
private boolean isSingleTopicSameIntervalTopicReuseStrategy() {
@@ -220,8 +220,7 @@ private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBack
220220
}
221221

222222
private boolean isDelayWithReusedTopic(Long backoffValue) {
223-
return ((isSingleTopicFixedDelay()) ||
224-
(hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy()));
223+
return hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy();
225224
}
226225

227226
private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
/**
9494
* @author Tomaz Fernandes
9595
* @author Gary Russell
96+
* @author Wang Zhiyang
9697
* @since 2.7
9798
*/
9899
@SpringJUnitConfig
@@ -122,6 +123,12 @@ public class RetryTopicIntegrationTests {
122123

123124
public final static String NOT_RETRYABLE_EXCEPTION_TOPIC = "noRetryTopic";
124125

126+
public final static String FIRST_REUSE_RETRY_TOPIC = "reuseRetry1";
127+
128+
public final static String SECOND_REUSE_RETRY_TOPIC = "reuseRetry2";
129+
130+
public final static String THIRD_REUSE_RETRY_TOPIC = "reuseRetry3";
131+
125132
private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory";
126133

127134
@Autowired
@@ -267,6 +274,29 @@ void shouldRetryManualTopicWithDefaultDlt(@Autowired KafkaListenerEndpointRegist
267274
}
268275
}
269276

277+
@Test
278+
void shouldFirstReuseRetryTopic(@Autowired FirstReuseRetryTopicListener listener1,
279+
@Autowired SecondReuseRetryTopicListener listener2, @Autowired ThirdReuseRetryTopicListener listener3) {
280+
281+
logger.debug("Sending message to topic " + FIRST_REUSE_RETRY_TOPIC);
282+
kafkaTemplate.send(FIRST_REUSE_RETRY_TOPIC, "Testing reuse topic 1");
283+
logger.debug("Sending message to topic " + SECOND_REUSE_RETRY_TOPIC);
284+
kafkaTemplate.send(SECOND_REUSE_RETRY_TOPIC, "Testing reuse topic 2");
285+
logger.debug("Sending message to topic " + THIRD_REUSE_RETRY_TOPIC);
286+
kafkaTemplate.send(THIRD_REUSE_RETRY_TOPIC, "Testing reuse topic 3");
287+
assertThat(awaitLatch(latchContainer.countDownLatchReuseOne)).isTrue();
288+
assertThat(awaitLatch(latchContainer.countDownLatchReuseTwo)).isTrue();
289+
assertThat(awaitLatch(latchContainer.countDownLatchReuseThree)).isTrue();
290+
assertThat(listener1.topics).containsExactly(FIRST_REUSE_RETRY_TOPIC,
291+
FIRST_REUSE_RETRY_TOPIC + "-retry");
292+
assertThat(listener2.topics).containsExactly(SECOND_REUSE_RETRY_TOPIC,
293+
SECOND_REUSE_RETRY_TOPIC + "-retry-30", SECOND_REUSE_RETRY_TOPIC + "-retry-60",
294+
SECOND_REUSE_RETRY_TOPIC + "-retry-100", SECOND_REUSE_RETRY_TOPIC + "-retry-100");
295+
assertThat(listener3.topics).containsExactly(THIRD_REUSE_RETRY_TOPIC,
296+
THIRD_REUSE_RETRY_TOPIC + "-retry", THIRD_REUSE_RETRY_TOPIC + "-retry",
297+
THIRD_REUSE_RETRY_TOPIC + "-retry", THIRD_REUSE_RETRY_TOPIC + "-retry");
298+
}
299+
270300
@Test
271301
public void shouldGoStraightToDlt() {
272302
logger.debug("Sending message to topic " + NOT_RETRYABLE_EXCEPTION_TOPIC);
@@ -473,6 +503,69 @@ public void annotatedDltMethod(Object message) {
473503
}
474504
}
475505

506+
@Component
507+
static class FirstReuseRetryTopicListener {
508+
509+
final List<String> topics = Collections.synchronizedList(new ArrayList<>());
510+
511+
@Autowired
512+
CountDownLatchContainer container;
513+
514+
@RetryableTopic(attempts = "2", backoff = @Backoff(50),
515+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
516+
@KafkaListener(id = "reuseRetry1", topics = FIRST_REUSE_RETRY_TOPIC,
517+
containerFactory = "retryTopicListenerContainerFactory")
518+
public void listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
519+
logger.debug("Message {} received in topic {} ", message, receivedTopic);
520+
this.topics.add(receivedTopic);
521+
container.countDownLatchReuseOne.countDown();
522+
throw new RuntimeException("Another woooops... " + receivedTopic);
523+
}
524+
525+
}
526+
527+
@Component
528+
static class SecondReuseRetryTopicListener {
529+
530+
final List<String> topics = Collections.synchronizedList(new ArrayList<>());
531+
532+
@Autowired
533+
CountDownLatchContainer container;
534+
535+
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2),
536+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
537+
@KafkaListener(id = "reuseRetry2", topics = SECOND_REUSE_RETRY_TOPIC,
538+
containerFactory = "retryTopicListenerContainerFactory")
539+
public void listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
540+
logger.debug("Message {} received in topic {} ", message, receivedTopic);
541+
this.topics.add(receivedTopic);
542+
container.countDownLatchReuseTwo.countDown();
543+
throw new RuntimeException("Another woooops... " + receivedTopic);
544+
}
545+
546+
}
547+
548+
@Component
549+
static class ThirdReuseRetryTopicListener {
550+
551+
final List<String> topics = Collections.synchronizedList(new ArrayList<>());
552+
553+
@Autowired
554+
CountDownLatchContainer container;
555+
556+
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1, maxDelay = 5, multiplier = 1.4),
557+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
558+
@KafkaListener(id = "reuseRetry3", topics = THIRD_REUSE_RETRY_TOPIC,
559+
containerFactory = "retryTopicListenerContainerFactory")
560+
public void listen3(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
561+
logger.debug("Message {} received in topic {} ", message, receivedTopic);
562+
this.topics.add(receivedTopic);
563+
container.countDownLatchReuseThree.countDown();
564+
throw new RuntimeException("Another woooops... " + receivedTopic);
565+
}
566+
567+
}
568+
476569
@Component
477570
static class CountDownLatchContainer {
478571

@@ -488,6 +581,9 @@ static class CountDownLatchContainer {
488581
CountDownLatch countDownLatchDltTwo = new CountDownLatch(1);
489582
CountDownLatch countDownLatchDltThree = new CountDownLatch(1);
490583
CountDownLatch countDownLatchDltFour = new CountDownLatch(1);
584+
CountDownLatch countDownLatchReuseOne = new CountDownLatch(2);
585+
CountDownLatch countDownLatchReuseTwo = new CountDownLatch(5);
586+
CountDownLatch countDownLatchReuseThree = new CountDownLatch(5);
491587
CountDownLatch customDltCountdownLatch = new CountDownLatch(1);
492588
CountDownLatch customErrorHandlerCountdownLatch = new CountDownLatch(6);
493589
CountDownLatch customMessageConverterCountdownLatch = new CountDownLatch(6);
@@ -628,6 +724,21 @@ public NoRetryTopicListener noRetryTopicListener() {
628724
return new NoRetryTopicListener();
629725
}
630726

727+
@Bean
728+
public FirstReuseRetryTopicListener firstReuseRetryTopicListener() {
729+
return new FirstReuseRetryTopicListener();
730+
}
731+
732+
@Bean
733+
public SecondReuseRetryTopicListener secondReuseRetryTopicListener() {
734+
return new SecondReuseRetryTopicListener();
735+
}
736+
737+
@Bean
738+
public ThirdReuseRetryTopicListener thirdReuseRetryTopicListener() {
739+
return new ThirdReuseRetryTopicListener();
740+
}
741+
631742
@Bean
632743
CountDownLatchContainer latchContainer() {
633744
return new CountDownLatchContainer();

0 commit comments

Comments
 (0)