diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarConsumerErrorHandlerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarConsumerErrorHandlerTests.java index 49865975..f041ace1 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarConsumerErrorHandlerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarConsumerErrorHandlerTests.java @@ -23,13 +23,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -37,11 +38,13 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; import org.springframework.core.log.LogAccessor; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.PulsarOperations; +import org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.TypedMessageBuilderCustomizer; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; @@ -49,404 +52,227 @@ /** * @author Soby Chacko + * @author Sauhard Sharma */ public class DefaultPulsarConsumerErrorHandlerTests implements PulsarTestContainerSupport { private final LogAccessor logger = new LogAccessor(this.getClass()); @Test - @SuppressWarnings("unchecked") - void happyPathErrorHandlingForRecordMessageListener() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-1"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-1"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - PulsarRecordMessageListener messageListener = mock(PulsarRecordMessageListener.class); - - doAnswer(invocation -> { - throw new RuntimeException(); - }).when(messageListener).received(any(Consumer.class), any(Message.class)); - - pulsarContainerProperties.setMessageListener(messageListener); - pulsarContainerProperties.setSchema(Schema.STRING); - - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-1"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); - - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - container.start(); - - pulsarTemplate.sendAsync("hello john doe"); + void whenHappyPathErrorHandlingForRecordMessageListenerThenSendToDlt() throws Exception { + var topicName = "default-error-handler-tests-3"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock( - PulsarOperations.SendMessageBuilder.class); + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); + var container = buildPulsarContainer(pulsarClient, (invocation) -> { + throw new RuntimeException(); + }, topicName, -1, false); - when(mockPulsarTemplate.newMessage("hello john doe") - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 10)); - await().atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> verify(messageListener, times(11)).received(any(Consumer.class), any(Message.class))); - await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(sendMessageBuilderMock).sendAsync()); + try { + container.start(); + sendMessages(pulsarClient, topicName, List.of(0)); - container.stop(); - pulsarClient.close(); + // Calls to listener - 1 initial + 10 retries + // Calls to DLT producer - 1 message full failure + verifyContainerBehavior(container, sendMessageBuilderMock, false, 11, 1); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } } @Test - @SuppressWarnings("unchecked") - void errorHandlingForRecordMessageListenerWithTransientError() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-2"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-2"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - PulsarRecordMessageListener messageListener = mock(PulsarRecordMessageListener.class); + void whenErrorHandlingForRecordMessageListenerWithTransientErrorThenDontSendToDlt() throws Exception { + var topicName = "default-error-handler-tests-3"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); AtomicInteger count = new AtomicInteger(0); - doAnswer(invocation -> { + var container = buildPulsarContainer(pulsarClient, (invocation) -> { int currentCount = count.incrementAndGet(); if (currentCount <= 3) { throw new RuntimeException(); } return new Object(); - }).when(messageListener).received(any(Consumer.class), any(Message.class)); + }, topicName, -1, false); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 5)); - pulsarContainerProperties.setMessageListener(messageListener); - pulsarContainerProperties.setSchema(Schema.STRING); - - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-2"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class); - - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - container.start(); - - pulsarTemplate.sendAsync("hello john doe"); - - await().atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> verify(messageListener, times(4)).received(any(Consumer.class), any(Message.class))); - verifyNoInteractions(mockPulsarTemplate); + try { + container.start(); + sendMessages(pulsarClient, topicName, List.of(0)); - container.stop(); - pulsarClient.close(); + // Calls to listener - 1 initial + 2 retries + 1 final + // Calls to DLT producer - 0 full failures + verifyContainerBehavior(container, sendMessageBuilderMock, false, 4, 0); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } } @Test - @SuppressWarnings("unchecked") - void everyOtherRecordThrowsNonTransientExceptionsRecordMessageListener() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-3"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-3"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - PulsarRecordMessageListener messageListener = mock(PulsarRecordMessageListener.class); - doAnswer(invocation -> { + void whenEveryOtherRecordThrowsNonTransientExceptionsRecordMessageListenerThenSendAllFailedMessagesToDlt() + throws Exception { + var topicName = "default-error-handler-tests-3"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); + var container = buildPulsarContainer(pulsarClient, (invocation) -> { Message message = invocation.getArgument(1); - Integer value = message.getValue(); - if (value % 2 == 0) { + if (message.getValue() % 2 == 0) { throw new RuntimeException(); } return new Object(); - }).when(messageListener).received(any(Consumer.class), any(Message.class)); + }, topicName, -1, false); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 5)); - pulsarContainerProperties.setMessageListener(messageListener); - pulsarContainerProperties.setSchema(Schema.INT32); - - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-3"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); - - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 5))); - container.start(); + try { + container.start(); + sendMessages(pulsarClient, topicName, IntStream.range(0, 10).boxed().collect(Collectors.toList())); - for (int i = 0; i < 10; i++) { - pulsarTemplate.sendAsync(i); + // Calls to listener - 5 records fail - 5 * (1 + 5 max retry) = 30 + 5 records + // don't fail = 35 + // Calls to DLT producer - 5 messages full failures + verifyContainerBehavior(container, sendMessageBuilderMock, false, 35, 5); + } + finally { + safeStopContainer(container); + pulsarClient.close(); } - - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock( - PulsarOperations.SendMessageBuilder.class); - - when(mockPulsarTemplate.newMessage(any(Integer.class)) - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); - - // 5 records fail - 5 * (1 + 5 max retry) = 30 + 5 records don't fail = 35 - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(messageListener, times(35)).received(any(Consumer.class), any(Message.class))); - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(sendMessageBuilderMock, times(5)).sendAsync()); - - container.stop(); - pulsarClient.close(); } @Test - @SuppressWarnings("unchecked") - void batchRecordListenerFirstOneOnlyErrorAndRecover() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-4"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-4"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.setMaxNumMessages(10); - pulsarContainerProperties.setBatchTimeoutMillis(60_000); - pulsarContainerProperties.setBatchListener(true); - PulsarBatchAcknowledgingMessageListener pulsarBatchMessageListener = mock( - PulsarBatchAcknowledgingMessageListener.class); + void whenBatchRecordListenerFirstOneOnlyErrorAndRecoverThenSendToDlt() throws Exception { + var topicName = "default-error-handler-tests-4"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); - doAnswer(invocation -> { - List> message = invocation.getArgument(1); - Message integerMessage = message.get(0); - Integer value = integerMessage.getValue(); - if (value == 0) { - throw new PulsarBatchListenerFailedException("failed", integerMessage); + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); + var container = buildPulsarContainer(pulsarClient, (invocation) -> { + List> messages = invocation.getArgument(1); + Message firstMessage = messages.get(0); + if (firstMessage.getValue() == 0) { + throw new PulsarBatchListenerFailedException("failed", firstMessage); } Acknowledgement acknowledgment = invocation.getArgument(2); - List messageIds = new ArrayList<>(); - for (Message integerMessage1 : message) { - messageIds.add(integerMessage1.getMessageId()); - } + List messageIds = messages.stream().map(Message::getMessageId).collect(Collectors.toList()); acknowledgment.acknowledge(messageIds); return new Object(); - }).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); - - pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener); - pulsarContainerProperties.setSchema(Schema.INT32); - pulsarContainerProperties.setAckMode(AckMode.MANUAL); - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); + }, topicName, 10, true); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 10)); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - - container.start(); + try { + container.start(); + sendMessages(pulsarClient, topicName, IntStream.range(0, 10).boxed().collect(Collectors.toList())); - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-4"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - for (int i = 0; i < 10; i++) { - pulsarTemplate.sendAsync(i); + // Calls to listener - 1 initial + 10 retries for failure + 1 final for rest + // of the batch = 12 calls + // Calls to DLT producer - 1 message full failure + verifyContainerBehavior(container, sendMessageBuilderMock, true, 12, 1); + } + finally { + safeStopContainer(container); + pulsarClient.close(); } - - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock( - PulsarOperations.SendMessageBuilder.class); - - when(mockPulsarTemplate.newMessage(any(Integer.class)) - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); - - // 1 + 10 + 1 = 12 calls altogether - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(pulsarBatchMessageListener, times(12)).received(any(Consumer.class), - any(List.class), any(Acknowledgement.class))); - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(sendMessageBuilderMock, times(1)).sendAsync()); - - container.stop(); - pulsarClient.close(); } @Test - @SuppressWarnings("unchecked") - void batchRecordListenerRecordFailsInTheMiddle() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-5"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-5"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.setMaxNumMessages(10); - pulsarContainerProperties.setBatchTimeoutMillis(60_000); - pulsarContainerProperties.setBatchListener(true); - PulsarBatchAcknowledgingMessageListener pulsarBatchMessageListener = mock( - PulsarBatchAcknowledgingMessageListener.class); + void whenBatchRecordListenerRecordFailsInTheMiddleThenSendToDlt() throws Exception { + var topicName = "default-error-handler-tests-5"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); - doAnswer(invocation -> { + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); + var container = buildPulsarContainer(pulsarClient, (invocation) -> { List> messages = invocation.getArgument(1); - + Acknowledgement acknowledgment = invocation.getArgument(2); for (Message message : messages) { if (message.getValue() == 5) { throw new PulsarBatchListenerFailedException("failed", message); } else { - Acknowledgement acknowledgment = invocation.getArgument(2); acknowledgment.acknowledge(message.getMessageId()); } } return new Object(); - }).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); - - pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener); - pulsarContainerProperties.setSchema(Schema.INT32); - pulsarContainerProperties.setAckMode(AckMode.MANUAL); - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); + }, topicName, 10, true); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 10)); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - - container.start(); + try { + container.start(); + sendMessages(pulsarClient, topicName, IntStream.range(0, 10).boxed().collect(Collectors.toList())); - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-5"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - for (int i = 0; i < 10; i++) { - pulsarTemplate.sendAsync(i); + // Calls to listener - 1 initial + 10 retries for failure + 1 final for rest + // of the batch = 12 calls + // Calls to DLT producer - 1 message full failure + verifyContainerBehavior(container, sendMessageBuilderMock, true, 12, 1); + } + finally { + safeStopContainer(container); + pulsarClient.close(); } - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock( - PulsarOperations.SendMessageBuilder.class); - - when(mockPulsarTemplate.newMessage(any(Integer.class)) - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); - - // 1 + 10 + 1 = 12 calls altogether - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(pulsarBatchMessageListener, times(12)).received(any(Consumer.class), - any(List.class), any(Acknowledgement.class))); - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(sendMessageBuilderMock, times(1)).sendAsync()); - - container.stop(); - pulsarClient.close(); } @Test - @SuppressWarnings("unchecked") - void batchRecordListenerRecordFailsTwiceInTheMiddle() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-6"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-6"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.setMaxNumMessages(10); - pulsarContainerProperties.setBatchTimeoutMillis(60_000); - pulsarContainerProperties.setBatchListener(true); - PulsarBatchAcknowledgingMessageListener pulsarBatchMessageListener = mock( - PulsarBatchAcknowledgingMessageListener.class); + void whenBatchRecordListenerRecordFailsTwiceInTheMiddleThenSendBothFailedMessagesToDlt() throws Exception { + var topicName = "default-error-handler-tests-6"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); - doAnswer(invocation -> { + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); + var container = buildPulsarContainer(pulsarClient, (invocation) -> { List> messages = invocation.getArgument(1); - + Acknowledgement acknowledgment = invocation.getArgument(2); for (Message message : messages) { if (message.getValue() == 2 || message.getValue() == 5) { throw new PulsarBatchListenerFailedException("failed", message); } else { - Acknowledgement acknowledgment = invocation.getArgument(2); acknowledgment.acknowledge(message.getMessageId()); } } return new Object(); - }).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); - - pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener); - pulsarContainerProperties.setSchema(Schema.INT32); - pulsarContainerProperties.setAckMode(AckMode.MANUAL); - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); + }, topicName, 10, true); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 10)); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - - container.start(); + try { + container.start(); + sendMessages(pulsarClient, topicName, IntStream.range(0, 10).boxed().collect(Collectors.toList())); - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-6"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - for (int i = 0; i < 10; i++) { - pulsarTemplate.sendAsync(i); + // Calls to listener - 1 initial + 10 retries for first failure + 1 for second + // half of the batch + 10 retries for the second failure + 1 final for the + // rest of the batch = 23 calls + // Calls to DLT producer - 2 messages full failures + verifyContainerBehavior(container, sendMessageBuilderMock, true, 23, 2); + } + finally { + safeStopContainer(container); + pulsarClient.close(); } - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock( - PulsarOperations.SendMessageBuilder.class); - - when(mockPulsarTemplate.newMessage(any(Integer.class)) - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); - - // 1 + 10 + 1 + 10 + 1 = 23 calls altogether - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(pulsarBatchMessageListener, times(23)).received(any(Consumer.class), - any(List.class), any(Acknowledgement.class))); - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(sendMessageBuilderMock, times(2)).sendAsync()); - - container.stop(); - pulsarClient.close(); } @Test - @SuppressWarnings("unchecked") - void batchRecordListenerRecordFailsInTheMiddleButTransientError() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-7"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-7"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.setMaxNumMessages(10); - pulsarContainerProperties.setBatchTimeoutMillis(60_000); - pulsarContainerProperties.setBatchListener(true); - PulsarBatchAcknowledgingMessageListener pulsarBatchMessageListener = mock( - PulsarBatchAcknowledgingMessageListener.class); + void whenBatchRecordListenerRecordFailsInTheMiddleButTransientErrorThenDontSendToDlt() throws Exception { + var topicName = "default-error-handler-tests-7"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); AtomicInteger count = new AtomicInteger(0); - doAnswer(invocation -> { + var container = buildPulsarContainer(pulsarClient, (invocation) -> { List> messages = invocation.getArgument(1); Acknowledgement acknowledgment = invocation.getArgument(2); for (Message message : messages) { if (message.getValue() == 5) { int currentCount = count.getAndIncrement(); + if (currentCount < 3) { throw new PulsarBatchListenerFailedException("failed", message); } @@ -459,57 +285,34 @@ void batchRecordListenerRecordFailsInTheMiddleButTransientError() throws Excepti } } return new Object(); - }).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); - - pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener); - pulsarContainerProperties.setSchema(Schema.INT32); - pulsarContainerProperties.setAckMode(AckMode.MANUAL); - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); + }, topicName, 10, true); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 10)); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - - container.start(); + try { + container.start(); + sendMessages(pulsarClient, topicName, IntStream.range(0, 10).boxed().collect(Collectors.toList())); - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-7"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - for (int i = 0; i < 10; i++) { - pulsarTemplate.sendAsync(i); + // Calls to listener - 1 initial + 2 retries + 1 final for the rest of the + // batch = 4 calls + // Calls to DLT producer - 0 full failures + verifyContainerBehavior(container, sendMessageBuilderMock, true, 4, 0); + } + finally { + safeStopContainer(container); + pulsarClient.close(); } - // 1 + 3 + 1 = 5 calls altogether - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(pulsarBatchMessageListener, times(4)).received(any(Consumer.class), - any(List.class), any(Acknowledgement.class))); - verifyNoInteractions(mockPulsarTemplate); - - container.stop(); - pulsarClient.close(); } @Test - @SuppressWarnings("unchecked") - void batchListenerFailsTransientErrorFollowedByNonTransient() throws Exception { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - DefaultPulsarConsumerFactory pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic("default-error-handler-tests-8"); - consumerBuilder.subscriptionName("default-error-handler-tests-sub-8"); - })); - - PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.setMaxNumMessages(10); - pulsarContainerProperties.setBatchTimeoutMillis(60_000); - pulsarContainerProperties.setBatchListener(true); - PulsarBatchAcknowledgingMessageListener pulsarBatchMessageListener = mock( - PulsarBatchAcknowledgingMessageListener.class); + void whenBatchRecordListenerFailsTransientErrorFollowedByNonTransientThenSendAllNonTransientFailedMessageToDlt() + throws Exception { + var topicName = "default-error-handler-tests-8"; + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); AtomicInteger count = new AtomicInteger(0); - doAnswer(invocation -> { + var container = buildPulsarContainer(pulsarClient, (invocation) -> { List> messages = invocation.getArgument(1); Acknowledgement acknowledgment = invocation.getArgument(2); for (Message message : messages) { @@ -530,62 +333,33 @@ else if (message.getValue() == 7) { } } return new Object(); - }).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); - - pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener); - pulsarContainerProperties.setSchema(Schema.INT32); - pulsarContainerProperties.setAckMode(AckMode.MANUAL); - DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - PulsarTemplate mockPulsarTemplate = mock(PulsarTemplate.class, RETURNS_DEEP_STUBS); + }, topicName, 10, true); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 10)); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 10))); - - container.start(); + try { + container.start(); + sendMessages(pulsarClient, topicName, IntStream.range(0, 10).boxed().collect(Collectors.toList())); - DefaultPulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, - "default-error-handler-tests-8"); - PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - for (int i = 0; i < 10; i++) { - pulsarTemplate.sendAsync(i); + // Calls to listener - 1 initial + 2 retries for transient failure + 1 for + // second half of the batch + 10 retries for the second failure + 1 final for + // the rest of the batch = 15 calls + // Calls to DLT producer - 1 message full failure + verifyContainerBehavior(container, sendMessageBuilderMock, true, 15, 1); + } + finally { + safeStopContainer(container); + pulsarClient.close(); } - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock( - PulsarOperations.SendMessageBuilder.class); - - when(mockPulsarTemplate.newMessage(any(Integer.class)) - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); - // 1 + 2 + 1 + 10 + 1 = 15 calls altogether - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(pulsarBatchMessageListener, times(15)).received(any(Consumer.class), - any(List.class), any(Acknowledgement.class))); - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(sendMessageBuilderMock, times(1)).sendAsync()); - - container.stop(); - pulsarClient.close(); } @Test - @SuppressWarnings("unchecked") - void whenBatchRecordListenerOneMessageBatchFailsThenSentToDltProperly() throws Exception { + void whenBatchRecordListenerOneMessageBatchFailsThenSentToDltProperlyThenSendToDlt() throws Exception { var topicName = "default-error-handler-tests-9"; var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); - var pulsarConsumerFactory = new DefaultPulsarConsumerFactory(pulsarClient, - List.of((consumerBuilder) -> { - consumerBuilder.topic(topicName); - consumerBuilder.subscriptionName("%s-sub".formatted(topicName)); - })); - // Prepare container for batch consume - var pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.setSchema(Schema.INT32); - pulsarContainerProperties.setAckMode(AckMode.MANUAL); - pulsarContainerProperties.setBatchListener(true); - pulsarContainerProperties.setMaxNumMessages(1); - pulsarContainerProperties.setBatchTimeoutMillis(60_000); - PulsarBatchAcknowledgingMessageListener pulsarBatchMessageListener = mock(); - doAnswer(invocation -> { + + PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); + var mockPulsarTemplate = createMockPulsarTemplate(sendMessageBuilderMock); + var container = buildPulsarContainer(pulsarClient, (invocation) -> { List> message = invocation.getArgument(1); Message integerMessage = message.get(0); Integer value = integerMessage.getValue(); @@ -599,37 +373,109 @@ void whenBatchRecordListenerOneMessageBatchFailsThenSentToDltProperly() throws E } acknowledgment.acknowledge(messageIds); return new Object(); - }).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); - pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener); - var container = new DefaultPulsarMessageListenerContainer<>(pulsarConsumerFactory, pulsarContainerProperties); + }, topicName, 1, true); + container.setPulsarConsumerErrorHandler(createErrorHandler(mockPulsarTemplate, 2)); - // Set error handler to recover after 2 retries - PulsarTemplate mockPulsarTemplate = mock(RETURNS_DEEP_STUBS); - PulsarOperations.SendMessageBuilder sendMessageBuilderMock = mock(); - when(mockPulsarTemplate.newMessage(any(Integer.class)) - .withTopic(any(String.class)) - .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); - container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>( - new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 2))); try { container.start(); // Send single message in batch - var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient, topicName); - var pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); - pulsarTemplate.sendAsync(0); - // Initial call should fail - // Next 2 calls should fail (retries 2) - // No more calls after that - msg should go to DLT - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(pulsarBatchMessageListener, times(3)).received(any(Consumer.class), - any(List.class), any(Acknowledgement.class))); - await().atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> verify(sendMessageBuilderMock, times(1)).sendAsync()); + sendMessages(pulsarClient, topicName, List.of(0)); + + // Calls to listener - 1 initial + 2 retries for transient failure + 1 for + // second half of the batch + 10 retries for the second failure + 1 final for + // the rest of the batch = 15 calls + // Calls to DLT producer - 1 message full failure + verifyContainerBehavior(container, sendMessageBuilderMock, true, 3, 1); } finally { safeStopContainer(container); + pulsarClient.close(); + } + } + + @SuppressWarnings("unchecked") + private DefaultPulsarMessageListenerContainer buildPulsarContainer(PulsarClient pulsarClient, + Answer answer, String topicName, int maxNumMessages, boolean isBatch) { + PulsarContainerProperties pulsarContainerProperties; + PulsarRecordMessageListener pulsarListener; + if (isBatch) { + pulsarContainerProperties = buildPulsarContainerPropertiesForBatch(maxNumMessages); + pulsarListener = mock(PulsarBatchAcknowledgingMessageListener.class); + doAnswer(answer).when((PulsarBatchAcknowledgingMessageListener) pulsarListener) + .received(any(Consumer.class), any(List.class), any(Acknowledgement.class)); } - pulsarClient.close(); + else { + pulsarContainerProperties = buildPulsarContainerPropertiesForSingle(); + pulsarListener = mock(PulsarRecordMessageListener.class); + doAnswer(answer).when((PulsarRecordMessageListener) pulsarListener) + .received(any(Consumer.class), any(Message.class)); + } + pulsarContainerProperties.setMessageListener(pulsarListener); + + var pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, List.of((consumerBuilder) -> { + consumerBuilder.topic(topicName); + consumerBuilder.subscriptionName("%s-sub".formatted(topicName)); + })); + + return new DefaultPulsarMessageListenerContainer<>(pulsarConsumerFactory, pulsarContainerProperties); + } + + private PulsarContainerProperties buildPulsarContainerPropertiesForBatch(int maxNumMessages) { + var pulsarContainerProperties = new PulsarContainerProperties(); + pulsarContainerProperties.setSchema(Schema.INT32); + pulsarContainerProperties.setAckMode(AckMode.MANUAL); + pulsarContainerProperties.setBatchListener(true); + pulsarContainerProperties.setMaxNumMessages(maxNumMessages); + pulsarContainerProperties.setBatchTimeoutMillis(60_000); + return pulsarContainerProperties; + } + + private PulsarContainerProperties buildPulsarContainerPropertiesForSingle() { + var pulsarContainerProperties = new PulsarContainerProperties(); + pulsarContainerProperties.setSchema(Schema.INT32); + return pulsarContainerProperties; + } + + @SuppressWarnings("unchecked") + private PulsarTemplate createMockPulsarTemplate(SendMessageBuilder sendMessageBuilderMock) { + PulsarTemplate mockPulsarTemplate = mock(RETURNS_DEEP_STUBS); + when(mockPulsarTemplate.newMessage(any(Integer.class)) + .withTopic(any(String.class)) + .withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock); + return mockPulsarTemplate; + } + + private void sendMessages(PulsarClient pulsarClient, String topicName, List messages) { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient, topicName); + var pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + messages.forEach(pulsarTemplate::sendAsync); + } + + @SuppressWarnings("unchecked") + private void verifyContainerBehavior(DefaultPulsarMessageListenerContainer container, + PulsarOperations.SendMessageBuilder sendMessageBuilderMock, boolean isBatch, + int expectedReceivedCalls, int expectedSendAsyncCalls) { + if (isBatch) { + await().atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> verify((PulsarBatchAcknowledgingMessageListener) container.getContainerProperties() + .getMessageListener(), times(expectedReceivedCalls)) + .received(any(Consumer.class), any(List.class), any(Acknowledgement.class))); + } + else { + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> verify( + (PulsarRecordMessageListener) container.getContainerProperties().getMessageListener(), + times(expectedReceivedCalls)) + .received(any(Consumer.class), any(Message.class))); + } + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> verify(sendMessageBuilderMock, times(expectedSendAsyncCalls)).sendAsync()); + } + + private DefaultPulsarConsumerErrorHandler createErrorHandler(PulsarTemplate pulsarTemplate, int retries) { + return new DefaultPulsarConsumerErrorHandler<>(new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate), + new FixedBackOff(100, retries)); } private void safeStopContainer(PulsarMessageListenerContainer container) {