Skip to content

Commit 8667995

Browse files
onobcdarshimo
andauthored
Backport "Fix batch listener error handling" (1.1.x) (#1006)
* Fix batch listener error handling Updates the batch listener error handler code to handle the case where there is only a single outstanding message in the batch list when retries are expired. Fixes #998 Signed-off-by: darshimo <[email protected]> (cherry picked from commit fddc711) * Polish "Fix batch listener error handling" Updates the error handler logic to only use `List.remove` in the special case when 1 item in batch. Also adds a test for the special case when 1 item in batch. See #998 (cherry picked from commit 16e0c9e) --------- Co-authored-by: darshimo <[email protected]>
1 parent a72562c commit 8667995

File tree

2 files changed

+78
-2
lines changed

2 files changed

+78
-2
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,12 +748,11 @@ private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean inRetryMo
748748
pulsarBatchListenerFailedException);
749749
handleAck(pulsarMessage, txn);
750750
if (messageList.size() == 1) {
751+
messageList.remove(0);
751752
messagesPendingInBatch.set(false);
752753
}
753754
else {
754755
messageList = messageList.subList(1, messageList.size());
755-
}
756-
if (!messageList.isEmpty()) {
757756
messagesPendingInBatch.set(true);
758757
}
759758
this.pulsarConsumerErrorHandler.clearMessage();

spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarConsumerErrorHandlerTests.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.pulsar.client.api.Schema;
3939
import org.junit.jupiter.api.Test;
4040

41+
import org.springframework.core.log.LogAccessor;
4142
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
4243
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
4344
import org.springframework.pulsar.core.PulsarOperations;
@@ -51,6 +52,8 @@
5152
*/
5253
public class DefaultPulsarConsumerErrorHandlerTests implements PulsarTestContainerSupport {
5354

55+
private final LogAccessor logger = new LogAccessor(this.getClass());
56+
5457
@Test
5558
@SuppressWarnings("unchecked")
5659
void happyPathErrorHandlingForRecordMessageListener() throws Exception {
@@ -564,4 +567,78 @@ else if (message.getValue() == 7) {
564567
pulsarClient.close();
565568
}
566569

570+
@Test
571+
@SuppressWarnings("unchecked")
572+
void whenBatchRecordListenerOneMessageBatchFailsThenSentToDltProperly() throws Exception {
573+
var topicName = "default-error-handler-tests-9";
574+
var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build();
575+
var pulsarConsumerFactory = new DefaultPulsarConsumerFactory<Integer>(pulsarClient,
576+
List.of((consumerBuilder) -> {
577+
consumerBuilder.topic(topicName);
578+
consumerBuilder.subscriptionName("%s-sub".formatted(topicName));
579+
}));
580+
// Prepare container for batch consume
581+
var pulsarContainerProperties = new PulsarContainerProperties();
582+
pulsarContainerProperties.setSchema(Schema.INT32);
583+
pulsarContainerProperties.setAckMode(AckMode.MANUAL);
584+
pulsarContainerProperties.setBatchListener(true);
585+
pulsarContainerProperties.setMaxNumMessages(1);
586+
pulsarContainerProperties.setBatchTimeoutMillis(60_000);
587+
PulsarBatchAcknowledgingMessageListener<?> pulsarBatchMessageListener = mock();
588+
doAnswer(invocation -> {
589+
List<Message<Integer>> message = invocation.getArgument(1);
590+
Message<Integer> integerMessage = message.get(0);
591+
Integer value = integerMessage.getValue();
592+
if (value == 0) {
593+
throw new PulsarBatchListenerFailedException("failed", integerMessage);
594+
}
595+
Acknowledgement acknowledgment = invocation.getArgument(2);
596+
List<MessageId> messageIds = new ArrayList<>();
597+
for (Message<Integer> integerMessage1 : message) {
598+
messageIds.add(integerMessage1.getMessageId());
599+
}
600+
acknowledgment.acknowledge(messageIds);
601+
return new Object();
602+
}).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class));
603+
pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener);
604+
var container = new DefaultPulsarMessageListenerContainer<>(pulsarConsumerFactory, pulsarContainerProperties);
605+
606+
// Set error handler to recover after 2 retries
607+
PulsarTemplate<Integer> mockPulsarTemplate = mock(RETURNS_DEEP_STUBS);
608+
PulsarOperations.SendMessageBuilder<Integer> sendMessageBuilderMock = mock();
609+
when(mockPulsarTemplate.newMessage(any(Integer.class))
610+
.withTopic(any(String.class))
611+
.withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock);
612+
container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>(
613+
new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 2)));
614+
try {
615+
container.start();
616+
// Send single message in batch
617+
var pulsarProducerFactory = new DefaultPulsarProducerFactory<Integer>(pulsarClient, topicName);
618+
var pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
619+
pulsarTemplate.sendAsync(0);
620+
// Initial call should fail
621+
// Next 2 calls should fail (retries 2)
622+
// No more calls after that - msg should go to DLT
623+
await().atMost(Duration.ofSeconds(30))
624+
.untilAsserted(() -> verify(pulsarBatchMessageListener, times(3)).received(any(Consumer.class),
625+
any(List.class), any(Acknowledgement.class)));
626+
await().atMost(Duration.ofSeconds(30))
627+
.untilAsserted(() -> verify(sendMessageBuilderMock, times(1)).sendAsync());
628+
}
629+
finally {
630+
safeStopContainer(container);
631+
}
632+
pulsarClient.close();
633+
}
634+
635+
private void safeStopContainer(PulsarMessageListenerContainer container) {
636+
try {
637+
container.stop();
638+
}
639+
catch (Exception ex) {
640+
logger.warn(ex, "Failed to stop container %s: %s".formatted(container, ex.getMessage()));
641+
}
642+
}
643+
567644
}

0 commit comments

Comments
 (0)