Skip to content

Commit bac7166

Browse files
authored
GH-2046: Add Property to Always Check for Deser Ex
Resolves #2046 * Fix javadoc copy/paste.
1 parent 03a5e42 commit bac7166

File tree

4 files changed

+70
-2
lines changed

4 files changed

+70
-2
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,6 +2443,16 @@ When null, such exceptions are considered fatal and the container will stop.
24432443
|A prefix for the `client.id` consumer property.
24442444
Overrides the consumer factory `client.id` property; in a concurrent container, `-n` is added as a suffix for each consumer instance.
24452445

2446+
|[[checkDeserExWhenKeyNull]]<<checkDeserExWhenKeyNull,`checkDeserExWhenKeyNull`>>
2447+
|false
2448+
|Set to `true` to always check for a `DeserializationException` header when a `null` `key` is received.
2449+
Useful when the consumer code cannot determine that an `ErrorHandlingDeserializer` has been configured, such as when using a delegating deserializer.
2450+
2451+
|[[checkDeserExWhenValueNull]]<<checkDeserExWhenValueNull,`checkDeserExWhenValueNull`>>
2452+
|false
2453+
|Set to `true` to always check for a `DeserializationException` header when a `null` `value` is received.
2454+
Useful when the consumer code cannot determine that an `ErrorHandlingDeserializer` has been configured, such as when using a delegating deserializer.
2455+
24462456
|[[commitCallback]]<<commitCallback,`commitCallback`>>
24472457
|`null`
24482458
|When present and `syncCommits` is `false` a callback invoked after the commit completes.

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public class ConsumerProperties {
107107

108108
private boolean fixTxOffsets;
109109

110+
private boolean checkDeserExWhenKeyNull;
111+
112+
private boolean checkDeserExWhenValueNull;
113+
110114
/**
111115
* Create properties for a container that will subscribe to the specified topics.
112116
* @param topics the topics.
@@ -466,6 +470,52 @@ public void setFixTxOffsets(boolean fixTxOffsets) {
466470
this.fixTxOffsets = fixTxOffsets;
467471
}
468472

473+
/**
474+
* Always check for a deserialization exception header with a null key.
475+
* @return true to check.
476+
* @since 2.8.1
477+
*/
478+
public boolean isCheckDeserExWhenKeyNull() {
479+
return this.checkDeserExWhenKeyNull;
480+
}
481+
482+
/**
483+
* Set to true to always check for
484+
* {@link org.springframework.kafka.support.serializer.DeserializationException}
485+
* header when a null key is received. Useful when the consumer code cannot determine
486+
* that an
487+
* {@link org.springframework.kafka.support.serializer.ErrorHandlingDeserializer} has
488+
* been configured, such as when using a delegating deserializer.
489+
* @param checkDeserExWhenKeyNull true to always check.
490+
* @since 2.8.1
491+
*/
492+
public void setCheckDeserExWhenKeyNull(boolean checkDeserExWhenKeyNull) {
493+
this.checkDeserExWhenKeyNull = checkDeserExWhenKeyNull;
494+
}
495+
496+
/**
497+
* Always check for a deserialization exception header with a null value.
498+
* @return true to check.
499+
* @since 2.8.1
500+
*/
501+
public boolean isCheckDeserExWhenValueNull() {
502+
return this.checkDeserExWhenValueNull;
503+
}
504+
505+
/**
506+
* Set to true to always check for
507+
* {@link org.springframework.kafka.support.serializer.DeserializationException}
508+
* header when a null value is received. Useful when the consumer code cannot
509+
* determine that an
510+
* {@link org.springframework.kafka.support.serializer.ErrorHandlingDeserializer} has
511+
* been configured, such as when using a delegating deserializer.
512+
* @param checkDeserExWhenValueNull true to always check.
513+
* @since 2.8.1
514+
*/
515+
public void setCheckDeserExWhenValueNull(boolean checkDeserExWhenValueNull) {
516+
this.checkDeserExWhenValueNull = checkDeserExWhenValueNull;
517+
}
518+
469519
@Override
470520
public String toString() {
471521
return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -810,8 +810,10 @@ else if (listener instanceof MessageListener) {
810810
this.logger.info(toString());
811811
}
812812
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
813-
this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false));
814-
this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true));
813+
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
814+
|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
815+
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
816+
|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
815817
this.syncCommitTimeout = determineSyncCommitTimeout();
816818
if (this.containerProperties.getSyncCommitTimeout() == null) {
817819
// update the property so we can use it directly from code elsewhere

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ void configurePreLoadedDelegates() {
5858
new DefaultKafkaConsumerFactory<String, Object>(consumerProps,
5959
new StringDeserializer(), delegating);
6060
ContainerProperties props = new ContainerProperties(DBTD_TOPIC);
61+
props.setCheckDeserExWhenKeyNull(true);
62+
props.setCheckDeserExWhenValueNull(true);
6163
props.setMessageListener(mock(MessageListener.class));
6264
KafkaMessageListenerContainer<String, Object> container = new KafkaMessageListenerContainer<>(cFact, props);
6365
container.start();
@@ -67,6 +69,10 @@ void configurePreLoadedDelegates() {
6769
assertThat(delegates).hasSize(1);
6870
assertThat(delegates.values().iterator().next()).isSameAs(testDeser);
6971
assertThat(testDeser.configured).isTrue();
72+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.checkNullKeyForExceptions",
73+
Boolean.class)).isTrue();
74+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.checkNullValueForExceptions",
75+
Boolean.class)).isTrue();
7076
container.stop();
7177
}
7278

0 commit comments

Comments
 (0)