Skip to content

Commit 6ec1b3b

Browse files
committed
GH-2459: FallbackBatchErrorHandler Retryable Ex
Resolves #2459 The `FallbackBatchErrorHandler` was not an `ExceptionClassifier`. The default error handler should propagate exception classifications. **2.8.x**x
1 parent 48b2017 commit 6ec1b3b

File tree

6 files changed

+159
-5
lines changed

6 files changed

+159
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919
import java.util.Collection;
2020
import java.util.Collections;
2121
import java.util.List;
22+
import java.util.Map;
23+
import java.util.stream.Stream;
2224

2325
import org.apache.kafka.clients.consumer.Consumer;
2426
import org.apache.kafka.clients.consumer.ConsumerRecord;
2527
import org.apache.kafka.clients.consumer.ConsumerRecords;
2628
import org.apache.kafka.common.TopicPartition;
2729

2830
import org.springframework.kafka.support.TopicPartitionOffset;
31+
import org.springframework.lang.Nullable;
2932
import org.springframework.util.Assert;
3033

3134
/**
@@ -35,7 +38,7 @@
3538
* @since 2.7.4
3639
*
3740
*/
38-
class ErrorHandlerAdapter implements CommonErrorHandler {
41+
class ErrorHandlerAdapter extends ExceptionClassifier implements CommonErrorHandler {
3942

4043
@SuppressWarnings({ "rawtypes", "unchecked" })
4144
private static final ConsumerRecords EMPTY_BATCH = new ConsumerRecords(Collections.emptyMap());
@@ -170,5 +173,30 @@ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartit
170173
}
171174
}
172175

176+
@Override
177+
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
178+
if (this.batchErrorHandler instanceof ExceptionClassifier) {
179+
notRetryable.forEach(ex -> ((ExceptionClassifier) this.batchErrorHandler).addNotRetryableExceptions(ex));
180+
}
181+
}
182+
183+
@Override
184+
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
185+
super.setClassifications(classifications, defaultValue);
186+
if (this.batchErrorHandler instanceof ExceptionClassifier) {
187+
((ExceptionClassifier) this.batchErrorHandler).setClassifications(classifications, defaultValue);
188+
}
189+
}
190+
191+
@Override
192+
@Nullable
193+
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
194+
Boolean removed = super.removeClassification(exceptionType);
195+
if (this.batchErrorHandler instanceof ExceptionClassifier) {
196+
((ExceptionClassifier) this.batchErrorHandler).removeClassification(exceptionType);
197+
}
198+
return removed;
199+
}
200+
173201
}
174202

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerRecords;
2727
import org.apache.kafka.common.TopicPartition;
2828

29+
import org.springframework.classify.BinaryExceptionClassifier;
2930
import org.springframework.core.log.LogAccessor;
3031
import org.springframework.kafka.KafkaException;
3132
import org.springframework.lang.Nullable;
@@ -77,18 +78,49 @@ public static void clearRetryListeners() {
7778
* @param recoverer the recoverer.
7879
* @param logger the logger.
7980
* @param logLevel the log level.
81+
* @deprecated in favor of
82+
* {@link #retryBatch(Exception, ConsumerRecords, Consumer, MessageListenerContainer, Runnable, BackOff, CommonErrorHandler, BiConsumer, LogAccessor, org.springframework.kafka.KafkaException.Level, List, BinaryExceptionClassifier)}.
8083
*/
84+
@Deprecated
8185
public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
8286
MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
8387
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
8488
KafkaException.Level logLevel) {
8589

90+
retryBatch(thrownException, records, consumer, container, invokeListener, backOff, seeker, null, logger,
91+
logLevel, null, new BinaryExceptionClassifier(true));
92+
}
93+
94+
/**
95+
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
96+
* consumer, wait for the next back off, then call the listener. When retries are
97+
* exhausted, call the recoverer with the {@link ConsumerRecords}.
98+
* @param thrownException the exception.
99+
* @param records the records.
100+
* @param consumer the consumer.
101+
* @param container the container.
102+
* @param invokeListener the {@link Runnable} to run (call the listener).
103+
* @param backOff the backOff.
104+
* @param seeker the common error handler that re-seeks the entire batch.
105+
* @param recoverer the recoverer.
106+
* @param logger the logger.
107+
* @param logLevel the log level.
108+
* @param retryListenersArg the retry listeners.
109+
* @param classifier the exception classifier.
110+
* @since 2.8.11
111+
*/
112+
public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
113+
MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
114+
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
115+
KafkaException.Level logLevel, @Nullable List<RetryListener> retryListenersArg,
116+
BinaryExceptionClassifier classifier) {
117+
86118
BackOffExecution execution = backOff.start();
87119
long nextBackOff = execution.nextBackOff();
88120
String failed = null;
89121
Set<TopicPartition> assignment = consumer.assignment();
90122
consumer.pause(assignment);
91-
List<RetryListener> listeners = retryListeners.get();
123+
List<RetryListener> listeners = retryListenersArg != null ? retryListenersArg : retryListeners.get();
92124
int attempt = 1;
93125
listen(listeners, records, thrownException, attempt++);
94126
ConsumerRecord<?, ?> first = records.iterator().next();
@@ -98,7 +130,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
98130
.publishConsumerPausedEvent(assignment, "For batch retry");
99131
}
100132
try {
101-
while (nextBackOff != BackOffExecution.STOP) {
133+
Boolean retryable = classifier.classify(unwrapIfNeeded(thrownException));
134+
while (Boolean.TRUE.equals(retryable) && nextBackOff != BackOffExecution.STOP) {
102135
consumer.poll(Duration.ZERO);
103136
try {
104137
ListenerUtils.stoppableSleep(container, nextBackOff);
@@ -171,4 +204,22 @@ public static String recordsToString(ConsumerRecords<?, ?> records) {
171204
return sb.toString();
172205
}
173206

207+
/**
208+
* Remove a {@link TimestampedException}, if present.
209+
* Remove a {@link ListenerExecutionFailedException}, if present.
210+
* @param exception the exception.
211+
* @return the unwrapped cause or cause of cause.
212+
* @since 2.8.11
213+
*/
214+
public static Exception unwrapIfNeeded(Exception exception) {
215+
Exception theEx = exception;
216+
if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception) {
217+
theEx = (Exception) theEx.getCause();
218+
}
219+
if (theEx instanceof ListenerExecutionFailedException && theEx.getCause() instanceof Exception) {
220+
theEx = (Exception) theEx.getCause();
221+
}
222+
return theEx;
223+
}
224+
174225
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Arrays;
1920
import java.util.HashMap;
2021
import java.util.Map;
22+
import java.util.stream.Stream;
2123

2224
import org.springframework.classify.BinaryExceptionClassifier;
2325
import org.springframework.kafka.support.converter.ConversionException;
@@ -117,6 +119,16 @@ public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifi
117119
@SuppressWarnings("varargs")
118120
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
119121
add(false, exceptionTypes);
122+
notRetryable(Arrays.stream(exceptionTypes));
123+
}
124+
125+
/**
126+
* Subclasses can override this to receive notification of configuration of not
127+
* retryable exceptions.
128+
* @param notRetryable the not retryable exceptions.
129+
* @since 2.9.3
130+
*/
131+
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
120132
}
121133

122134
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828
import java.util.function.BiConsumer;
29+
import java.util.stream.Stream;
2930

3031
import org.apache.kafka.clients.consumer.Consumer;
3132
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,6 +36,7 @@
3536
import org.apache.kafka.common.TopicPartition;
3637

3738
import org.springframework.kafka.KafkaException;
39+
import org.springframework.kafka.KafkaException.Level;
3840
import org.springframework.lang.Nullable;
3941
import org.springframework.util.backoff.BackOff;
4042

@@ -70,6 +72,39 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
7072
this.fallbackBatchHandler = fallbackHandler;
7173
}
7274

