Skip to content

Commit d290b5e

Browse files
authored
Fix REUSABLE_RETRY_TOPIC validation to allow multiple DLTs (#4095)
Fixes #3834 **Auto-cherry-pick to `3.3.x`** Relaxes the validation constraint for `REUSABLE_RETRY_TOPIC` to allow it to be followed by multiple DLT topics, enabling exception-based DLT routing with reusable retry topics. Problem: When using `sameIntervalTopicReuseStrategy = SINGLE_TOPIC` with multiple DLTs configured through `exceptionBasedDltRouting`, the application failed to start with: `IllegalArgumentException`: In the destination topic chain, the type `REUSABLE_RETRY_TOPIC` can only be specified as the last retry topic. This prevented exception-based DLT routing scenarios like: Main Topic → Reusable Retry Topic → Custom DLT → Default DLT Previous Behavior: `REUSABLE_RETRY_TOPIC` could only be: - The last topic in the chain, OR - Followed by exactly one DLT topic New Behavior: `REUSABLE_RETRY_TOPIC` can now be followed by multiple DLT topics, as long as all subsequent topics are DLT types. The validation still correctly rejects configurations where `REUSABLE_RETRY_TOPIC` is followed by regular `RETRY, MAIN`, or `NO_OPS` topics. Changes: - Updates `validateDestinations()` to use stream-based validation that checks all subsequent topics are DLTs - Uses lambda supplier for error message (lazy evaluation for better performance on the happy path) - Improves error message clarity: "followed only by DLT topics" - Adds 8 unit tests covering positive and negative validation cases - Adds 3 integration tests verifying runtime behavior with multiple DLTs, single DLT, and no DLT scenarios Signed-off-by: LeeHyungGeol <[email protected]>
1 parent 81f2941 commit d290b5e

File tree

4 files changed

+503
-4
lines changed

4 files changed

+503
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* @author Yvette Quinby
5454
* @author Adrian Chlebosz
5555
* @author Omer Celik
56+
* @author Hyunggeol Lee
5657
* @since 2.7
5758
*
5859
*/
@@ -256,10 +257,16 @@ private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
256257
for (int i = 0; i < destinationsToAdd.size(); i++) {
257258
DestinationTopic destination = destinationsToAdd.get(i);
258259
if (destination.isReusableRetryTopic()) {
259-
Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
260-
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
261-
String.format("In the destination topic chain, the type %s can only be "
262-
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
260+
// Allow multiple DLTs after REUSABLE_RETRY_TOPIC
261+
boolean isLastOrFollowedOnlyByDlts = (i == destinationsToAdd.size() - 1) ||
262+
destinationsToAdd.subList(i + 1, destinationsToAdd.size())
263+
.stream()
264+
.allMatch(DestinationTopic::isDltTopic);
265+
266+
Assert.isTrue(isLastOrFollowedOnlyByDlts,
267+
() -> String.format("In the destination topic chain, the type %s can only be " +
268+
"specified as the last retry topic (followed only by DLT topics).",
269+
Type.REUSABLE_RETRY_TOPIC));
263270
}
264271
}
265272
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,15 @@
3939
import static org.assertj.core.api.Assertions.assertThat;
4040
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
4141
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
42+
import static org.assertj.core.api.Assertions.assertThatNoException;
4243
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
4344

4445
/**
4546
* @author Tomaz Fernandes
4647
* @author Yvette Quinby
4748
* @author Gary Russell
4849
* @author Adrian Chlebosz
50+
* @author Hyunggeol Lee
4951
* @since 2.7
5052
*/
5153
@ExtendWith(MockitoExtension.class)
@@ -290,4 +292,103 @@ void shouldNotMarkContainerRefeshedOnOtherContextRefresh() {
290292
assertThat(defaultDestinationTopicContainer.isContextRefreshed()).isFalse();
291293
}
292294

295+
@Test
296+
void shouldAllowReusableRetryTopicWithSingleDlt() {
297+
assertThatNoException()
298+
.isThrownBy(() -> defaultDestinationTopicContainer
299+
.addDestinationTopics("id", allFifthDestinationTopics));
300+
}
301+
302+
@Test
303+
void shouldAllowReusableRetryTopicWithMultipleDlts() {
304+
assertThatNoException()
305+
.isThrownBy(() -> defaultDestinationTopicContainer
306+
.addDestinationTopics("id", allSixthDestinationTopics));
307+
}
308+
309+
@Test
310+
void shouldAllowReusableRetryTopicAsLastTopic() {
311+
List<DestinationTopic> topics = Arrays.asList(
312+
mainDestinationTopic5,
313+
reusableRetryDestinationTopic5
314+
);
315+
316+
assertThatNoException()
317+
.isThrownBy(() -> defaultDestinationTopicContainer
318+
.addDestinationTopics("id", topics));
319+
}
320+
321+
@Test
322+
void shouldRejectReusableRetryTopicFollowedByRegularRetry() {
323+
List<DestinationTopic> topics = Arrays.asList(
324+
mainDestinationTopic6,
325+
reusableRetryDestinationTopic6,
326+
invalidRetryDestinationTopic6,
327+
dltDestinationTopic6
328+
);
329+
330+
assertThatIllegalArgumentException()
331+
.isThrownBy(() -> defaultDestinationTopicContainer
332+
.addDestinationTopics("id", topics))
333+
.withMessageContaining("REUSABLE_RETRY_TOPIC")
334+
.withMessageContaining("followed only by DLT topics");
335+
}
336+
337+
@Test
338+
void shouldRejectReusableRetryTopicFollowedByNoOps() {
339+
List<DestinationTopic> topics = Arrays.asList(
340+
mainDestinationTopic6,
341+
reusableRetryDestinationTopic6,
342+
noOpsDestinationTopic6,
343+
dltDestinationTopic6
344+
);
345+
346+
assertThatIllegalArgumentException()
347+
.isThrownBy(() -> defaultDestinationTopicContainer
348+
.addDestinationTopics("id", topics))
349+
.withMessageContaining("REUSABLE_RETRY_TOPIC")
350+
.withMessageContaining("followed only by DLT topics");
351+
}
352+
353+
@Test
354+
void shouldAllowReusableRetryTopicWithTwoDlts() {
355+
List<DestinationTopic> topics = Arrays.asList(
356+
mainDestinationTopic6,
357+
reusableRetryDestinationTopic6,
358+
customDltDestinationTopic6,
359+
dltDestinationTopic6
360+
);
361+
362+
assertThatNoException()
363+
.isThrownBy(() -> defaultDestinationTopicContainer
364+
.addDestinationTopics("id", topics));
365+
}
366+
367+
@Test
368+
void shouldAllowReusableRetryTopicWithDifferentDltCombinations() {
369+
List<DestinationTopic> topics = Arrays.asList(
370+
mainDestinationTopic6,
371+
reusableRetryDestinationTopic6,
372+
validationDltDestinationTopic6,
373+
dltDestinationTopic6
374+
);
375+
376+
assertThatNoException()
377+
.isThrownBy(() -> defaultDestinationTopicContainer
378+
.addDestinationTopics("id", topics));
379+
}
380+
381+
@Test
382+
void shouldRejectReusableRetryTopicFollowedByMainTopic() {
383+
List<DestinationTopic> topics = Arrays.asList(
384+
mainDestinationTopic6,
385+
reusableRetryDestinationTopic6,
386+
mainDestinationTopic5
387+
);
388+
389+
assertThatIllegalArgumentException()
390+
.isThrownBy(() -> defaultDestinationTopicContainer
391+
.addDestinationTopics("id", topics))
392+
.withMessageContaining("REUSABLE_RETRY_TOPIC");
393+
}
293394
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
/**
3232
* @author Tomaz Fernandes
3333
* @author Adrian Chlebosz
34+
* @author Hyunggeol Lee
3435
* @since 2.7
3536
*/
3637
public class DestinationTopicTests {
@@ -144,6 +145,33 @@ public class DestinationTopicTests {
144145
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
145146
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet());
146147

148+
protected DestinationTopic.Properties mainTopicProps6 =
149+
new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1,
150+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
151+
152+
protected DestinationTopic.Properties reusableRetryTopicProps6 =
153+
new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1,
154+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
155+
156+
protected DestinationTopic.Properties customDltTopicProps6 =
157+
new DestinationTopic.Properties(0, "-custom" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
158+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
159+
Set.of(IllegalStateException.class));
160+
161+
protected DestinationTopic.Properties validationDltTopicProps6 =
162+
new DestinationTopic.Properties(0, "-validation" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
163+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
164+
Set.of(IllegalArgumentException.class));
165+
166+
protected DestinationTopic.Properties dltTopicProps6 =
167+
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
168+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
169+
Collections.emptySet());
170+
171+
protected DestinationTopic.Properties invalidRetryProps6 =
172+
new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1,
173+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
174+
147175
// Holders
148176

149177
protected final static String FIRST_TOPIC = "firstTopic";
@@ -285,6 +313,38 @@ public class DestinationTopicTests {
285313
protected List<DestinationTopic> allFifthDestinationTopics = Arrays
286314
.asList(mainDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5);
287315

316+
protected final static String SIXTH_TOPIC = "sixthTopic";
317+
318+
protected DestinationTopic mainDestinationTopic6 =
319+
new DestinationTopic(SIXTH_TOPIC + mainTopicProps6.suffix(), mainTopicProps6);
320+
321+
protected DestinationTopic reusableRetryDestinationTopic6 =
322+
new DestinationTopic(SIXTH_TOPIC + reusableRetryTopicProps6.suffix(), reusableRetryTopicProps6);
323+
324+
protected DestinationTopic customDltDestinationTopic6 =
325+
new DestinationTopic(SIXTH_TOPIC + customDltTopicProps6.suffix(), customDltTopicProps6);
326+
327+
protected DestinationTopic validationDltDestinationTopic6 =
328+
new DestinationTopic(SIXTH_TOPIC + validationDltTopicProps6.suffix(), validationDltTopicProps6);
329+
330+
protected DestinationTopic dltDestinationTopic6 =
331+
new DestinationTopic(SIXTH_TOPIC + dltTopicProps6.suffix(), dltTopicProps6);
332+
333+
protected DestinationTopic invalidRetryDestinationTopic6 =
334+
new DestinationTopic(SIXTH_TOPIC + invalidRetryProps6.suffix(), invalidRetryProps6);
335+
336+
protected DestinationTopic noOpsDestinationTopic6 =
337+
new DestinationTopic(dltDestinationTopic6.getDestinationName() + "-noOps",
338+
new DestinationTopic.Properties(dltTopicProps6, "-noOps", DestinationTopic.Type.NO_OPS));
339+
340+
protected List<DestinationTopic> allSixthDestinationTopics = Arrays.asList(
341+
mainDestinationTopic6,
342+
reusableRetryDestinationTopic6,
343+
customDltDestinationTopic6,
344+
validationDltDestinationTopic6,
345+
dltDestinationTopic6
346+
);
347+
288348
// Exception matchers
289349

290350
private final ExceptionMatcher allowListExceptionMatcher = ExceptionMatcher.forAllowList().add(IllegalArgumentException.class).build();

0 commit comments

Comments
 (0)