Skip to content

Commit 70b9143

Browse files
garyrussellartembilan
authored andcommitted
GH-2459: FallbackBatchErrorHandler Retryable Ex
Resolves #2459 The `FallbackBatchErrorHandler` was not an `ExceptionClassifier`. The default error handler should propagate exception classifications. **3.0.x - I will back port**
1 parent 97ed09e commit 70b9143

File tree

6 files changed

+92
-11
lines changed

6 files changed

+92
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerRecords;
2727
import org.apache.kafka.common.TopicPartition;
2828

29+
import org.springframework.classify.BinaryExceptionClassifier;
2930
import org.springframework.core.log.LogAccessor;
3031
import org.springframework.kafka.KafkaException;
3132
import org.springframework.kafka.support.KafkaUtils;
@@ -59,11 +60,13 @@ private ErrorHandlingUtils() {
5960
* @param logger the logger.
6061
* @param logLevel the log level.
6162
* @param retryListeners the retry listeners.
63+
* @param classifier the exception classifier.
64+
* @since 2.8.11
6265
*/
6366
public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
6467
MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
6568
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
66-
KafkaException.Level logLevel, List<RetryListener> retryListeners) {
69+
KafkaException.Level logLevel, List<RetryListener> retryListeners, BinaryExceptionClassifier classifier) {
6770

6871
BackOffExecution execution = backOff.start();
6972
long nextBackOff = execution.nextBackOff();
@@ -79,7 +82,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
7982
.publishConsumerPausedEvent(assignment, "For batch retry");
8083
}
8184
try {
82-
while (nextBackOff != BackOffExecution.STOP) {
85+
Boolean retryable = classifier.classify(unwrapIfNeeded(thrownException));
86+
while (Boolean.TRUE.equals(retryable) && nextBackOff != BackOffExecution.STOP) {
8387
consumer.poll(Duration.ZERO);
8488
try {
8589
ListenerUtils.stoppableSleep(container, nextBackOff);
@@ -145,4 +149,22 @@ public static String recordsToString(ConsumerRecords<?, ?> records) {
145149
return sb.toString();
146150
}
147151

152+
/**
153+
* Remove a {@link TimestampedException}, if present.
154+
* Remove a {@link ListenerExecutionFailedException}, if present.
155+
* @param exception the exception.
156+
* @return the unwrapped cause or cause of cause.
157+
* @since 2.8.11
158+
*/
159+
public static Exception unwrapIfNeeded(Exception exception) {
160+
Exception theEx = exception;
161+
if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception cause) {
162+
theEx = cause;
163+
}
164+
if (theEx instanceof ListenerExecutionFailedException && theEx.getCause() instanceof Exception cause) {
165+
theEx = cause;
166+
}
167+
return theEx;
168+
}
169+
148170
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
2425

2526
import org.springframework.classify.BinaryExceptionClassifier;
2627
import org.springframework.kafka.support.converter.ConversionException;
@@ -149,6 +150,16 @@ public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifi
149150
@SuppressWarnings("varargs")
150151
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
151152
add(false, exceptionTypes);
153+
notRetryable(Arrays.stream(exceptionTypes));
154+
}
155+
156+
/**
157+
* Subclasses can override this to receive notification of configuration of not
158+
* retryable exceptions.
159+
* @param notRetryable the not retryable exceptions.
160+
* @since 2.9.3
161+
*/
162+
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
152163
}
153164

154165
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828
import java.util.function.BiConsumer;
29+
import java.util.stream.Stream;
2930

3031
import org.apache.kafka.clients.consumer.Consumer;
3132
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -101,6 +102,31 @@ public void setLogLevel(Level logLevel) {
101102
}
102103
}
103104

105+
@Override
106+
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
107+
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
108+
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
109+
}
110+
}
111+
112+
@Override
113+
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
114+
super.setClassifications(classifications, defaultValue);
115+
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
116+
handler.setClassifications(classifications, defaultValue);
117+
}
118+
}
119+
120+
@Override
121+
@Nullable
122+
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
123+
Boolean removed = super.removeClassification(exceptionType);
124+
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
125+
handler.removeClassification(exceptionType);
126+
}
127+
return removed;
128+
}
129+
104130
/**
105131
* Return the fallback batch error handler.
106132
* @return the handler.

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
4545

4646
private final BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> noRetriesForClassified =
4747
(rec, ex) -> {
48-
Exception theEx = ex;
49-
if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception cause) {
50-
theEx = cause;
51-
}
52-
if (theEx instanceof ListenerExecutionFailedException && theEx.getCause() instanceof Exception cause) {
53-
theEx = cause;
54-
}
48+
Exception theEx = ErrorHandlingUtils.unwrapIfNeeded(ex);
5549
if (!getClassifier().classify(theEx)) {
5650
return NO_RETRIES_OR_DELAY_BACKOFF;
5751
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
* @since 2.3.7
4949
*
5050
*/
51-
class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware implements CommonErrorHandler {
51+
class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErrorHandler {
5252

5353
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
5454

@@ -124,7 +124,7 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?,
124124
this.retrying.set(true);
125125
try {
126126
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
127-
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners);
127+
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier());
128128
}
129129
finally {
130130
this.retrying.set(false);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.willAnswer;
2525
import static org.mockito.Mockito.inOrder;
2626
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
2829
import static org.mockito.Mockito.verify;
2930

@@ -234,6 +235,33 @@ void fallbackListener() {
234235
verify(retryListener).recovered(any(ConsumerRecords.class), any());
235236
}
236237

238+
@SuppressWarnings({ "unchecked", "rawtypes" })
239+
@Test
240+
void notRetryable() {
241+
Consumer mockConsumer = mock(Consumer.class);
242+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
243+
DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2));
244+
beh.addNotRetryableExceptions(IllegalStateException.class);
245+
RetryListener retryListener = mock(RetryListener.class);
246+
beh.setRetryListeners(retryListener);
247+
TopicPartition tp = new TopicPartition("foo", 0);
248+
ConsumerRecords<?, ?> records = new ConsumerRecords(Collections.singletonMap(tp,
249+
List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
250+
new RecordHeaders(), Optional.empty()),
251+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
252+
new RecordHeaders(), Optional.empty()))));
253+
MessageListenerContainer container = mock(MessageListenerContainer.class);
254+
given(container.isRunning()).willReturn(true);
255+
beh.handleBatch(new ListenerExecutionFailedException("test", new IllegalStateException()),
256+
records, mockConsumer, container, () -> {
257+
});
258+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1));
259+
// no retries
260+
verify(retryListener, never()).failedDelivery(any(ConsumerRecords.class), any(), eq(2));
261+
verify(recoverer, times(2)).accept(any(), any()); // each record in batch
262+
verify(retryListener).recovered(any(ConsumerRecords.class), any());
263+
}
264+
237265
@Configuration
238266
@EnableKafka
239267
public static class Config {

0 commit comments

Comments
 (0)