Skip to content

Commit 7da8092

Browse files
bananayonggaryrussell
authored andcommitted
Add ConversionException in classifier to skip
- ConversionException should be considered as fatal like MessageConversionException - Add ConversionException in FailedRecordProcessor.classifier to skip retrying - edit reference document
1 parent 9776c42 commit 7da8092

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.classify.BinaryExceptionClassifier;
3030
import org.springframework.core.log.LogAccessor;
3131
import org.springframework.kafka.support.TopicPartitionOffset;
32+
import org.springframework.kafka.support.converter.ConversionException;
3233
import org.springframework.kafka.support.serializer.DeserializationException;
3334
import org.springframework.lang.Nullable;
3435
import org.springframework.messaging.converter.MessageConversionException;
@@ -141,6 +142,7 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
141142
* <ul>
142143
* <li>{@link DeserializationException}</li>
143144
* <li>{@link MessageConversionException}</li>
145+
* <li>{@link ConversionException}</li>
144146
* <li>{@link MethodArgumentResolutionException}</li>
145147
* <li>{@link NoSuchMethodException}</li>
146148
* <li>{@link ClassCastException}</li>
@@ -163,6 +165,7 @@ public void addNotRetryableException(Class<? extends Exception> exceptionType) {
163165
* <ul>
164166
* <li>{@link DeserializationException}</li>
165167
* <li>{@link MessageConversionException}</li>
168+
* <li>{@link ConversionException}</li>
166169
* <li>{@link MethodArgumentResolutionException}</li>
167170
* <li>{@link NoSuchMethodException}</li>
168171
* <li>{@link ClassCastException}</li>
@@ -191,6 +194,7 @@ public final void addNotRetryableExceptions(Class<? extends Exception>... except
191194
* <ul>
192195
* <li>{@link DeserializationException}</li>
193196
* <li>{@link MessageConversionException}</li>
197+
* <li>{@link ConversionException}</li>
194198
* <li>{@link MethodArgumentResolutionException}</li>
195199
* <li>{@link NoSuchMethodException}</li>
196200
* <li>{@link ClassCastException}</li>
@@ -234,6 +238,7 @@ private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() {
234238
Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
235239
classified.put(DeserializationException.class, false);
236240
classified.put(MessageConversionException.class, false);
241+
classified.put(ConversionException.class, false);
237242
classified.put(MethodArgumentResolutionException.class, false);
238243
classified.put(NoSuchMethodException.class, false);
239244
classified.put(ClassCastException.class, false);

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.mockito.InOrder;
3838

3939
import org.springframework.kafka.KafkaException;
40+
import org.springframework.kafka.support.converter.ConversionException;
4041
import org.springframework.kafka.support.serializer.DeserializationException;
4142

4243
/**
@@ -68,6 +69,10 @@ consumer, mock(MessageListenerContainer.class)))
6869
handler.handle(new DeserializationException("intended", null, false, illegalState), records,
6970
consumer, mock(MessageListenerContainer.class));
7071
assertThat(recovered.get()).isSameAs(record1);
72+
recovered.set(null);
73+
handler.handle(new ConversionException("intended", null), records,
74+
consumer, mock(MessageListenerContainer.class));
75+
assertThat(recovered.get()).isSameAs(record1);
7176
handler.addNotRetryableExceptions(IllegalStateException.class);
7277
recovered.set(null);
7378
recovererShouldFail.set(true);
@@ -77,7 +82,7 @@ consumer, mock(MessageListenerContainer.class)))
7782
assertThat(recovered.get()).isSameAs(record1);
7883
InOrder inOrder = inOrder(consumer);
7984
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // not recovered so seek
80-
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
85+
inOrder.verify(consumer, times(3)).seek(new TopicPartition("foo", 1), 1L);
8186
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // recovery failed
8287
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
8388
inOrder.verifyNoMoreInteractions();

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4611,6 +4611,7 @@ The exceptions that are considered fatal, by default, are:
46114611

46124612
* `DeserializationException`
46134613
* `MessageConversionException`
4614+
* `ConversionException`
46144615
* `MethodArgumentResolutionException`
46154616
* `NoSuchMethodException`
46164617
* `ClassCastException`
@@ -4863,6 +4864,7 @@ The exceptions that are considered fatal, by default, are:
48634864

48644865
* `DeserializationException`
48654866
* `MessageConversionException`
4867+
* `ConversionException`
48664868
* `MethodArgumentResolutionException`
48674869
* `NoSuchMethodException`
48684870
* `ClassCastException`

0 commit comments

Comments
 (0)