Skip to content

Commit df16a88

Browse files
committed
Fix REUSABLE_RETRY_TOPIC validation to allow multiple DLTs
Signed-off-by: LeeHyungGeol <[email protected]>
1 parent 17fe0ff commit df16a88

File tree

4 files changed

+500
-8
lines changed

4 files changed

+500
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* @author Yvette Quinby
5454
* @author Adrian Chlebosz
5555
* @author Omer Celik
56+
* @author Hyunggeol Lee
5657
* @since 2.7
5758
*
5859
*/
@@ -256,10 +257,16 @@ private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
256257
for (int i = 0; i < destinationsToAdd.size(); i++) {
257258
DestinationTopic destination = destinationsToAdd.get(i);
258259
if (destination.isReusableRetryTopic()) {
259-
Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
260-
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
261-
String.format("In the destination topic chain, the type %s can only be "
262-
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
260+
// Allow multiple DLTs after REUSABLE_RETRY_TOPIC
261+
boolean isLastOrFollowedOnlyByDlts = (i == destinationsToAdd.size() - 1) ||
262+
destinationsToAdd.subList(i + 1, destinationsToAdd.size())
263+
.stream()
264+
.allMatch(DestinationTopic::isDltTopic);
265+
266+
Assert.isTrue(isLastOrFollowedOnlyByDlts,
267+
() -> String.format("In the destination topic chain, the type %s can only be " +
268+
"specified as the last retry topic (followed only by DLT topics).",
269+
Type.REUSABLE_RETRY_TOPIC));
263270
}
264271
}
265272
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,14 @@
3636
import org.springframework.kafka.support.converter.ConversionException;
3737
import org.springframework.kafka.support.serializer.DeserializationException;
3838

39-
import static org.assertj.core.api.Assertions.assertThat;
40-
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
41-
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
42-
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
39+
import static org.assertj.core.api.Assertions.*;
4340

4441
/**
4542
* @author Tomaz Fernandes
4643
* @author Yvette Quinby
4744
* @author Gary Russell
4845
* @author Adrian Chlebosz
46+
* @author Hyunggeol Lee
4947
* @since 2.7
5048
*/
5149
@ExtendWith(MockitoExtension.class)
@@ -290,4 +288,103 @@ void shouldNotMarkContainerRefeshedOnOtherContextRefresh() {
290288
assertThat(defaultDestinationTopicContainer.isContextRefreshed()).isFalse();
291289
}
292290

291+
@Test
292+
void shouldAllowReusableRetryTopicWithSingleDlt() {
293+
assertThatNoException()
294+
.isThrownBy(() -> defaultDestinationTopicContainer
295+
.addDestinationTopics("id", allFifthDestinationTopics));
296+
}
297+
298+
@Test
299+
void shouldAllowReusableRetryTopicWithMultipleDlts() {
300+
assertThatNoException()
301+
.isThrownBy(() -> defaultDestinationTopicContainer
302+
.addDestinationTopics("id", allSixthDestinationTopics));
303+
}
304+
305+
@Test
306+
void shouldAllowReusableRetryTopicAsLastTopic() {
307+
List<DestinationTopic> topics = Arrays.asList(
308+
mainDestinationTopic5,
309+
reusableRetryDestinationTopic5
310+
);
311+
312+
assertThatNoException()
313+
.isThrownBy(() -> defaultDestinationTopicContainer
314+
.addDestinationTopics("id", topics));
315+
}
316+
317+
@Test
318+
void shouldRejectReusableRetryTopicFollowedByRegularRetry() {
319+
List<DestinationTopic> topics = Arrays.asList(
320+
mainDestinationTopic6,
321+
reusableRetryDestinationTopic6,
322+
invalidRetryDestinationTopic6,
323+
dltDestinationTopic6
324+
);
325+
326+
assertThatIllegalArgumentException()
327+
.isThrownBy(() -> defaultDestinationTopicContainer
328+
.addDestinationTopics("id", topics))
329+
.withMessageContaining("REUSABLE_RETRY_TOPIC")
330+
.withMessageContaining("followed only by DLT topics");
331+
}
332+
333+
@Test
334+
void shouldRejectReusableRetryTopicFollowedByNoOps() {
335+
List<DestinationTopic> topics = Arrays.asList(
336+
mainDestinationTopic6,
337+
reusableRetryDestinationTopic6,
338+
noOpsDestinationTopic6,
339+
dltDestinationTopic6
340+
);
341+
342+
assertThatIllegalArgumentException()
343+
.isThrownBy(() -> defaultDestinationTopicContainer
344+
.addDestinationTopics("id", topics))
345+
.withMessageContaining("REUSABLE_RETRY_TOPIC")
346+
.withMessageContaining("followed only by DLT topics");
347+
}
348+
349+
@Test
350+
void shouldAllowReusableRetryTopicWithTwoDlts() {
351+
List<DestinationTopic> topics = Arrays.asList(
352+
mainDestinationTopic6,
353+
reusableRetryDestinationTopic6,
354+
customDltDestinationTopic6,
355+
dltDestinationTopic6
356+
);
357+
358+
assertThatNoException()
359+
.isThrownBy(() -> defaultDestinationTopicContainer
360+
.addDestinationTopics("id", topics));
361+
}
362+
363+
@Test
364+
void shouldAllowReusableRetryTopicWithDifferentDltCombinations() {
365+
List<DestinationTopic> topics = Arrays.asList(
366+
mainDestinationTopic6,
367+
reusableRetryDestinationTopic6,
368+
validationDltDestinationTopic6,
369+
dltDestinationTopic6
370+
);
371+
372+
assertThatNoException()
373+
.isThrownBy(() -> defaultDestinationTopicContainer
374+
.addDestinationTopics("id", topics));
375+
}
376+
377+
@Test
378+
void shouldRejectReusableRetryTopicFollowedByMainTopic() {
379+
List<DestinationTopic> topics = Arrays.asList(
380+
mainDestinationTopic6,
381+
reusableRetryDestinationTopic6,
382+
mainDestinationTopic5
383+
);
384+
385+
assertThatIllegalArgumentException()
386+
.isThrownBy(() -> defaultDestinationTopicContainer
387+
.addDestinationTopics("id", topics))
388+
.withMessageContaining("REUSABLE_RETRY_TOPIC");
389+
}
293390
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
/**
3232
* @author Tomaz Fernandes
3333
* @author Adrian Chlebosz
34+
* @author Hyunggeol Lee
3435
* @since 2.7
3536
*/
3637
public class DestinationTopicTests {
@@ -144,6 +145,33 @@ public class DestinationTopicTests {
144145
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
145146
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet());
146147

148+
protected DestinationTopic.Properties mainTopicProps6 =
149+
new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1,
150+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
151+
152+
protected DestinationTopic.Properties reusableRetryTopicProps6 =
153+
new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1,
154+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
155+
156+
protected DestinationTopic.Properties customDltTopicProps6 =
157+
new DestinationTopic.Properties(0, "-custom" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
158+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
159+
Set.of(IllegalStateException.class));
160+
161+
protected DestinationTopic.Properties validationDltTopicProps6 =
162+
new DestinationTopic.Properties(0, "-validation" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
163+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
164+
Set.of(IllegalArgumentException.class));
165+
166+
protected DestinationTopic.Properties dltTopicProps6 =
167+
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
168+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
169+
Collections.emptySet());
170+
171+
protected DestinationTopic.Properties invalidRetryProps6 =
172+
new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1,
173+
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
174+
147175
// Holders
148176

149177
protected final static String FIRST_TOPIC = "firstTopic";
@@ -285,6 +313,38 @@ public class DestinationTopicTests {
285313
protected List<DestinationTopic> allFifthDestinationTopics = Arrays
286314
.asList(mainDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5);
287315

316+
protected final static String SIXTH_TOPIC = "sixthTopic";
317+
318+
protected DestinationTopic mainDestinationTopic6 =
319+
new DestinationTopic(SIXTH_TOPIC + mainTopicProps6.suffix(), mainTopicProps6);
320+
321+
protected DestinationTopic reusableRetryDestinationTopic6 =
322+
new DestinationTopic(SIXTH_TOPIC + reusableRetryTopicProps6.suffix(), reusableRetryTopicProps6);
323+
324+
protected DestinationTopic customDltDestinationTopic6 =
325+
new DestinationTopic(SIXTH_TOPIC + customDltTopicProps6.suffix(), customDltTopicProps6);
326+
327+
protected DestinationTopic validationDltDestinationTopic6 =
328+
new DestinationTopic(SIXTH_TOPIC + validationDltTopicProps6.suffix(), validationDltTopicProps6);
329+
330+
protected DestinationTopic dltDestinationTopic6 =
331+
new DestinationTopic(SIXTH_TOPIC + dltTopicProps6.suffix(), dltTopicProps6);
332+
333+
protected DestinationTopic invalidRetryDestinationTopic6 =
334+
new DestinationTopic(SIXTH_TOPIC + invalidRetryProps6.suffix(), invalidRetryProps6);
335+
336+
protected DestinationTopic noOpsDestinationTopic6 =
337+
new DestinationTopic(dltDestinationTopic6.getDestinationName() + "-noOps",
338+
new DestinationTopic.Properties(dltTopicProps6, "-noOps", DestinationTopic.Type.NO_OPS));
339+
340+
protected List<DestinationTopic> allSixthDestinationTopics = Arrays.asList(
341+
mainDestinationTopic6,
342+
reusableRetryDestinationTopic6,
343+
customDltDestinationTopic6,
344+
validationDltDestinationTopic6,
345+
dltDestinationTopic6
346+
);
347+
288348
// Exception matchers
289349

290350
private final ExceptionMatcher allowListExceptionMatcher = ExceptionMatcher.forAllowList().add(IllegalArgumentException.class).build();

0 commit comments

Comments
 (0)