Skip to content

Commit 90c87c0

Browse files
bananayonggaryrussell
authored andcommitted
Add ConversionException in classifier to skip
Resolves #1566 - ConversionException should be considered as fatal like MessageConversionException - Add ConversionException in FailedRecordProcessor.classifier to skip retrying - edit reference document
1 parent a52f2fe commit 90c87c0

File tree

3 files changed

+12
-1
lines changed

3 files changed

+12
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.classify.BinaryExceptionClassifier;
2929
import org.springframework.core.log.LogAccessor;
3030
import org.springframework.kafka.support.TopicPartitionOffset;
31+
import org.springframework.kafka.support.converter.ConversionException;
3132
import org.springframework.kafka.support.serializer.DeserializationException;
3233
import org.springframework.lang.Nullable;
3334
import org.springframework.messaging.converter.MessageConversionException;
@@ -128,6 +129,7 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
128129
* <ul>
129130
* <li>{@link DeserializationException}</li>
130131
* <li>{@link MessageConversionException}</li>
132+
* <li>{@link ConversionException}</li>
131133
* <li>{@link MethodArgumentResolutionException}</li>
132134
* <li>{@link NoSuchMethodException}</li>
133135
* <li>{@link ClassCastException}</li>
@@ -150,6 +152,7 @@ public void addNotRetryableException(Class<? extends Exception> exceptionType) {
150152
* <ul>
151153
* <li>{@link DeserializationException}</li>
152154
* <li>{@link MessageConversionException}</li>
155+
* <li>{@link ConversionException}</li>
153156
* <li>{@link MethodArgumentResolutionException}</li>
154157
* <li>{@link NoSuchMethodException}</li>
155158
* <li>{@link ClassCastException}</li>
@@ -195,6 +198,7 @@ private static BinaryExceptionClassifier configureDefaultClassifier() {
195198
Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
196199
classified.put(DeserializationException.class, false);
197200
classified.put(MessageConversionException.class, false);
201+
classified.put(ConversionException.class, false);
198202
classified.put(MethodArgumentResolutionException.class, false);
199203
classified.put(NoSuchMethodException.class, false);
200204
classified.put(ClassCastException.class, false);

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.mockito.InOrder;
3838

3939
import org.springframework.kafka.KafkaException;
40+
import org.springframework.kafka.support.converter.ConversionException;
4041
import org.springframework.kafka.support.serializer.DeserializationException;
4142

4243
/**
@@ -68,6 +69,10 @@ consumer, mock(MessageListenerContainer.class)))
6869
handler.handle(new DeserializationException("intended", null, false, illegalState), records,
6970
consumer, mock(MessageListenerContainer.class));
7071
assertThat(recovered.get()).isSameAs(record1);
72+
recovered.set(null);
73+
handler.handle(new ConversionException("intended", null), records,
74+
consumer, mock(MessageListenerContainer.class));
75+
assertThat(recovered.get()).isSameAs(record1);
7176
handler.addNotRetryableException(IllegalStateException.class);
7277
recovered.set(null);
7378
recovererShouldFail.set(true);
@@ -77,7 +82,7 @@ consumer, mock(MessageListenerContainer.class)))
7782
assertThat(recovered.get()).isSameAs(record1);
7883
InOrder inOrder = inOrder(consumer);
7984
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // not recovered so seek
80-
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
85+
inOrder.verify(consumer, times(3)).seek(new TopicPartition("foo", 1), 1L);
8186
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // recovery failed
8287
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
8388
inOrder.verifyNoMoreInteractions();

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4600,6 +4600,7 @@ The exceptions that are considered fatal, by default, are:
46004600

46014601
* `DeserializationException`
46024602
* `MessageConversionException`
4603+
* `ConversionException`
46034604
* `MethodArgumentResolutionException`
46044605
* `NoSuchMethodException`
46054606
* `ClassCastException`
@@ -4819,6 +4820,7 @@ The exceptions that are considered fatal, by default, are:
48194820

48204821
* `DeserializationException`
48214822
* `MessageConversionException`
4823+
* `ConversionException`
48224824
* `MethodArgumentResolutionException`
48234825
* `NoSuchMethodException`
48244826
* `ClassCastException`

0 commit comments

Comments
 (0)