diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bbc8ae8ccb..1faaf16379 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; @@ -73,6 +74,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.ApplicationContext; @@ -106,6 +108,8 @@ import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; import org.springframework.kafka.listener.adapter.AsyncRepliesAware; +import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -167,6 +171,7 @@ * @author Mikael Carlstedt * @author Borahm Lee * @author Lokesh Alamuri + * @author Sanghyeok An */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -840,6 +845,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private volatile long lastPoll = System.currentTimeMillis(); + private final ConcurrentLinkedDeque> failedRecords = new ConcurrentLinkedDeque<>(); + @SuppressWarnings(UNCHECKED) ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { @@ -895,6 +902,16 @@ else if (listener instanceof MessageListener) { this.wantsFullRecords = false; this.pollThreadStateProcessor = setUpPollProcessor(false); this.observationEnabled = this.containerProperties.isObservationEnabled(); + + if (!AopUtils.isAopProxy(this.genericListener) && + this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter) { + KafkaBackoffAwareMessageListenerAdapter genListener = + (KafkaBackoffAwareMessageListenerAdapter) this.genericListener; + if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter adapterListener) { + // This means that the async retry feature is supported only for SingleRecordListener with @RetryableTopic. + adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure); + } + } } else { throw new IllegalArgumentException("Listener must be one of 'MessageListener', " @@ -1294,6 +1311,15 @@ public void run() { boolean failedAuthRetry = false; this.lastReceive = System.currentTimeMillis(); while (isRunning()) { + + try { + handleAsyncFailure(); + } + catch (Exception e) { + ListenerConsumer.this.logger.error( + "Failed to process async retry messages. skip this time, try it again next loop."); + } + try { pollAndInvoke(); if (failedAuthRetry) { @@ -1435,6 +1461,29 @@ protected void pollAndInvoke() { } } + protected void handleAsyncFailure() { + List> copyFailedRecords = new ArrayList<>(); + while (!this.failedRecords.isEmpty()) { + FailedRecordTuple failedRecordTuple = this.failedRecords.pollFirst(); + copyFailedRecords.add(failedRecordTuple); + } + + // If any copied and failed record fails to complete due to an unexpected error, + // We will give up on retrying with the remaining copied and failed Records. + for (FailedRecordTuple copyFailedRecord : copyFailedRecords) { + try { + invokeErrorHandlerBySingleRecord(copyFailedRecord); + } + catch (Exception e) { + this.logger.warn(() -> + "Async failed record failed to complete, thus skip it. record :" + + copyFailedRecord.toString() + + ", Exception : " + + e.getMessage()); + } + } + } + private void doProcessCommits() { if (!this.autoCommit && !this.isRecordAck) { try { @@ -2827,6 +2876,44 @@ private void doInvokeOnMessage(final ConsumerRecord recordArg) { } } + private void invokeErrorHandlerBySingleRecord(final FailedRecordTuple failedRecordTuple) { + final ConsumerRecord cRecord = failedRecordTuple.record; + RuntimeException rte = failedRecordTuple.ex; + if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) { + try { + if (this.producer == null) { + processCommits(); + } + } + catch (Exception ex) { // NO SONAR + this.logger.error(ex, "Failed to commit before handling error"); + } + List> records = new ArrayList<>(); + records.add(cRecord); + this.commonErrorHandler.handleRemaining(rte, records, this.consumer, + KafkaMessageListenerContainer.this.thisOrParentContainer); + } + else { + boolean handled = false; + try { + handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer, + KafkaMessageListenerContainer.this.thisOrParentContainer); + } + catch (Exception ex) { + this.logger.error(ex, "ErrorHandler threw unexpected exception"); + } + Map>> records = new LinkedHashMap<>(); + if (!handled) { + records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()), + tp -> new ArrayList<>()).add(cRecord); + } + if (!records.isEmpty()) { + this.remainingRecords = new ConsumerRecords<>(records); + this.pauseForPending = true; + } + } + } + private void invokeErrorHandler(final ConsumerRecord cRecord, Iterator> iterator, RuntimeException rte) { @@ -3299,6 +3386,10 @@ private Collection> getHighestOffsetRecords(ConsumerRecords .values(); } + private void callbackForAsyncFailure(ConsumerRecord cRecord, RuntimeException ex) { + this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex)); + } + @Override public void seek(String topic, int partition, long offset) { this.seeks.add(new TopicPartitionOffset(topic, partition, offset)); @@ -3913,4 +4004,6 @@ private static class StopAfterFenceException extends KafkaException { } + private record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { }; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 9ae1567e67..9fc07d6827 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; @@ -91,6 +93,7 @@ * @author Wang ZhiYang * @author Huijin Hong * @author Soby Chacko + * @author Sanghyeok An */ public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, AsyncRepliesAware { @@ -152,6 +155,9 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private String correlationHeaderName = KafkaHeaders.CORRELATION_ID; + @Nullable + private BiConsumer, RuntimeException> callbackForAsyncFailure; + /** * Create an instance with the provided bean and method. * @param bean the bean. @@ -665,16 +671,28 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm Throwable t, Message source) { try { + Throwable cause = t instanceof CompletionException ? t.getCause() : t; handleException(request, acknowledgment, consumer, source, - new ListenerExecutionFailedException(createMessagingErrorMessage( - "Async Fail", source.getPayload()), t)); + new ListenerExecutionFailedException(createMessagingErrorMessage( + "Async Fail", source.getPayload()), cause)); } catch (Throwable ex) { this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source); acknowledge(acknowledgment); + if (canAsyncRetry(request, ex) && + Objects.nonNull(this.callbackForAsyncFailure)) { + @SuppressWarnings("unchecked") + ConsumerRecord record = (ConsumerRecord) request; + this.callbackForAsyncFailure.accept(record, (RuntimeException) ex); + } } } + private static boolean canAsyncRetry(Object request, Throwable exception) { + // The async retry with @RetryableTopic is only supported for SingleRecord Listener. + return request instanceof ConsumerRecord && exception instanceof RuntimeException; + } + protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Message message, ListenerExecutionFailedException e) { @@ -869,6 +887,18 @@ private boolean rawByParameterIsType(Type parameterType, Type type) { return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } + /** + * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}. + * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)} + * will invoke {@link MessagingMessageListenerAdapter#callbackForAsyncFailure} when + * {@link CompletableFuture} or {@link Mono} fails to complete. + * @param asyncRetryCallback the callback for async retry. + * @since 3.3 + */ + public void setCallbackForAsyncFailure(BiConsumer, RuntimeException> asyncRetryCallback) { + this.callbackForAsyncFailure = asyncRetryCallback; + } + /** * Root object for reply expression evaluation. * @param request the request. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java new file mode 100644 index 0000000000..a0cbf2de06 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java @@ -0,0 +1,965 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.PartitionOffset; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaAdmin.NewTopics; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.GenericMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.retry.annotation.Backoff; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.ReflectionUtils; + +/** + * @author Sanghyeok An + * @since 3.3.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) +public class AsyncCompletableFutureRetryTopicClassLevelIntegrationTests { + + public final static String FIRST_TOPIC = "myRetryTopic1"; + + public final static String SECOND_TOPIC = "myRetryTopic2"; + + public final static String THIRD_TOPIC = "myRetryTopic3"; + + public final static String FOURTH_TOPIC = "myRetryTopic4"; + + public final static String TWO_LISTENERS_TOPIC = "myRetryTopic5"; + + public final static String MANUAL_TOPIC = "myRetryTopic6"; + + public final static String NOT_RETRYABLE_EXCEPTION_TOPIC = "noRetryTopic"; + + public final static String FIRST_REUSE_RETRY_TOPIC = "reuseRetry1"; + + public final static String SECOND_REUSE_RETRY_TOPIC = "reuseRetry2"; + + public final static String THIRD_REUSE_RETRY_TOPIC = "reuseRetry3"; + + private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Autowired + DestinationTopicContainer topicContainer; + + @Test + void shouldRetryFirstTopic(@Autowired KafkaListenerEndpointRegistry registry) { + kafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); + assertThat(topicContainer.getNextDestinationTopicFor("firstTopicId", FIRST_TOPIC).getDestinationName()) + .isEqualTo("myRetryTopic1-retry"); + assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); + assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.customErrorHandlerCountdownLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.customMessageConverterCountdownLatch)).isTrue(); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("first")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) registry + .getListenerContainer(id); + if (id.equals("firstTopicId")) { + assertThat(container.getConcurrency()).isEqualTo(2); + } + else { + assertThat(container.getConcurrency()) + .describedAs("Expected %s to have concurrency", id) + .isEqualTo(1); + } + }); + } + + @Test + void shouldRetrySecondTopic() { + kafkaTemplate.send(SECOND_TOPIC, "Testing topic 2"); + assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); + assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue(); + } + + @Test + void shouldRetryThirdTopicWithTimeout( + @Autowired KafkaAdmin admin, + @Autowired KafkaListenerEndpointRegistry registry) throws Exception { + + kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3"); + assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue(); + Map topics = admin.describeTopics(THIRD_TOPIC, THIRD_TOPIC + "-dlt", FOURTH_TOPIC); + assertThat(topics.get(THIRD_TOPIC).partitions()).hasSize(2); + assertThat(topics.get(THIRD_TOPIC + "-dlt").partitions()).hasSize(3); + assertThat(topics.get(FOURTH_TOPIC).partitions()).hasSize(2); + AtomicReference method = new AtomicReference<>(); + ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> { + m.setAccessible(true); + method.set(m); + }, m -> m.getName().equals("newTopics")); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("third")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = + (ConcurrentMessageListenerContainer) registry.getListenerContainer(id); + if (id.equals("thirdTopicId")) { + assertThat(container.getConcurrency()).isEqualTo(2); + } + else { + assertThat(container.getConcurrency()) + .describedAs("Expected %s to have concurrency", id) + .isEqualTo(1); + } + }); + } + + @Test + void shouldRetryFourthTopicWithNoDlt() { + kafkaTemplate.send(FOURTH_TOPIC, "Testing topic 4"); + assertThat(awaitLatch(latchContainer.countDownLatch4)).isTrue(); + } + + @Test + void shouldRetryFifthTopicWithTwoListenersAndManualAssignment( + @Autowired FifthTopicListener1 listener1, + @Autowired FifthTopicListener2 listener2) { + + kafkaTemplate.send(TWO_LISTENERS_TOPIC, 0, "0", "Testing topic 5 - 0"); + kafkaTemplate.send(TWO_LISTENERS_TOPIC, 1, "0", "Testing topic 5 - 1"); + assertThat(awaitLatch(latchContainer.countDownLatch51)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatch52)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltThree)).isTrue(); + assertThat(listener1.topics).containsExactly( + TWO_LISTENERS_TOPIC, + TWO_LISTENERS_TOPIC + "-listener1-0", + TWO_LISTENERS_TOPIC + "-listener1-1", + TWO_LISTENERS_TOPIC + "-listener1-2", + TWO_LISTENERS_TOPIC + "-listener1-dlt" + ); + assertThat(listener2.topics).containsExactly( + TWO_LISTENERS_TOPIC, + TWO_LISTENERS_TOPIC + "-listener2-0", + TWO_LISTENERS_TOPIC + "-listener2-1", + TWO_LISTENERS_TOPIC + "-listener2-2", + TWO_LISTENERS_TOPIC + "-listener2-dlt"); + } + + @Test + void shouldRetryManualTopicWithDefaultDlt( + @Autowired KafkaListenerEndpointRegistry registry, + @Autowired ConsumerFactory cf) { + + kafkaTemplate.send(MANUAL_TOPIC, "Testing topic 6"); + assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("manual")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = + (ConcurrentMessageListenerContainer) registry.getListenerContainer(id); + assertThat(container) + .extracting("commonErrorHandler") + .extracting("seekAfterError", InstanceOfAssertFactories.BOOLEAN) + .isFalse(); + }); + Consumer consumer = cf.createConsumer("manual-dlt", ""); + Set tp = + Set.of(new org.apache.kafka.common.TopicPartition(MANUAL_TOPIC + "-dlt", 0)); + consumer.assign(tp); + try { + await().untilAsserted(() -> { + OffsetAndMetadata offsetAndMetadata = consumer.committed(tp).get(tp.iterator().next()); + assertThat(offsetAndMetadata).isNotNull(); + assertThat(offsetAndMetadata.offset()).isEqualTo(1L); + }); + } + finally { + consumer.close(); + } + } + + @Test + void shouldFirstReuseRetryTopic(@Autowired + FirstReuseRetryTopicListener listener1, + @Autowired + SecondReuseRetryTopicListener listener2, @Autowired + ThirdReuseRetryTopicListener listener3) { + + kafkaTemplate.send(FIRST_REUSE_RETRY_TOPIC, "Testing reuse topic 1"); + kafkaTemplate.send(SECOND_REUSE_RETRY_TOPIC, "Testing reuse topic 2"); + kafkaTemplate.send(THIRD_REUSE_RETRY_TOPIC, "Testing reuse topic 3"); + assertThat(awaitLatch(latchContainer.countDownLatchReuseOne)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchReuseTwo)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchReuseThree)).isTrue(); + assertThat(listener1.topics).containsExactly( + FIRST_REUSE_RETRY_TOPIC, + FIRST_REUSE_RETRY_TOPIC + "-retry"); + assertThat(listener2.topics).containsExactly( + SECOND_REUSE_RETRY_TOPIC, + SECOND_REUSE_RETRY_TOPIC + "-retry-30", + SECOND_REUSE_RETRY_TOPIC + "-retry-60", + SECOND_REUSE_RETRY_TOPIC + "-retry-100", + SECOND_REUSE_RETRY_TOPIC + "-retry-100"); + assertThat(listener3.topics).containsExactly( + THIRD_REUSE_RETRY_TOPIC, + THIRD_REUSE_RETRY_TOPIC + "-retry", + THIRD_REUSE_RETRY_TOPIC + "-retry", + THIRD_REUSE_RETRY_TOPIC + "-retry", + THIRD_REUSE_RETRY_TOPIC + "-retry"); + } + + @Test + void shouldGoStraightToDlt() { + kafkaTemplate.send(NOT_RETRYABLE_EXCEPTION_TOPIC, "Testing topic with annotation 1"); + assertThat(awaitLatch(latchContainer.countDownLatchNoRetry)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltTwo)).isTrue(); + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + @KafkaListener( + id = "firstTopicId", + topics = FIRST_TOPIC, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class FirstTopicListener { + + @Autowired + DestinationTopicContainer topicContainer; + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch1.countDown(); + } + }); + } + + } + + @KafkaListener(topics = SECOND_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class SecondTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listenAgain(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return CompletableFuture.supplyAsync(() -> { + try { + throw new IllegalStateException("Another woooops... " + receivedTopic); + } + catch (IllegalStateException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch2); + } + }); + } + } + + @RetryableTopic( + attempts = "${five.attempts}", + backoff = @Backoff(delay = 250, maxDelay = 1000, multiplier = 1.5), + numPartitions = "#{3}", + timeout = "${missing.property:100000}", + include = MyRetryException.class, kafkaTemplate = "${kafka.template}", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + concurrency = "1") + @KafkaListener( + id = "thirdTopicId", + topics = THIRD_TOPIC, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + concurrency = "2") + static class ThirdTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return CompletableFuture.supplyAsync(() -> { + try { + throw new MyRetryException("Annotated woooops... " + receivedTopic); + } + catch (MyRetryException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch3); + } + }); + } + + @DltHandler + public void annotatedDltMethod(Object message) { + container.countDownLatchDltOne.countDown(); + } + } + + @RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(300), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + kafkaTemplate = "${kafka.template}") + @KafkaListener(topics = FOURTH_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class FourthTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return CompletableFuture.supplyAsync(() -> { + try { + throw new IllegalStateException("Another woooops... " + receivedTopic); + } + catch (IllegalStateException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch4); + } + }); + } + + @DltHandler + public void shouldNotGetHere() { + fail("Dlt should not be processed!"); + } + } + + static class AbstractFifthTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @DltHandler + public void annotatedDltMethod(ConsumerRecord record) { + this.topics.add(record.topic()); + container.countDownLatchDltThree.countDown(); + } + + } + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(250), + numPartitions = "2", + retryTopicSuffix = "-listener1", + dltTopicSuffix = "-listener1-dlt", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + kafkaTemplate = "${kafka.template}") + @KafkaListener( + id = "fifthTopicId1", + topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = TWO_LISTENERS_TOPIC, + partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class FifthTopicListener1 extends AbstractFifthTopicListener { + + @KafkaHandler + public CompletableFuture listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Annotated woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch51); + } + }); + } + + } + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(250), + numPartitions = "2", + retryTopicSuffix = "-listener2", + dltTopicSuffix = "-listener2-dlt", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + kafkaTemplate = "${kafka.template}") + @KafkaListener( + id = "fifthTopicId2", + topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = TWO_LISTENERS_TOPIC, + partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))}, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class FifthTopicListener2 extends AbstractFifthTopicListener { + + @KafkaHandler + public CompletableFuture listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Annotated woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch52.countDown(); + } + }); + } + + } + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(50), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS) + @KafkaListener( + id = "manual", + topics = MANUAL_TOPIC, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class SixthTopicDefaultDLTListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listenNoDlt( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @SuppressWarnings("unused") Acknowledgment ack) { + return CompletableFuture.supplyAsync(() -> { + try { + throw new IllegalStateException("Another woooops... " + receivedTopic); + } + catch (IllegalStateException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch6); + } + }); + } + + } + + @RetryableTopic( + attempts = "3", + numPartitions = "3", + exclude = MyDontRetryException.class, + backoff = @Backoff(delay = 50, maxDelay = 100, multiplier = 3), + traversingCauses = "true", + kafkaTemplate = "${kafka.template}") + @KafkaListener(topics = NOT_RETRYABLE_EXCEPTION_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class NoRetryTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return CompletableFuture.supplyAsync(() -> { + try { + throw new MyDontRetryException("Annotated second woooops... " + receivedTopic); + } + catch (MyDontRetryException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatchNoRetry); + } + }); + } + + @DltHandler + public void annotatedDltMethod(Object message) { + container.countDownLatchDltTwo.countDown(); + } + } + + @RetryableTopic( + attempts = "2", + backoff = @Backoff(50)) + @KafkaListener( + id = "reuseRetry1", + topics = FIRST_REUSE_RETRY_TOPIC, + containerFactory = "retryTopicListenerContainerFactory") + static class FirstReuseRetryTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Another woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatchReuseOne.countDown(); + } + }); + } + + } + + @RetryableTopic( + attempts = "5", + backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2)) + @KafkaListener( + id = "reuseRetry2", + topics = SECOND_REUSE_RETRY_TOPIC, + containerFactory = "retryTopicListenerContainerFactory") + static class SecondReuseRetryTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Another woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatchReuseTwo.countDown(); + } + }); + } + + } + + @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1, maxDelay = 5, multiplier = 1.4)) + @KafkaListener(id = "reuseRetry3", topics = THIRD_REUSE_RETRY_TOPIC, + containerFactory = "retryTopicListenerContainerFactory") + static class ThirdReuseRetryTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public CompletableFuture listen3(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Another woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatchReuseThree.countDown(); + } + }); + + } + + } + + static class CountDownLatchContainer { + + CountDownLatch countDownLatch1 = new CountDownLatch(5); + + CountDownLatch countDownLatch2 = new CountDownLatch(3); + + CountDownLatch countDownLatch3 = new CountDownLatch(3); + + CountDownLatch countDownLatch4 = new CountDownLatch(4); + + CountDownLatch countDownLatch51 = new CountDownLatch(4); + + CountDownLatch countDownLatch52 = new CountDownLatch(4); + + CountDownLatch countDownLatch6 = new CountDownLatch(4); + + CountDownLatch countDownLatchNoRetry = new CountDownLatch(1); + + CountDownLatch countDownLatchDltOne = new CountDownLatch(1); + + CountDownLatch countDownLatchDltTwo = new CountDownLatch(1); + + CountDownLatch countDownLatchDltThree = new CountDownLatch(2); + + CountDownLatch countDownLatchReuseOne = new CountDownLatch(2); + + CountDownLatch countDownLatchReuseTwo = new CountDownLatch(5); + + CountDownLatch countDownLatchReuseThree = new CountDownLatch(5); + + CountDownLatch customDltCountdownLatch = new CountDownLatch(1); + + CountDownLatch customErrorHandlerCountdownLatch = new CountDownLatch(6); + + CountDownLatch customMessageConverterCountdownLatch = new CountDownLatch(6); + + final List knownTopics = new ArrayList<>(); + + private void countDownIfNotKnown(String receivedTopic, CountDownLatch countDownLatch) { + synchronized (knownTopics) { + if (!knownTopics.contains(receivedTopic)) { + knownTopics.add(receivedTopic); + countDownLatch.countDown(); + } + } + } + } + + static class MyCustomDltProcessor { + + @Autowired + KafkaTemplate kafkaTemplate; + + @Autowired + CountDownLatchContainer container; + + public void processDltMessage(Object message) { + container.customDltCountdownLatch.countDown(); + throw new RuntimeException("Dlt Error!"); + } + } + + @SuppressWarnings("serial") + static class MyRetryException extends RuntimeException { + MyRetryException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + static class MyDontRetryException extends RuntimeException { + MyDontRetryException(String msg) { + super(msg); + } + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + RetryTopicConfiguration firstRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(FIRST_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + RetryTopicConfiguration secondRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .exponentialBackoff(500, 2, 10000) + .retryOn(Arrays.asList(IllegalStateException.class, IllegalAccessException.class)) + .traversingCauses() + .includeTopic(SECOND_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + FirstTopicListener firstTopicListener() { + return new FirstTopicListener(); + } + + @Bean + KafkaListenerErrorHandler myCustomErrorHandler( + CountDownLatchContainer container) { + return (message, exception) -> { + container.customErrorHandlerCountdownLatch.countDown(); + throw exception; + }; + } + + @Bean + SmartMessageConverter myCustomMessageConverter( + CountDownLatchContainer container) { + return new CompositeMessageConverter(Collections.singletonList(new GenericMessageConverter())) { + + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + container.customMessageConverterCountdownLatch.countDown(); + return super.fromMessage(message, targetClass, conversionHint); + } + }; + } + + @Bean + SecondTopicListener secondTopicListener() { + return new SecondTopicListener(); + } + + @Bean + ThirdTopicListener thirdTopicListener() { + return new ThirdTopicListener(); + } + + @Bean + FourthTopicListener fourthTopicListener() { + return new FourthTopicListener(); + } + + @Bean + FifthTopicListener1 fifthTopicListener1() { + return new FifthTopicListener1(); + } + + @Bean + FifthTopicListener2 fifthTopicListener2() { + return new FifthTopicListener2(); + } + + @Bean + SixthTopicDefaultDLTListener manualTopicListener() { + return new SixthTopicDefaultDLTListener(); + } + + @Bean + NoRetryTopicListener noRetryTopicListener() { + return new NoRetryTopicListener(); + } + + @Bean + FirstReuseRetryTopicListener firstReuseRetryTopicListener() { + return new FirstReuseRetryTopicListener(); + } + + @Bean + SecondReuseRetryTopicListener secondReuseRetryTopicListener() { + return new SecondReuseRetryTopicListener(); + } + + @Bean + ThirdReuseRetryTopicListener thirdReuseRetryTopicListener() { + return new ThirdReuseRetryTopicListener(); + } + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor() { + return new MyCustomDltProcessor(); + } + } + + @Configuration + static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ProducerFactory producerFactory() { + Map props = KafkaTestUtils.producerProps( + this.broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean("customKafkaTemplate") + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); + return new KafkaAdmin(configs); + } + + @Bean + NewTopic topic() { + return TopicBuilder.name(THIRD_TOPIC).partitions(2).replicas(1).build(); + } + + @Bean + NewTopics topics() { + return new NewTopics(TopicBuilder.name(FOURTH_TOPIC).partitions(2).replicas(1).build()); + } + + @Bean + ConsumerFactory consumerFactory() { + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "groupId", + "false"); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + ConcurrentKafkaListenerContainerFactory retryTopicListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> container.getContainerProperties().setIdlePartitionEventInterval(100L)); + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer(container -> { + if (container.getListenerId().startsWith("manual")) { + container.getContainerProperties().setAckMode(AckMode.MANUAL); + container.getContainerProperties().setAsyncAcks(true); + } + }); + return factory; + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java new file mode 100644 index 0000000000..0bb52e0d63 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java @@ -0,0 +1,1162 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.GenericMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Sanghyeok An + * @since 3.3.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) +public class AsyncCompletableFutureRetryTopicScenarioTests { + + private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; + + public final static String TEST_TOPIC0 = "myRetryTopic0"; + + public final static String TEST_TOPIC1 = "myRetryTopic1"; + + public final static String TEST_TOPIC2 = "myRetryTopic2"; + + public final static String TEST_TOPIC3 = "myRetryTopic3"; + + public final static String TEST_TOPIC4 = "myRetryTopic4"; + + public final static String TEST_TOPIC5 = "myRetryTopic5"; + + public final static String TEST_TOPIC6 = "myRetryTopic6"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Autowired + DestinationTopicContainer topicContainer; + + @Test + void allFailCaseTest( + @Autowired TestTopicListener0 zeroTopicListener, + @Autowired MyCustomDltProcessor myCustomDltProcessor0) { + // All Fail case. + String shortFailedMsg1 = "0"; + String shortFailedMsg2 = "1"; + String shortFailedMsg3 = "2"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("0-topicId", TEST_TOPIC0); + + String expectedRetryTopic = TEST_TOPIC0 + "-retry"; + String[] expectedReceivedMsgs = { + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3 + }; + String[] expectedReceivedTopics = { + TEST_TOPIC0, + TEST_TOPIC0, + TEST_TOPIC0, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + String[] expectedDltMsgs = { + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3 + }; + + // When + kafkaTemplate.send(TEST_TOPIC0, shortFailedMsg1); + kafkaTemplate.send(TEST_TOPIC0, shortFailedMsg2); + kafkaTemplate.send(TEST_TOPIC0, shortFailedMsg3); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch0)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch0)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(TEST_TOPIC0 + "-retry"); + + assertThat(zeroTopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(zeroTopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor0.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void firstShortFailAndLastLongSuccessRetryTest( + @Autowired TestTopicListener1 testTopicListener1, + @Autowired MyCustomDltProcessor myCustomDltProcessor1) { + // Given + String longSuccessMsg = "3"; + String shortFailedMsg = "1"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("1-topicId", TEST_TOPIC1); + + String expectedRetryTopic = TEST_TOPIC1 + "-retry"; + String[] expectedReceivedMsgs = { + shortFailedMsg, + longSuccessMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC1, + TEST_TOPIC1, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + + String[] expectedDltMsgs = { + shortFailedMsg + }; + + + // When + kafkaTemplate.send(TEST_TOPIC1, shortFailedMsg); + kafkaTemplate.send(TEST_TOPIC1, longSuccessMsg); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch1)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(testTopicListener1.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener1.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor1.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void firstLongSuccessAndLastShortFailed( + @Autowired TestTopicListener2 zero2TopicListener, + @Autowired MyCustomDltProcessor myCustomDltProcessor2) { + // Given + String shortFailedMsg = "1"; + String longSuccessMsg = "3"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("2-topicId", TEST_TOPIC2); + + String expectedRetryTopic = TEST_TOPIC2 + "-retry"; + String[] expectedReceivedMsgs = { + longSuccessMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC2, + TEST_TOPIC2, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + + String[] expectedDltMsgs = { + shortFailedMsg + }; + + // When + kafkaTemplate.send(TEST_TOPIC2, longSuccessMsg); + kafkaTemplate.send(TEST_TOPIC2, shortFailedMsg); + + // Then + + assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch2)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(zero2TopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(zero2TopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor2.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void longFailMsgTwiceThenShortSucessMsgThird( + @Autowired TestTopicListener3 testTopicListener3, + @Autowired MyCustomDltProcessor myCustomDltProcessor3) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("3-topicId", TEST_TOPIC3); + + String expectedRetryTopic = TEST_TOPIC3 + "-retry"; + String[] expectedReceivedMsgs = { + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.SHORT_SUCCESS_MSG, + TestTopicListener3.SHORT_SUCCESS_MSG, + TestTopicListener3.SHORT_SUCCESS_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC3, + TEST_TOPIC3, + TEST_TOPIC3, + TEST_TOPIC3, + TEST_TOPIC3, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + + String[] expectedDltMsgs = { + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + }; + + // When + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch3)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(testTopicListener3.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener3.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor3.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void longSuccessMsgTwiceThenShortFailMsgTwice( + @Autowired TestTopicListener4 topicListener4, + @Autowired MyCustomDltProcessor myCustomDltProcessor4) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("4-TopicId", TEST_TOPIC4); + + String expectedRetryTopic = TEST_TOPIC4 + "-retry"; + String[] expectedReceivedMsgs = { + TestTopicListener4.LONG_SUCCESS_MSG, + TestTopicListener4.LONG_SUCCESS_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC4, + TEST_TOPIC4, + TEST_TOPIC4, + TEST_TOPIC4, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + }; + + String[] expectedDltMsgs = { + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + }; + + // When + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.LONG_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.LONG_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.SHORT_FAIL_MSG); + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.SHORT_FAIL_MSG); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch4)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch4)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(topicListener4.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(topicListener4.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor4.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void oneLongSuccessMsgBetween100ShortFailMsg( + @Autowired TestTopicListener5 topicListener5, + @Autowired MyCustomDltProcessor myCustomDltProcessor5) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("5-TopicId", TEST_TOPIC5); + + String expectedRetryTopic = TEST_TOPIC5 + "-retry"; + + String[] expectedReceivedMsgs = new String[501]; + for (int i = 0; i < 500; i++) { + expectedReceivedMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; + } + expectedReceivedMsgs[500] = TestTopicListener5.LONG_SUCCESS_MSG; + + + String[] expectedReceivedTopics = new String[501]; + for (int i = 0; i < 100; i++) { + expectedReceivedTopics[i] = TEST_TOPIC5; + } + for (int i = 100; i < 500; i++) { + expectedReceivedTopics[i] = expectedRetryTopic; + } + expectedReceivedTopics[500] = TEST_TOPIC5; + + + String[] expectedDltMsgs = new String[100]; + for (int i = 0; i < 100; i++) { + expectedDltMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; + } + + // When + for (int i = 0; i < 100; i++) { + kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.SHORT_FAIL_MSG); + if (i == 50) { + kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.LONG_SUCCESS_MSG); + } + } + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch5)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch5)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(topicListener5.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(topicListener5.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor5.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void halfSuccessMsgAndHalfFailedMsgWithRandomSleepTime( + @Autowired TestTopicListener6 topicListener6, + @Autowired MyCustomDltProcessor myCustomDltProcessor6) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("6-TopicId", TEST_TOPIC6); + + String expectedRetryTopic = TEST_TOPIC6 + "-retry"; + + Random random = new Random(); + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(); + + for (int i = 0; i < 50; i++) { + int randomSleepAWhile = random.nextInt(1, 100); + String msg = String.valueOf(randomSleepAWhile) + TestTopicListener6.SUCCESS_SUFFIX; + q.add(msg); + } + + for (int i = 0; i < 50; i++) { + int randomSleepAWhile = random.nextInt(1, 100); + String msg = String.valueOf(randomSleepAWhile) + TestTopicListener6.FAIL_SUFFIX; + q.add(msg); + } + + int expectedSuccessMsgCount = 50; + int expectedFailedMsgCount = 250; + + int expectedReceivedOriginalTopicCount = 100; + int expectedReceivedRetryTopicCount = 200; + int expectedReceivedDltMsgCount = 50; + + + // When + while (!q.isEmpty()) { + String successOrFailMsg = q.poll(); + kafkaTemplate.send(TEST_TOPIC6, successOrFailMsg); + } + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch6)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + + long actualReceivedSuccessMsgCount = topicListener6.receivedMsgs.stream() + .map(s -> s.split(",")[1]) + .filter(m -> (',' + m).equals(TestTopicListener6.SUCCESS_SUFFIX)) + .count(); + + long actualReceivedFailedMsgCount = topicListener6.receivedMsgs.stream() + .map(s -> s.split(",")[1]) + .filter(m -> (',' + m).equals( + TestTopicListener6.FAIL_SUFFIX)) + .count(); + + + long actualReceivedOriginalTopicMsgCount = topicListener6.receivedTopics.stream() + .filter(topic -> topic.equals(TEST_TOPIC6)) + .count(); + + long actualReceivedRetryTopicMsgCount = topicListener6.receivedTopics.stream() + .filter(topic -> topic.equals(expectedRetryTopic)) + .count(); + + assertThat(actualReceivedSuccessMsgCount).isEqualTo(expectedSuccessMsgCount); + assertThat(actualReceivedFailedMsgCount).isEqualTo(expectedFailedMsgCount); + assertThat(actualReceivedOriginalTopicMsgCount).isEqualTo(expectedReceivedOriginalTopicCount); + assertThat(actualReceivedRetryTopicMsgCount).isEqualTo(expectedReceivedRetryTopicCount); + + assertThat(myCustomDltProcessor6.receivedMsg.size()).isEqualTo(expectedReceivedDltMsgCount); + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + + } + + @KafkaListener( + id = "0-topicId", + topics = TEST_TOPIC0, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener0 { + + @Autowired + CountDownLatchContainer container; + + private final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch0.countDown(); + } + }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + } + + } + + @KafkaListener( + id = "1-topicId", + topics = TEST_TOPIC1, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener1 { + + @Autowired + CountDownLatchContainer container; + + private final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + return CompletableFuture.supplyAsync(() -> { + try { + if (message.equals("1")) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch1.countDown(); + } + + return "Task Completed"; + }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + } + + } + + @KafkaListener( + id = "2-topicId", + topics = TEST_TOPIC2, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener2 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return CompletableFuture.supplyAsync(() -> { + try { + if (message.equals("1")) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch2.countDown(); + } + + return "Task Completed"; + }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + } + } + + @KafkaListener( + id = "3-topicId", + topics = TEST_TOPIC3, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener3 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String LONG_FAIL_MSG = "100"; + + public static final String SHORT_SUCCESS_MSG = "1"; + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return CompletableFuture.supplyAsync(() -> { + try { + if (message.equals(LONG_FAIL_MSG)) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch3.countDown(); + } + + return "Task Completed"; + }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + } + } + + @KafkaListener( + id = "4-TopicId", + topics = TEST_TOPIC4, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener4 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String LONG_SUCCESS_MSG = "100"; + + public static final String SHORT_FAIL_MSG = "1"; + + @KafkaHandler + public CompletableFuture listen(String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return CompletableFuture.supplyAsync(() -> { + try { + if (message.equals(SHORT_FAIL_MSG)) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch4.countDown(); + } + return "Task Completed"; + }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + } + } + + @KafkaListener( + id = "5-TopicId", + topics = TEST_TOPIC5, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener5 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String LONG_SUCCESS_MSG = "100"; + + public static final String SHORT_FAIL_MSG = "1"; + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return CompletableFuture.supplyAsync(() -> { + try { + if (message.equals(SHORT_FAIL_MSG)) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch5.countDown(); + } + + return "Task Completed"; + }, CompletableFuture.delayedExecutor(Integer.parseInt(message), TimeUnit.MILLISECONDS)); + } + } + + @KafkaListener( + id = "6-TopicId", + topics = TEST_TOPIC6, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener6 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String SUCCESS_SUFFIX = ",s"; + + public static final String FAIL_SUFFIX = ",f"; + + @KafkaHandler + public CompletableFuture listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + String[] split = message.split(","); + String sleepAWhile = split[0]; + String failOrSuccess = split[1]; + + return CompletableFuture.supplyAsync(() -> { + try { + if (failOrSuccess.equals("f")) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch6.countDown(); + } + + return "Task Completed"; + }, CompletableFuture.delayedExecutor(Integer.parseInt(sleepAWhile), TimeUnit.MILLISECONDS)); + } + } + + static class CountDownLatchContainer { + + static int COUNT0 = 15; + + static int DLT_COUNT0 = 3; + + CountDownLatch countDownLatch0 = new CountDownLatch(COUNT0); + + CountDownLatch dltCountdownLatch0 = new CountDownLatch(DLT_COUNT0); + + static int COUNT1 = 6; + + static int DLT_COUNT1 = 1; + + CountDownLatch countDownLatch1 = new CountDownLatch(COUNT1); + + CountDownLatch dltCountdownLatch1 = new CountDownLatch(DLT_COUNT1); + + static int COUNT2 = 6; + + static int DLT_COUNT2 = 1; + + CountDownLatch countDownLatch2 = new CountDownLatch(COUNT2); + + CountDownLatch dltCountdownLatch2 = new CountDownLatch(DLT_COUNT2); + + static int COUNT3 = 13; + + static int DLT_COUNT3 = 2; + + CountDownLatch countDownLatch3 = new CountDownLatch(COUNT3); + + CountDownLatch dltCountdownLatch3 = new CountDownLatch(DLT_COUNT3); + + static int COUNT4 = 12; + + static int DLT_COUNT4 = 2; + + CountDownLatch countDownLatch4 = new CountDownLatch(COUNT4); + + CountDownLatch dltCountdownLatch4 = new CountDownLatch(DLT_COUNT4); + + static int COUNT5 = 501; + + static int DLT_COUNT5 = 100; + + CountDownLatch countDownLatch5 = new CountDownLatch(COUNT5); + + CountDownLatch dltCountdownLatch5 = new CountDownLatch(DLT_COUNT5); + + static int COUNT6 = 250; + + static int DLT_COUNT6 = 50; + + CountDownLatch countDownLatch6 = new CountDownLatch(COUNT6); + + CountDownLatch dltCountdownLatch6 = new CountDownLatch(DLT_COUNT6); + + } + + static class MyCustomDltProcessor { + + final List receivedMsg = new ArrayList<>(); + + MyCustomDltProcessor(KafkaTemplate kafkaTemplate, + CountDownLatch latch) { + this.kafkaTemplate = kafkaTemplate; + this.latch = latch; + } + + private final KafkaTemplate kafkaTemplate; + + private final CountDownLatch latch; + + public void processDltMessage(String message) { + this.receivedMsg.add(message); + latch.countDown(); + } + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + static RetryTopicConfiguration createRetryTopicConfiguration( + KafkaTemplate template, + String topicName, + String dltBeanName) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(topicName) + .doNotRetryOnDltFailure() + .dltHandlerMethod(dltBeanName, DLT_METHOD_NAME) + .create(template); + } + + @Bean + RetryTopicConfiguration testRetryTopic0(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC0, + "myCustomDltProcessor0"); + } + + @Bean + RetryTopicConfiguration testRetryTopic1(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC1, + "myCustomDltProcessor1"); + } + + @Bean + RetryTopicConfiguration testRetryTopic2(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC2, + "myCustomDltProcessor2"); + } + + @Bean + RetryTopicConfiguration testRetryTopic3(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC3, + "myCustomDltProcessor3"); + } + + @Bean + RetryTopicConfiguration testRetryTopic4(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC4, + "myCustomDltProcessor4"); + } + + @Bean + RetryTopicConfiguration testRetryTopic5(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC5, + "myCustomDltProcessor5"); + } + + @Bean + RetryTopicConfiguration testRetryTopic6(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC6, + "myCustomDltProcessor6"); + } + + @Bean + KafkaListenerErrorHandler myCustomErrorHandler( + CountDownLatchContainer container) { + return (message, exception) -> { + throw exception; + }; + } + + @Bean + SmartMessageConverter myCustomMessageConverter( + CountDownLatchContainer container) { + return new CompositeMessageConverter(Collections.singletonList(new GenericMessageConverter())) { + + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + return super.fromMessage(message, targetClass, conversionHint); + } + }; + } + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + TestTopicListener0 testTopicListener0() { + return new TestTopicListener0(); + } + + @Bean + TestTopicListener1 testTopicListener1() { + return new TestTopicListener1(); + } + + @Bean + TestTopicListener2 testTopicListener2() { + return new TestTopicListener2(); + } + + @Bean + TestTopicListener3 testTopicListener3() { + return new TestTopicListener3(); + } + + @Bean + TestTopicListener4 testTopicListener4() { + return new TestTopicListener4(); + } + + @Bean + TestTopicListener5 testTopicListener5() { + return new TestTopicListener5(); + } + + @Bean + TestTopicListener6 testTopicListener6() { + return new TestTopicListener6(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor0( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch0); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor1( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch1); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor2( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch2); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor3( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch3); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor4( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch4); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor5( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch5); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor6( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch6); + } + + } + + @Configuration + static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ProducerFactory producerFactory() { + Map props = KafkaTestUtils.producerProps( + this.broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean("customKafkaTemplate") + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + } + + @EnableKafka + @Configuration + static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ConsumerFactory consumerFactory() { + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "groupId", + "false"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + ConcurrentKafkaListenerContainerFactory retryTopicListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> container.getContainerProperties().setIdlePartitionEventInterval(100L)); + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer(container -> { + if (container.getListenerId().startsWith("manual")) { + container.getContainerProperties().setAckMode(AckMode.MANUAL); + container.getContainerProperties().setAsyncAcks(true); + } + }); + return factory; + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java new file mode 100644 index 0000000000..469de356df --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java @@ -0,0 +1,971 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.PartitionOffset; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaAdmin.NewTopics; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.GenericMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.retry.annotation.Backoff; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Mono; + +/** + * @author Sanghyeok An + * @since 3.3.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) +public class AsyncMonoFutureRetryTopicClassLevelIntegrationTests { + + public final static String FIRST_TOPIC = "myRetryTopic1"; + + public final static String SECOND_TOPIC = "myRetryTopic2"; + + public final static String THIRD_TOPIC = "myRetryTopic3"; + + public final static String FOURTH_TOPIC = "myRetryTopic4"; + + public final static String TWO_LISTENERS_TOPIC = "myRetryTopic5"; + + public final static String MANUAL_TOPIC = "myRetryTopic6"; + + public final static String NOT_RETRYABLE_EXCEPTION_TOPIC = "noRetryTopic"; + + public final static String FIRST_REUSE_RETRY_TOPIC = "reuseRetry1"; + + public final static String SECOND_REUSE_RETRY_TOPIC = "reuseRetry2"; + + public final static String THIRD_REUSE_RETRY_TOPIC = "reuseRetry3"; + + private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Autowired + DestinationTopicContainer topicContainer; + + @Test + void shouldRetryFirstTopic(@Autowired KafkaListenerEndpointRegistry registry) { + kafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); + assertThat(topicContainer.getNextDestinationTopicFor("firstTopicId", FIRST_TOPIC).getDestinationName()) + .isEqualTo("myRetryTopic1-retry"); + assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); + assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.customErrorHandlerCountdownLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.customMessageConverterCountdownLatch)).isTrue(); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("first")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) registry + .getListenerContainer(id); + if (id.equals("firstTopicId")) { + assertThat(container.getConcurrency()).isEqualTo(2); + } + else { + assertThat(container.getConcurrency()) + .describedAs("Expected %s to have concurrency", id) + .isEqualTo(1); + } + }); + } + + @Test + void shouldRetrySecondTopic() { + kafkaTemplate.send(SECOND_TOPIC, "Testing topic 2"); + assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); + assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue(); + } + + @Test + void shouldRetryThirdTopicWithTimeout( + @Autowired KafkaAdmin admin, + @Autowired KafkaListenerEndpointRegistry registry) throws Exception { + + kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3"); + assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue(); + Map topics = admin.describeTopics(THIRD_TOPIC, THIRD_TOPIC + "-dlt", FOURTH_TOPIC); + assertThat(topics.get(THIRD_TOPIC).partitions()).hasSize(2); + assertThat(topics.get(THIRD_TOPIC + "-dlt").partitions()).hasSize(3); + assertThat(topics.get(FOURTH_TOPIC).partitions()).hasSize(2); + AtomicReference method = new AtomicReference<>(); + org.springframework.util.ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> { + m.setAccessible(true); + method.set(m); + }, m -> m.getName().equals("newTopics")); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("third")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = + (ConcurrentMessageListenerContainer) registry.getListenerContainer(id); + if (id.equals("thirdTopicId")) { + assertThat(container.getConcurrency()).isEqualTo(2); + } + else { + assertThat(container.getConcurrency()) + .describedAs("Expected %s to have concurrency", id) + .isEqualTo(1); + } + }); + } + + @Test + void shouldRetryFourthTopicWithNoDlt() { + kafkaTemplate.send(FOURTH_TOPIC, "Testing topic 4"); + assertThat(awaitLatch(latchContainer.countDownLatch4)).isTrue(); + } + + @Test + void shouldRetryFifthTopicWithTwoListenersAndManualAssignment( + @Autowired FifthTopicListener1 listener1, + @Autowired FifthTopicListener2 listener2) { + + kafkaTemplate.send(TWO_LISTENERS_TOPIC, 0, "0", "Testing topic 5 - 0"); + kafkaTemplate.send(TWO_LISTENERS_TOPIC, 1, "0", "Testing topic 5 - 1"); + assertThat(awaitLatch(latchContainer.countDownLatch51)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatch52)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltThree)).isTrue(); + assertThat(listener1.topics).containsExactly( + TWO_LISTENERS_TOPIC, + TWO_LISTENERS_TOPIC + "-listener1-0", + TWO_LISTENERS_TOPIC + "-listener1-1", + TWO_LISTENERS_TOPIC + "-listener1-2", + TWO_LISTENERS_TOPIC + "-listener1-dlt"); + assertThat(listener2.topics).containsExactly( + TWO_LISTENERS_TOPIC, + TWO_LISTENERS_TOPIC + "-listener2-0", + TWO_LISTENERS_TOPIC + "-listener2-1", + TWO_LISTENERS_TOPIC + "-listener2-2", + TWO_LISTENERS_TOPIC + "-listener2-dlt"); + } + + @Test + void shouldRetryManualTopicWithDefaultDlt( + @Autowired KafkaListenerEndpointRegistry registry, + @Autowired ConsumerFactory cf) { + + kafkaTemplate.send(MANUAL_TOPIC, "Testing topic 6"); + assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("manual")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = + (ConcurrentMessageListenerContainer) registry.getListenerContainer(id); + assertThat(container) + .extracting("commonErrorHandler") + .extracting("seekAfterError", InstanceOfAssertFactories.BOOLEAN) + .isFalse(); + }); + Consumer consumer = cf.createConsumer("manual-dlt", ""); + Set tp = + Set.of(new org.apache.kafka.common.TopicPartition(MANUAL_TOPIC + "-dlt", 0)); + consumer.assign(tp); + try { + await().untilAsserted(() -> { + OffsetAndMetadata offsetAndMetadata = consumer.committed(tp).get(tp.iterator().next()); + assertThat(offsetAndMetadata).isNotNull(); + assertThat(offsetAndMetadata.offset()).isEqualTo(1L); + }); + } + finally { + consumer.close(); + } + } + + @Test + void shouldFirstReuseRetryTopic( + @Autowired FirstReuseRetryTopicListener listener1, + @Autowired SecondReuseRetryTopicListener listener2, + @Autowired ThirdReuseRetryTopicListener listener3) { + + kafkaTemplate.send(FIRST_REUSE_RETRY_TOPIC, "Testing reuse topic 1"); + kafkaTemplate.send(SECOND_REUSE_RETRY_TOPIC, "Testing reuse topic 2"); + kafkaTemplate.send(THIRD_REUSE_RETRY_TOPIC, "Testing reuse topic 3"); + assertThat(awaitLatch(latchContainer.countDownLatchReuseOne)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchReuseTwo)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchReuseThree)).isTrue(); + assertThat(listener1.topics).containsExactly( + FIRST_REUSE_RETRY_TOPIC, + FIRST_REUSE_RETRY_TOPIC + "-retry"); + assertThat(listener2.topics).containsExactly( + SECOND_REUSE_RETRY_TOPIC, + SECOND_REUSE_RETRY_TOPIC + "-retry-30", + SECOND_REUSE_RETRY_TOPIC + "-retry-60", + SECOND_REUSE_RETRY_TOPIC + "-retry-100", + SECOND_REUSE_RETRY_TOPIC + "-retry-100"); + assertThat(listener3.topics).containsExactly( + THIRD_REUSE_RETRY_TOPIC, + THIRD_REUSE_RETRY_TOPIC + "-retry", + THIRD_REUSE_RETRY_TOPIC + "-retry", + THIRD_REUSE_RETRY_TOPIC + "-retry", + THIRD_REUSE_RETRY_TOPIC + "-retry"); + } + + @Test + void shouldGoStraightToDlt() { + kafkaTemplate.send(NOT_RETRYABLE_EXCEPTION_TOPIC, "Testing topic with annotation 1"); + assertThat(awaitLatch(latchContainer.countDownLatchNoRetry)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltTwo)).isTrue(); + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + @KafkaListener( + id = "firstTopicId", + topics = FIRST_TOPIC, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class FirstTopicListener { + + @Autowired + DestinationTopicContainer topicContainer; + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listen( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch1.countDown(); + } + }).then(); + } + + } + + @KafkaListener(topics = SECOND_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class SecondTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listenAgain( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return Mono.fromCallable(() -> { + try { + throw new IllegalStateException("Another woooops... " + receivedTopic); + } + catch (IllegalStateException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch2); + } + }).then(); + } + } + + @RetryableTopic( + attempts = "${five.attempts}", + backoff = @Backoff(delay = 250, maxDelay = 1000, multiplier = 1.5), + numPartitions = "#{3}", + timeout = "${missing.property:100000}", + include = MyRetryException.class, kafkaTemplate = "${kafka.template}", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + concurrency = "1") + @KafkaListener( + id = "thirdTopicId", + topics = THIRD_TOPIC, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + concurrency = "2") + static class ThirdTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listenWithAnnotation( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return Mono.fromCallable(() -> { + try { + throw new MyRetryException("Annotated woooops... " + receivedTopic); + } + catch (MyRetryException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch3); + } + }).then(); + } + + @DltHandler + public void annotatedDltMethod(Object message) { + container.countDownLatchDltOne.countDown(); + } + } + + @RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(300), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + kafkaTemplate = "${kafka.template}") + @KafkaListener(topics = FOURTH_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class FourthTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listenNoDlt( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return Mono.fromCallable(() -> { + try { + throw new IllegalStateException("Another woooops... " + receivedTopic); + } + catch (IllegalStateException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch4); + } + }).then(); + } + + @DltHandler + public void shouldNotGetHere() { + fail("Dlt should not be processed!"); + } + } + + static class AbstractFifthTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @DltHandler + public void annotatedDltMethod(ConsumerRecord record) { + this.topics.add(record.topic()); + container.countDownLatchDltThree.countDown(); + } + + } + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(250), + numPartitions = "2", + retryTopicSuffix = "-listener1", + dltTopicSuffix = "-listener1-dlt", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + kafkaTemplate = "${kafka.template}") + @KafkaListener( + id = "fifthTopicId1", + topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = TWO_LISTENERS_TOPIC, + partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class FifthTopicListener1 extends AbstractFifthTopicListener { + + @KafkaHandler + public Mono listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Annotated woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch51); + } + }).then(); + } + + } + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(250), + numPartitions = "2", + retryTopicSuffix = "-listener2", + dltTopicSuffix = "-listener2-dlt", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + kafkaTemplate = "${kafka.template}") + @KafkaListener( + id = "fifthTopicId2", + topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = TWO_LISTENERS_TOPIC, + partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))}, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class FifthTopicListener2 extends AbstractFifthTopicListener { + + @KafkaHandler + public Mono listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Annotated woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch52.countDown(); + } + }).then(); + } + + } + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(50), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS) + @KafkaListener( + id = "manual", + topics = MANUAL_TOPIC, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class SixthTopicDefaultDLTListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listenNoDlt( + String message, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @SuppressWarnings("unused") Acknowledgment ack) { + return Mono.fromCallable(() -> { + try { + throw new IllegalStateException("Another woooops... " + receivedTopic); + + } + catch (IllegalStateException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch6); + } + }).then(); + } + + } + + @RetryableTopic( + attempts = "3", + numPartitions = "3", + exclude = MyDontRetryException.class, + backoff = @Backoff(delay = 50, maxDelay = 100, multiplier = 3), + traversingCauses = "true", + kafkaTemplate = "${kafka.template}") + @KafkaListener(topics = NOT_RETRYABLE_EXCEPTION_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + static class NoRetryTopicListener { + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + return Mono.fromCallable(() -> { + try { + throw new MyDontRetryException("Annotated second woooops... " + receivedTopic); + } + catch (MyDontRetryException e) { + throw e; + } + finally { + container.countDownIfNotKnown(receivedTopic, container.countDownLatchNoRetry); + } + }).then(); + } + + @DltHandler + public void annotatedDltMethod(Object message) { + container.countDownLatchDltTwo.countDown(); + } + } + + @RetryableTopic( + attempts = "2", + backoff = @Backoff(50)) + @KafkaListener( + id = "reuseRetry1", + topics = FIRST_REUSE_RETRY_TOPIC, + containerFactory = "retryTopicListenerContainerFactory") + static class FirstReuseRetryTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Another woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatchReuseOne.countDown(); + } + }).then(); + } + + } + + @RetryableTopic( + attempts = "5", + backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2)) + @KafkaListener( + id = "reuseRetry2", + topics = SECOND_REUSE_RETRY_TOPIC, + containerFactory = "retryTopicListenerContainerFactory") + static class SecondReuseRetryTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Another woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatchReuseTwo.countDown(); + } + }).then(); + } + + } + + @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1, maxDelay = 5, multiplier = 1.4)) + @KafkaListener(id = "reuseRetry3", topics = THIRD_REUSE_RETRY_TOPIC, + containerFactory = "retryTopicListenerContainerFactory") + static class ThirdReuseRetryTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @KafkaHandler + public Mono listen3(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.topics.add(receivedTopic); + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Another woooops... " + receivedTopic); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatchReuseThree.countDown(); + } + }).then(); + } + + } + + static class CountDownLatchContainer { + + CountDownLatch countDownLatch1 = new CountDownLatch(5); + + CountDownLatch countDownLatch2 = new CountDownLatch(3); + + CountDownLatch countDownLatch3 = new CountDownLatch(3); + + CountDownLatch countDownLatch4 = new CountDownLatch(4); + + CountDownLatch countDownLatch51 = new CountDownLatch(4); + + CountDownLatch countDownLatch52 = new CountDownLatch(4); + + CountDownLatch countDownLatch6 = new CountDownLatch(4); + + CountDownLatch countDownLatchNoRetry = new CountDownLatch(1); + + CountDownLatch countDownLatchDltOne = new CountDownLatch(1); + + CountDownLatch countDownLatchDltTwo = new CountDownLatch(1); + + CountDownLatch countDownLatchDltThree = new CountDownLatch(2); + + CountDownLatch countDownLatchReuseOne = new CountDownLatch(2); + + CountDownLatch countDownLatchReuseTwo = new CountDownLatch(5); + + CountDownLatch countDownLatchReuseThree = new CountDownLatch(5); + + CountDownLatch customDltCountdownLatch = new CountDownLatch(1); + + CountDownLatch customErrorHandlerCountdownLatch = new CountDownLatch(6); + + CountDownLatch customMessageConverterCountdownLatch = new CountDownLatch(6); + + final List knownTopics = new ArrayList<>(); + + private void countDownIfNotKnown(String receivedTopic, CountDownLatch countDownLatch) { + synchronized (knownTopics) { + if (!knownTopics.contains(receivedTopic)) { + knownTopics.add(receivedTopic); + countDownLatch.countDown(); + } + } + } + } + + static class MyCustomDltProcessor { + + @Autowired + KafkaTemplate kafkaTemplate; + + @Autowired + CountDownLatchContainer container; + + public void processDltMessage(Object message) { + container.customDltCountdownLatch.countDown(); + throw new RuntimeException("Dlt Error!"); + } + } + + @SuppressWarnings("serial") + static class MyRetryException extends RuntimeException { + MyRetryException(String msg) { + super(msg); + } + } + + @SuppressWarnings("serial") + static class MyDontRetryException extends RuntimeException { + MyDontRetryException(String msg) { + super(msg); + } + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + RetryTopicConfiguration firstRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(FIRST_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + RetryTopicConfiguration secondRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .exponentialBackoff(500, 2, 10000) + .retryOn(Arrays.asList(IllegalStateException.class, IllegalAccessException.class)) + .traversingCauses() + .includeTopic(SECOND_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + FirstTopicListener firstTopicListener() { + return new FirstTopicListener(); + } + + @Bean + KafkaListenerErrorHandler myCustomErrorHandler( + CountDownLatchContainer container) { + return (message, exception) -> { + container.customErrorHandlerCountdownLatch.countDown(); + throw exception; + }; + } + + @Bean + SmartMessageConverter myCustomMessageConverter( + CountDownLatchContainer container) { + return new CompositeMessageConverter(Collections.singletonList(new GenericMessageConverter())) { + + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + container.customMessageConverterCountdownLatch.countDown(); + return super.fromMessage(message, targetClass, conversionHint); + } + }; + } + + @Bean + SecondTopicListener secondTopicListener() { + return new SecondTopicListener(); + } + + @Bean + ThirdTopicListener thirdTopicListener() { + return new ThirdTopicListener(); + } + + @Bean + FourthTopicListener fourthTopicListener() { + return new FourthTopicListener(); + } + + @Bean + FifthTopicListener1 fifthTopicListener1() { + return new FifthTopicListener1(); + } + + @Bean + FifthTopicListener2 fifthTopicListener2() { + return new FifthTopicListener2(); + } + + @Bean + SixthTopicDefaultDLTListener manualTopicListener() { + return new SixthTopicDefaultDLTListener(); + } + + @Bean + NoRetryTopicListener noRetryTopicListener() { + return new NoRetryTopicListener(); + } + + @Bean + FirstReuseRetryTopicListener firstReuseRetryTopicListener() { + return new FirstReuseRetryTopicListener(); + } + + @Bean + SecondReuseRetryTopicListener secondReuseRetryTopicListener() { + return new SecondReuseRetryTopicListener(); + } + + @Bean + ThirdReuseRetryTopicListener thirdReuseRetryTopicListener() { + return new ThirdReuseRetryTopicListener(); + } + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor() { + return new MyCustomDltProcessor(); + } + } + + @Configuration + static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ProducerFactory producerFactory() { + Map props = KafkaTestUtils.producerProps( + this.broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean("customKafkaTemplate") + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); + return new KafkaAdmin(configs); + } + + @Bean + NewTopic topic() { + return TopicBuilder.name(THIRD_TOPIC).partitions(2).replicas(1).build(); + } + + @Bean + NewTopics topics() { + return new NewTopics(TopicBuilder.name(FOURTH_TOPIC).partitions(2).replicas(1).build()); + } + + @Bean + ConsumerFactory consumerFactory() { + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "groupId", + "false"); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + ConcurrentKafkaListenerContainerFactory retryTopicListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> container.getContainerProperties().setIdlePartitionEventInterval(100L)); + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer(container -> { + if (container.getListenerId().startsWith("manual")) { + container.getContainerProperties().setAckMode(AckMode.MANUAL); + container.getContainerProperties().setAsyncAcks(true); + } + }); + return factory; + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java new file mode 100644 index 0000000000..ed467cf23c --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java @@ -0,0 +1,1185 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.GenericMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Mono; + +/** + * @author Sanghyeok An + * @since 3.3.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) +public class AsyncMonoRetryTopicScenarioTests { + + private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; + + public final static String TEST_TOPIC0 = "myRetryTopic0"; + + public final static String TEST_TOPIC1 = "myRetryTopic1"; + + public final static String TEST_TOPIC2 = "myRetryTopic2"; + + public final static String TEST_TOPIC3 = "myRetryTopic3"; + + public final static String TEST_TOPIC4 = "myRetryTopic4"; + + public final static String TEST_TOPIC5 = "myRetryTopic5"; + + public final static String TEST_TOPIC6 = "myRetryTopic6"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Autowired + DestinationTopicContainer topicContainer; + + @Test + void allFailCaseTest( + @Autowired TestTopicListener0 zeroTopicListener, + @Autowired MyCustomDltProcessor myCustomDltProcessor0) { + // Given + String shortFailedMsg1 = "0"; + String shortFailedMsg2 = "1"; + String shortFailedMsg3 = "2"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("0-topicId", TEST_TOPIC0); + + String expectedRetryTopic = TEST_TOPIC0 + "-retry"; + String[] expectedReceivedMsgs = { + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3, + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3 + }; + String[] expectedReceivedTopics = { + TEST_TOPIC0, + TEST_TOPIC0, + TEST_TOPIC0, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + String[] expectedDltMsgs = { + shortFailedMsg1, + shortFailedMsg2, + shortFailedMsg3 + }; + + // When + kafkaTemplate.send(TEST_TOPIC0, shortFailedMsg1); + kafkaTemplate.send(TEST_TOPIC0, shortFailedMsg2); + kafkaTemplate.send(TEST_TOPIC0, shortFailedMsg3); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch0)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch0)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(TEST_TOPIC0 + "-retry"); + + assertThat(zeroTopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(zeroTopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor0.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void firstShortFailAndLastLongSuccessRetryTest( + @Autowired TestTopicListener1 testTopicListener1, + @Autowired MyCustomDltProcessor myCustomDltProcessor1) { + // Given + String longSuccessMsg = "3"; + String shortFailedMsg = "1"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("1-topicId", TEST_TOPIC1); + + String expectedRetryTopic = TEST_TOPIC1 + "-retry"; + String[] expectedReceivedMsgs = { + shortFailedMsg, + longSuccessMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC1, + TEST_TOPIC1, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + + String[] expectedDltMsgs = { + shortFailedMsg + }; + + // When + kafkaTemplate.send(TEST_TOPIC1, shortFailedMsg); + kafkaTemplate.send(TEST_TOPIC1, longSuccessMsg); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch1)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(testTopicListener1.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener1.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor1.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void firstLongSuccessAndLastShortFailed( + @Autowired TestTopicListener2 zero2TopicListener, + @Autowired MyCustomDltProcessor myCustomDltProcessor2) { + // Given + String shortFailedMsg = "1"; + String longSuccessMsg = "3"; + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("2-topicId", TEST_TOPIC2); + + String expectedRetryTopic = TEST_TOPIC2 + "-retry"; + String[] expectedReceivedMsgs = { + longSuccessMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg, + shortFailedMsg + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC2, + TEST_TOPIC2, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + + String[] expectedDltMsgs = { + shortFailedMsg + }; + + // When + kafkaTemplate.send(TEST_TOPIC2, longSuccessMsg); + kafkaTemplate.send(TEST_TOPIC2, shortFailedMsg); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch2)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(zero2TopicListener.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(zero2TopicListener.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor2.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void longFailMsgTwiceThenShortSuccessMsgThird( + @Autowired TestTopicListener3 testTopicListener3, + @Autowired MyCustomDltProcessor myCustomDltProcessor3) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("3-topicId", TEST_TOPIC3); + + String expectedRetryTopic = TEST_TOPIC3 + "-retry"; + String[] expectedReceivedMsgs = { + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.SHORT_SUCCESS_MSG, + TestTopicListener3.SHORT_SUCCESS_MSG, + TestTopicListener3.SHORT_SUCCESS_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC3, + TEST_TOPIC3, + TEST_TOPIC3, + TEST_TOPIC3, + TEST_TOPIC3, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic + }; + + String[] expectedDltMsgs = { + TestTopicListener3.LONG_FAIL_MSG, + TestTopicListener3.LONG_FAIL_MSG, + }; + + // When + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.LONG_FAIL_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC3, TestTopicListener3.SHORT_SUCCESS_MSG); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch3)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(testTopicListener3.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(testTopicListener3.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor3.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void longSuccessMsgTwiceThenShortFailMsgTwice( + @Autowired TestTopicListener4 topicListener4, + @Autowired MyCustomDltProcessor myCustomDltProcessor4) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("4-TopicId", TEST_TOPIC4); + + String expectedRetryTopic = TEST_TOPIC4 + "-retry"; + String[] expectedReceivedMsgs = { + TestTopicListener4.LONG_SUCCESS_MSG, + TestTopicListener4.LONG_SUCCESS_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + }; + + String[] expectedReceivedTopics = { + TEST_TOPIC4, + TEST_TOPIC4, + TEST_TOPIC4, + TEST_TOPIC4, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + expectedRetryTopic, + }; + + String[] expectedDltMsgs = { + TestTopicListener4.SHORT_FAIL_MSG, + TestTopicListener4.SHORT_FAIL_MSG, + }; + + // When + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.LONG_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.LONG_SUCCESS_MSG); + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.SHORT_FAIL_MSG); + kafkaTemplate.send(TEST_TOPIC4, TestTopicListener4.SHORT_FAIL_MSG); + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch4)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch4)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(topicListener4.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(topicListener4.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor4.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void oneLongSuccessMsgBetween100ShortFailMsgs( + @Autowired TestTopicListener5 topicListener5, + @Autowired MyCustomDltProcessor myCustomDltProcessor5) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("5-TopicId", TEST_TOPIC5); + + String expectedRetryTopic = TEST_TOPIC5 + "-retry"; + + String[] expectedReceivedMsgs = new String[501]; + for (int i = 0; i < 500; i++) { + expectedReceivedMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; + } + expectedReceivedMsgs[500] = TestTopicListener5.LONG_SUCCESS_MSG; + + String[] expectedReceivedTopics = new String[501]; + for (int i = 0; i < 100; i++) { + expectedReceivedTopics[i] = TEST_TOPIC5; + } + for (int i = 100; i < 500; i++) { + expectedReceivedTopics[i] = expectedRetryTopic; + } + expectedReceivedTopics[500] = TEST_TOPIC5; + + + String[] expectedDltMsgs = new String[100]; + for (int i = 0; i < 100; i++) { + expectedDltMsgs[i] = TestTopicListener5.SHORT_FAIL_MSG; + } + + // When + for (int i = 0; i < 100; i++) { + kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.SHORT_FAIL_MSG); + if (i == 50) { + kafkaTemplate.send(TEST_TOPIC5, TestTopicListener5.LONG_SUCCESS_MSG); + } + } + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch5)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch5)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + assertThat(topicListener5.receivedMsgs).containsExactlyInAnyOrder(expectedReceivedMsgs); + assertThat(topicListener5.receivedTopics).containsExactlyInAnyOrder(expectedReceivedTopics); + + assertThat(myCustomDltProcessor5.receivedMsg).containsExactlyInAnyOrder(expectedDltMsgs); + } + + @Test + void halfSuccessMsgAndHalfFailedMsgWithRandomSleepTime( + @Autowired TestTopicListener6 topicListener6, + @Autowired MyCustomDltProcessor myCustomDltProcessor6) { + // Given + DestinationTopic destinationTopic = topicContainer.getNextDestinationTopicFor("6-TopicId", TEST_TOPIC6); + + String expectedRetryTopic = TEST_TOPIC6 + "-retry"; + + Random random = new Random(); + ConcurrentLinkedQueue q = new ConcurrentLinkedQueue<>(); + + for (int i = 0; i < 50; i++) { + int randomSleepAWhile = random.nextInt(1, 100); + String msg = String.valueOf(randomSleepAWhile) + TestTopicListener6.SUCCESS_SUFFIX; + q.add(msg); + } + + for (int i = 0; i < 50; i++) { + int randomSleepAWhile = random.nextInt(1, 100); + String msg = String.valueOf(randomSleepAWhile) + TestTopicListener6.FAIL_SUFFIX; + q.add(msg); + } + + int expectedSuccessMsgCount = 50; + int expectedFailedMsgCount = 250; + + int expectedReceivedOriginalTopicCount = 100; + int expectedReceivedRetryTopicCount = 200; + int expectedReceivedDltMsgCount = 50; + + + // When + while (!q.isEmpty()) { + String successOrFailMsg = q.poll(); + kafkaTemplate.send(TEST_TOPIC6, successOrFailMsg); + } + + // Then + assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); + assertThat(awaitLatch(latchContainer.dltCountdownLatch6)).isTrue(); + + assertThat(destinationTopic.getDestinationName()).isEqualTo(expectedRetryTopic); + + long actualReceivedSuccessMsgCount = topicListener6.receivedMsgs.stream() + .map(s -> s.split(",")[1]) + .filter(m -> (',' + m).equals(TestTopicListener6.SUCCESS_SUFFIX)) + .count(); + + long actualReceivedFailedMsgCount = topicListener6.receivedMsgs.stream() + .map(s -> s.split(",")[1]) + .filter(m -> (',' + m).equals( + TestTopicListener6.FAIL_SUFFIX)) + .count(); + + + long actualReceivedOriginalTopicMsgCount = topicListener6.receivedTopics.stream() + .filter(topic -> topic.equals(TEST_TOPIC6)) + .count(); + + long actualReceivedRetryTopicMsgCount = topicListener6.receivedTopics.stream() + .filter(topic -> topic.equals(expectedRetryTopic)) + .count(); + + assertThat(actualReceivedSuccessMsgCount).isEqualTo(expectedSuccessMsgCount); + assertThat(actualReceivedFailedMsgCount).isEqualTo(expectedFailedMsgCount); + assertThat(actualReceivedOriginalTopicMsgCount).isEqualTo(expectedReceivedOriginalTopicCount); + assertThat(actualReceivedRetryTopicMsgCount).isEqualTo(expectedReceivedRetryTopicCount); + + assertThat(myCustomDltProcessor6.receivedMsg.size()).isEqualTo(expectedReceivedDltMsgCount); + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + @KafkaListener( + id = "0-topicId", + topics = TEST_TOPIC0, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener0 { + + @Autowired + CountDownLatchContainer container; + + private final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + return Mono.fromCallable(() -> { + try { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + catch (Exception e) { + throw e; + } + finally { + container.countDownLatch0.countDown(); + } + }).then(); + } + + } + + @KafkaListener( + id = "1-topicId", + topics = TEST_TOPIC1, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener1 { + + @Autowired + CountDownLatchContainer container; + + private final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return Mono.fromCallable(() -> { + try { + Thread.sleep(Integer.parseInt(message)); + if (message.equals("1")) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch1.countDown(); + } + + return "Task Completed"; + }); + + } + } + + @KafkaListener( + id = "2-topicId", + topics = TEST_TOPIC2, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener2 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return Mono.fromCallable(() -> { + try { + Thread.sleep(Integer.parseInt(message)); + if (message.equals("1")) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch2.countDown(); + } + + return "Task Completed"; + }); + } + } + + @KafkaListener( + id = "3-topicId", + topics = TEST_TOPIC3, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener3 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String LONG_FAIL_MSG = "100"; + + public static final String SHORT_SUCCESS_MSG = "1"; + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return Mono.fromCallable(() -> { + try { + Thread.sleep(Integer.parseInt(message)); + if (message.equals(LONG_FAIL_MSG)) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch3.countDown(); + } + + return "Task Completed"; + }); + + + } + } + + @KafkaListener( + id = "4-TopicId", + topics = TEST_TOPIC4, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener4 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String LONG_SUCCESS_MSG = "100"; + + public static final String SHORT_FAIL_MSG = "1"; + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return Mono.fromCallable(() -> { + try { + Thread.sleep(Integer.parseInt(message)); + if (message.equals(SHORT_FAIL_MSG)) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch4.countDown(); + } + + return "Task Completed"; + }); + + } + } + + @KafkaListener( + id = "5-TopicId", + topics = TEST_TOPIC5, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener5 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String LONG_SUCCESS_MSG = "100"; + + public static final String SHORT_FAIL_MSG = "1"; + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return Mono.fromCallable(() -> { + try { + Thread.sleep(Integer.parseInt(message)); + if (message.equals(SHORT_FAIL_MSG)) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch5.countDown(); + } + + return "Task Completed"; + }); + } + } + + @KafkaListener( + id = "6-TopicId", + topics = TEST_TOPIC6, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY, + errorHandler = "myCustomErrorHandler", + contentTypeConverter = "myCustomMessageConverter", + concurrency = "2") + static class TestTopicListener6 { + + @Autowired + CountDownLatchContainer container; + + protected final List receivedMsgs = new ArrayList<>(); + + private final List receivedTopics = new ArrayList<>(); + + public static final String SUCCESS_SUFFIX = ",s"; + + public static final String FAIL_SUFFIX = ",f"; + + @KafkaHandler + public Mono listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + this.receivedMsgs.add(message); + this.receivedTopics.add(receivedTopic); + + return Mono.fromCallable(() -> { + String[] split = message.split(","); + String sleepAWhile = split[0]; + String failOrSuccess = split[1]; + + try { + Thread.sleep(Integer.parseInt(sleepAWhile)); + if (failOrSuccess.equals("f")) { + throw new RuntimeException("Woooops... in topic " + receivedTopic); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + throw e; + } + finally { + container.countDownLatch6.countDown(); + } + + return "Task Completed"; + }); + } + } + + static class CountDownLatchContainer { + + static int COUNT0 = 15; + + static int DLT_COUNT0 = 3; + + CountDownLatch countDownLatch0 = new CountDownLatch(COUNT0); + + CountDownLatch dltCountdownLatch0 = new CountDownLatch(DLT_COUNT0); + + static int COUNT1 = 6; + + static int DLT_COUNT1 = 1; + + CountDownLatch countDownLatch1 = new CountDownLatch(COUNT1); + + CountDownLatch dltCountdownLatch1 = new CountDownLatch(DLT_COUNT1); + + static int COUNT2 = 6; + + static int DLT_COUNT2 = 1; + + CountDownLatch countDownLatch2 = new CountDownLatch(COUNT2); + + CountDownLatch dltCountdownLatch2 = new CountDownLatch(DLT_COUNT2); + + static int COUNT3 = 13; + + static int DLT_COUNT3 = 2; + + CountDownLatch countDownLatch3 = new CountDownLatch(COUNT3); + + CountDownLatch dltCountdownLatch3 = new CountDownLatch(DLT_COUNT3); + + static int COUNT4 = 12; + + static int DLT_COUNT4 = 2; + + CountDownLatch countDownLatch4 = new CountDownLatch(COUNT4); + + CountDownLatch dltCountdownLatch4 = new CountDownLatch(DLT_COUNT4); + + static int COUNT5 = 501; + + static int DLT_COUNT5 = 100; + + CountDownLatch countDownLatch5 = new CountDownLatch(COUNT5); + + CountDownLatch dltCountdownLatch5 = new CountDownLatch(DLT_COUNT5); + + static int COUNT6 = 250; + + static int DLT_COUNT6 = 50; + + CountDownLatch countDownLatch6 = new CountDownLatch(COUNT6); + + CountDownLatch dltCountdownLatch6 = new CountDownLatch(DLT_COUNT6); + + } + + static class MyCustomDltProcessor { + + final List receivedMsg = new ArrayList<>(); + + MyCustomDltProcessor(KafkaTemplate kafkaTemplate, + CountDownLatch latch) { + this.kafkaTemplate = kafkaTemplate; + this.latch = latch; + } + + private final KafkaTemplate kafkaTemplate; + + private final CountDownLatch latch; + + public void processDltMessage(String message) { + this.receivedMsg.add(message); + latch.countDown(); + } + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + static RetryTopicConfiguration createRetryTopicConfiguration(KafkaTemplate template, + String topicName, + String dltBeanName) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(topicName) + .doNotRetryOnDltFailure() + .dltHandlerMethod(dltBeanName, DLT_METHOD_NAME) + .create(template); + } + + @Bean + RetryTopicConfiguration testRetryTopic0(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC0, + "myCustomDltProcessor0"); + } + + @Bean + RetryTopicConfiguration testRetryTopic1(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC1, + "myCustomDltProcessor1"); + } + + @Bean + RetryTopicConfiguration testRetryTopic2(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC2, + "myCustomDltProcessor2"); + } + + @Bean + RetryTopicConfiguration testRetryTopic3(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC3, + "myCustomDltProcessor3"); + } + + @Bean + RetryTopicConfiguration testRetryTopic4(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC4, + "myCustomDltProcessor4"); + } + + @Bean + RetryTopicConfiguration testRetryTopic5(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC5, + "myCustomDltProcessor5"); + } + + @Bean + RetryTopicConfiguration testRetryTopic6(KafkaTemplate template) { + return createRetryTopicConfiguration( + template, + TEST_TOPIC6, + "myCustomDltProcessor6"); + } + + @Bean + KafkaListenerErrorHandler myCustomErrorHandler( + CountDownLatchContainer container) { + return (message, exception) -> { + throw exception; + }; + } + + @Bean + SmartMessageConverter myCustomMessageConverter( + CountDownLatchContainer container) { + return new CompositeMessageConverter(Collections.singletonList(new GenericMessageConverter())) { + + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + return super.fromMessage(message, targetClass, conversionHint); + } + }; + } + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + TestTopicListener0 testTopicListener0() { + return new TestTopicListener0(); + } + + @Bean + TestTopicListener1 testTopicListener1() { + return new TestTopicListener1(); + } + + @Bean + TestTopicListener2 testTopicListener2() { + return new TestTopicListener2(); + } + + @Bean + TestTopicListener3 testTopicListener3() { + return new TestTopicListener3(); + } + + @Bean + TestTopicListener4 testTopicListener4() { + return new TestTopicListener4(); + } + + @Bean + TestTopicListener5 testTopicListener5() { + return new TestTopicListener5(); + } + + @Bean + TestTopicListener6 testTopicListener6() { + return new TestTopicListener6(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor0( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch0); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor1( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch1); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor2( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch2); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor3( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch3); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor4( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch4); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor5( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch5); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor6( + KafkaTemplate kafkaTemplate, + CountDownLatchContainer latchContainer) { + return new MyCustomDltProcessor(kafkaTemplate, + latchContainer.dltCountdownLatch6); + } + + } + + @Configuration + static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ProducerFactory producerFactory() { + Map props = KafkaTestUtils.producerProps( + this.broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean("customKafkaTemplate") + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ConsumerFactory consumerFactory() { + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "groupId", + "false"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + ConcurrentKafkaListenerContainerFactory retryTopicListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> container.getContainerProperties().setIdlePartitionEventInterval(100L)); + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(1); + factory.setContainerCustomizer(container -> { + if (container.getListenerId().startsWith("manual")) { + container.getContainerProperties().setAckMode(AckMode.MANUAL); + container.getContainerProperties().setAsyncAcks(true); + } + }); + return factory; + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + } + +}