Skip to content

Commit adb2f9a

Browse files
garyrussellartembilan
authored andcommitted
GH-1287: Detect mis-configured deserialization
Resolves #1287 Throw an `IllegalStateException` in the `SeekToCurrentErrorHandler` if the root exception is a `SerializationException`. Handling of deserializatin problms requires the configuration of an `ErrorHandlingDeserializer2`. **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java # spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java
1 parent 36fdb2d commit adb2f9a

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.clients.consumer.Consumer;
2424
import org.apache.kafka.clients.consumer.ConsumerRecord;
2525
import org.apache.kafka.common.TopicPartition;
26+
import org.apache.kafka.common.errors.SerializationException;
2627

2728
import org.springframework.kafka.KafkaException;
2829

@@ -40,6 +41,13 @@ public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {
4041
@Override
4142
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
4243
Consumer<?, ?> consumer, MessageListenerContainer container) {
44+
45+
if (thrownException instanceof SerializationException) {
46+
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly, "
47+
+ "please consider configuring an 'ErrorHandlingDeserializer2' in the value and/or key "
48+
+ "deserializer", thrownException);
49+
}
50+
4351
Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
4452
records.forEach(r ->
4553
offsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()), k -> r.offset()));

0 commit comments

Comments
 (0)