75+
@Override
76+
public void setLogLevel(Level logLevel) {
77+
super.setLogLevel(logLevel);
78+
if (this.fallbackBatchHandler instanceof KafkaExceptionLogLevelAware) {
79+
((KafkaExceptionLogLevelAware) this.fallbackBatchHandler).setLogLevel(logLevel);
80+
}
81+
}
82+
83+
@Override
84+
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
85+
if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
86+
notRetryable.forEach(ex -> ((ExceptionClassifier) this.fallbackBatchHandler).addNotRetryableExceptions(ex));
87+
}
88+
}
89+
90+
@Override
91+
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
92+
super.setClassifications(classifications, defaultValue);
93+
if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
94+
((ExceptionClassifier) this.fallbackBatchHandler).setClassifications(classifications, defaultValue);
95+
}
96+
}
97+
98+
@Override
99+
@Nullable
100+
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
101+
Boolean removed = super.removeClassification(exceptionType);
102+
if (this.fallbackBatchHandler instanceof ExceptionClassifier) {
103+
((ExceptionClassifier) this.fallbackBatchHandler).removeClassification(exceptionType);
104+
}
105+
return removed;
106+
}
107+
73108
/**
74109
* Return the fallback batch error handler.
75110
* @return the handler.

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
*
4444
*/
4545
@Deprecated
46-
public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
46+
public class RetryingBatchErrorHandler extends ExceptionClassifier
4747
implements ListenerInvokingBatchErrorHandler {
4848

4949
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
@@ -107,7 +107,7 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
107107
this.retrying.set(true);
108108
try {
109109
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
110-
this.seeker, this.recoverer, this.logger, getLogLevel());
110+
this.seeker, this.recoverer, this.logger, getLogLevel(), null, getClassifier());
111111
}
112112
finally {
113113
this.retrying.set(false);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.willAnswer;
2525
import static org.mockito.Mockito.inOrder;
2626
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
2829
import static org.mockito.Mockito.verify;
2930

@@ -234,6 +235,33 @@ void fallbackListener() {
234235
verify(retryListener).recovered(any(ConsumerRecords.class), any());
235236
}
236237

238+
@SuppressWarnings({ "unchecked", "rawtypes" })
239+
@Test
240+
void notRetryable() {
241+
Consumer mockConsumer = mock(Consumer.class);
242+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
243+
DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2));
244+
beh.addNotRetryableExceptions(IllegalStateException.class);
245+
RetryListener retryListener = mock(RetryListener.class);
246+
beh.setRetryListeners(retryListener);
247+
TopicPartition tp = new TopicPartition("foo", 0);
248+
ConsumerRecords<?, ?> records = new ConsumerRecords(Collections.singletonMap(tp,
249+
List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
250+
new RecordHeaders(), Optional.empty()),
251+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
252+
new RecordHeaders(), Optional.empty()))));
253+
MessageListenerContainer container = mock(MessageListenerContainer.class);
254+
given(container.isRunning()).willReturn(true);
255+
beh.handleBatch(new ListenerExecutionFailedException("test", new IllegalStateException()),
256+
records, mockConsumer, container, () -> {
257+
});
258+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1));
259+
// no retries
260+
verify(retryListener, never()).failedDelivery(any(ConsumerRecords.class), any(), eq(2));
261+
verify(recoverer, times(2)).accept(any(), any()); // each record in batch
262+
verify(retryListener).recovered(any(ConsumerRecords.class), any());
263+
}
264+
237265
@Configuration
238266
@EnableKafka
239267
public static class Config {

0 commit comments

Comments
 (0)