Skip to content

Commit eee3515

Browse files
Test
1 parent c44c339 commit eee3515

File tree

4 files changed

+72
-2
lines changed

4 files changed

+72
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.lang.Thread.UncaughtExceptionHandler;
1920
import java.nio.ByteBuffer;
2021
import java.time.Duration;
2122
import java.util.AbstractMap.SimpleEntry;
@@ -895,6 +896,11 @@ else if (listener instanceof MessageListener) {
895896
this.wantsFullRecords = false;
896897
this.pollThreadStateProcessor = setUpPollProcessor(false);
897898
this.observationEnabled = this.containerProperties.isObservationEnabled();
899+
900+
final BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback
901+
= (cRecord, runtimeException) ->
902+
this.invokeErrorHandlerBySingleRecord(cRecord, runtimeException);
903+
this.listener.setAsyncRetryCallback(asyncRetryCallback);
898904
}
899905
else {
900906
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
@@ -2827,6 +2833,42 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
28272833
}
28282834
}
28292835

2836+
private void invokeErrorHandlerBySingleRecord(final ConsumerRecord<K, V> cRecord, RuntimeException rte) {
2837+
if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
2838+
try {
2839+
if (this.producer == null) {
2840+
processCommits();
2841+
}
2842+
}
2843+
catch (Exception ex) { // NO SONAR
2844+
this.logger.error(ex, "Failed to commit before handling error");
2845+
}
2846+
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
2847+
records.add(cRecord);
2848+
this.commonErrorHandler.handleRemaining(rte, records, this.consumer,
2849+
KafkaMessageListenerContainer.this.thisOrParentContainer);
2850+
}
2851+
else {
2852+
boolean handled = false;
2853+
try {
2854+
handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
2855+
KafkaMessageListenerContainer.this.thisOrParentContainer);
2856+
}
2857+
catch (Exception ex) {
2858+
this.logger.error(ex, "ErrorHandler threw unexpected exception");
2859+
}
2860+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
2861+
if (!handled) {
2862+
records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
2863+
tp -> new ArrayList<>()).add(cRecord);
2864+
}
2865+
if (!records.isEmpty()) {
2866+
this.remainingRecords = new ConsumerRecords<>(records);
2867+
this.pauseForPending = true;
2868+
}
2869+
}
2870+
}
2871+
28302872
private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
28312873
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
28322874

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.function.BiConsumer;
20+
1921
import org.apache.kafka.clients.consumer.ConsumerRecord;
2022

2123
/**
@@ -30,4 +32,7 @@
3032
@FunctionalInterface
3133
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {
3234

35+
default void setAsyncRetryCallback(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
36+
//
37+
}
3338
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.time.Clock;
2121
import java.time.Instant;
2222
import java.util.Optional;
23+
import java.util.function.BiConsumer;
2324

2425
import org.apache.kafka.clients.consumer.Consumer;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -143,4 +144,9 @@ public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
143144
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
144145
onMessage(data, null, consumer);
145146
}
147+
148+
@Override
149+
public void setAsyncRetryCallback(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
150+
this.delegate.setAsyncRetryCallback(asyncRetryCallback);
151+
}
146152
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Objects;
3030
import java.util.concurrent.CompletableFuture;
31+
import java.util.function.BiConsumer;
3132
import java.util.stream.Collectors;
3233

3334
import org.apache.commons.logging.LogFactory;
@@ -46,6 +47,7 @@
4647
import org.springframework.expression.spel.support.StandardEvaluationContext;
4748
import org.springframework.expression.spel.support.StandardTypeConverter;
4849
import org.springframework.kafka.core.KafkaTemplate;
50+
import org.springframework.kafka.listener.AsyncRetryableException;
4951
import org.springframework.kafka.listener.ConsumerSeekAware;
5052
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
5153
import org.springframework.kafka.listener.ListenerExecutionFailedException;
@@ -152,6 +154,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
152154

153155
private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;
154156

157+
private BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback;
158+
155159
/**
156160
* Create an instance with the provided bean and method.
157161
* @param bean the bean.
@@ -664,6 +668,15 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) {
664668
protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
665669
Throwable t, Message<?> source) {
666670

671+
if (t.getCause() instanceof AsyncRetryableException) {
672+
if (request instanceof ConsumerRecord) {
673+
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
674+
AsyncRetryableException ex = (AsyncRetryableException) t.getCause();
675+
asyncRetryCallback.accept(record, ex);
676+
return;
677+
}
678+
}
679+
667680
try {
668681
handleException(request, acknowledgment, consumer, source,
669682
new ListenerExecutionFailedException(createMessagingErrorMessage(
@@ -887,4 +900,8 @@ public void acknowledge() {
887900

888901
}
889902

903+
public void setAsyncRetryCallback(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
904+
this.asyncRetryCallback = asyncRetryCallback;
905+
}
906+
890907
}

0 commit comments

Comments
 (0)