Commit 07b6240

GH-1936: Batch Listener Handle ConversionException
Resolves #1936

Make it easier to detect and recover from conversion exceptions in batch listeners.
Also add `DelegatingByTypeSerializer`.

* Polishing - improve exception message, add test for DBTSerializer.
* Fix test - supported types order is random.
1 parent a661060 commit 07b6240

File tree

11 files changed (+514, -29 lines)


spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 64 additions & 0 deletions
@@ -4179,6 +4179,22 @@ It is not necessary to configure serializers or deserializers for these types, t
 
 For another technique to send different types to different topics, see <<routing-template>>.
 
+Version 2.8 introduced the `DelegatingByTypeSerializer`.
+
+====
+[source, java]
+----
+@Bean
+public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
+    return new DefaultKafkaProducerFactory<>(config,
+            null, new DelegatingByTypeSerializer(Map.of(
+                    byte[].class, new ByteArraySerializer(),
+                    Bytes.class, new BytesSerializer(),
+                    String.class, new StringSerializer())));
+}
+----
+====
+
 [[retrying-deserialization]]
 ===== Retrying Deserializer

@@ -4402,6 +4418,28 @@ consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.cl
 ----
 ====
 
+When using an `ErrorHandlingDeserializer` with a batch listener, you must check for the deserialization exceptions in message headers.
+When used with a `RecoveringBatchErrorHandler`, you can use that header to determine which record failed and communicate the index to the error handler via a `BatchListenerFailedException`.
+
+====
+[source, java]
+----
+@KafkaListener(id = "test", topics = "test")
+void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
+    for (int i = 0; i < in.size(); i++) {
+        Thing thing = in.get(i);
+        if (thing == null
+                && headers.get(i).get(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
+
+            throw new BatchListenerFailedException("deserialization error",
+                    new DeserializationException("Batch listener", null, false, null), i);
+        }
+        process(thing);
+    }
+}
+----
+====
+
 [[payload-conversion-with-batch]]
 ===== Payload Conversion with Batch Listeners

@@ -4909,6 +4947,7 @@ The container commits any pending offset commits before calling the error handle
 
 If you are using Spring Boot, you simply need to add the error handler as a `@Bean` and Boot will add it to the auto-configured factory.
 
+[[default-eh]]
 ===== DefaultErrorHandler
 
 This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now.

@@ -5068,6 +5107,31 @@ By default, the exception type is not considered.
 
 Also see <<delivery-header>>.
 
+[[batch-listener-conv-errors]]
+===== Conversion Errors with Batch Error Handlers
+
+Starting with version 2.8, batch listeners can properly handle conversion errors when using a `MessageConverter` with a `ByteArrayDeserializer`, a `BytesDeserializer` or a `StringDeserializer`, as well as a `DefaultErrorHandler`.
+When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the `ErrorHandlingDeserializer`.
+A list of `ConversionException` s is available in the listener so the listener can throw a `BatchListenerFailedException` indicating the first index at which a conversion exception occurred.
+
+Example:
+
+====
+[source, java]
+----
+@KafkaListener(id = "test", topics = "topic")
+void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
+    for (int i = 0; i < in.size(); i++) {
+        Thing thing = in.get(i);
+        if (thing == null && exceptions.get(i) != null) {
+            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
+        }
+        process(thing);
+    }
+}
+----
+====
+
 [[retrying-batch-eh]]
 ===== Retrying Batch Error Handler

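For context, the conversion-error handling documented above assumes the container pairs a raw deserializer with a converting batch message converter, so conversion happens in the converter rather than in the deserializer. A minimal sketch of such a factory (the bean name, the generics, and the JSON converter choice are illustrative assumptions, not part of this commit):

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> batchFactory(
        ConsumerFactory<String, byte[]> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory); // values arrive as raw byte[] (ByteArrayDeserializer)
    factory.setBatchListener(true);
    // conversion to the listener's payload type happens in the converter, so a bad
    // record surfaces as a null payload plus a ConversionException entry instead
    // of failing the whole poll
    factory.setMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
    // on BatchListenerFailedException, commits up to the failed index and retries/recovers from there
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}
----
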
spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
@@ -20,8 +20,13 @@ See <<ooo-commits>> for more information.
 
 It is now possible to specify whether the listener method is a batch listener on the method itself.
 This allows the same container factory to be used for both record and batch listeners.
+
 See <<batch-listeners>> for more information.
 
+Batch listeners can now handle conversion exceptions.
+
+See <<batch-listener-conv-errors>> for more information.
+
 [[x28-template]]
 ==== `KafkaTemplate` Changes

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 8 additions & 0 deletions
@@ -314,4 +314,12 @@ public abstract class KafkaHeaders {
 	 * @since 2.2
 	 */
 	public static final String ORIGINAL_TIMESTAMP_TYPE = PREFIX + "original-timestamp-type";
+
+	/**
+	 * The header containing a list of conversion failures (for batch listeners only).
+	 * Type: List&lt;ConversionException&gt;.
+	 * @since 2.8
+	 */
+	public static final String CONVERSION_FAILURES = PREFIX + "conversionFailures";
+
 }

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 39 additions & 6 deletions
@@ -18,6 +18,7 @@
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;

@@ -28,6 +29,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Bytes;
 
 import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.support.Acknowledgment;

@@ -36,6 +38,7 @@
 import org.springframework.kafka.support.KafkaHeaderMapper;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.KafkaNull;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.MessageBuilder;

@@ -153,6 +156,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
 		List<Map<String, Object>> convertedHeaders = new ArrayList<>();
 		List<Headers> natives = new ArrayList<>();
 		List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
+		List<ConversionException> conversionFailures = new ArrayList<>();
 		if (this.headerMapper != null) {
 			rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
 		}

@@ -164,12 +168,11 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
 		}
 		commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
 				timestamps);
+		rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
 
 		boolean logged = false;
 		for (ConsumerRecord<?, ?> record : records) {
-			payloads.add(this.recordConverter == null || !containerType(type)
-					? extractAndConvertValue(record, type)
-					: convert(record, type));
+			payloads.add(obtainPayload(type, record, conversionFailures));
 			keys.add(record.key());
 			topics.add(record.topic());
 			partitions.add(record.partition());

@@ -200,6 +203,12 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
 		return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
 	}
 
+	private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
+		return this.recordConverter == null || !containerType(type)
+				? extractAndConvertValue(record, type)
+				: convert(record, type, conversionFailures);
+	}
+
 	@Override
 	public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
 		throw new UnsupportedOperationException();

@@ -222,11 +231,35 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
 	 * @param record the record.
 	 * @param type the type - must be a {@link ParameterizedType} with a single generic
 	 * type parameter.
+	 * @param conversionFailures the list of conversion failures.
 	 * @return the converted payload.
 	 */
-	protected Object convert(ConsumerRecord<?, ?> record, Type type) {
-		return this.recordConverter
-				.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
+	protected Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
+		try {
+			Object payload = this.recordConverter
+					.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
+			conversionFailures.add(null);
+			return payload;
+		}
+		catch (ConversionException ex) {
+			byte[] original = null;
+			if (record.value() instanceof byte[]) {
+				original = (byte[]) record.value();
+			}
+			else if (record.value() instanceof Bytes) {
+				original = ((Bytes) record.value()).get();
+			}
+			else if (record.value() instanceof String) {
+				original = ((String) record.value()).getBytes(StandardCharsets.UTF_8);
+			}
+			if (original != null) {
+				ErrorHandlingDeserializer.deserializationException(record.headers(), original, ex, false);
+				conversionFailures.add(ex);
+				return null;
+			}
+			throw new ConversionException("The batch converter can only report conversion failures to the listener "
					+ "if the record.value() is byte[], Bytes, or String", ex);
+		}
 	}
 
 	/**
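Note the invariant in `convert(...)`: a `null` is added to `conversionFailures` for every record that converts successfully, so the failures list stays index-aligned with the payload list and `exceptions.get(i)` in a listener always corresponds to `in.get(i)`. A minimal sketch of that alignment, driving the converter directly (the `Thing` payload type and the JSON record converter are illustrative assumptions):

[source, java]
----
BatchMessagingMessageConverter converter =
        new BatchMessagingMessageConverter(new JsonMessageConverter());
// resolve the listener parameter type List<Thing>
Type listOfThing = ResolvableType.forClassWithGenerics(List.class, Thing.class).getType();

ConsumerRecord<String, String> good = new ConsumerRecord<>("topic", 0, 0L, null, "{\"name\":\"ok\"}");
ConsumerRecord<String, String> bad = new ConsumerRecord<>("topic", 0, 1L, null, "not json");
List<ConsumerRecord<?, ?>> records = List.of(good, bad);

Message<?> message = converter.toMessage(records, null, null, listOfThing);
List<?> payloads = (List<?>) message.getPayload();  // [Thing("ok"), null]
List<?> failures = (List<?>) message.getHeaders()
        .get(KafkaHeaders.CONVERSION_FAILURES);     // [null, ConversionException]
----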

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ConversionException.java

Lines changed: 3 additions & 3 deletions
@@ -35,11 +35,11 @@
 @SuppressWarnings("serial")
 public class ConversionException extends KafkaException {
 
-	private final ConsumerRecord<?, ?> record;
+	private transient ConsumerRecord<?, ?> record;
 
-	private final List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+	private transient List<ConsumerRecord<?, ?>> records = new ArrayList<>();
 
-	private final Message<?> message;
+	private transient Message<?> message;
 
 	/**
 	 * Construct an instance with the provided properties.
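The switch to `transient` is what lets the new header mechanism work: converters attach the failed `ConsumerRecord` (or `Message`) to the `ConversionException`, and neither type is `Serializable`. Since the batch converter above writes the exception into the record headers as the cause of a Java-serialized `DeserializationException`, non-transient fields would push `writeObject` into its `IOException` fallback and the original cause would be lost. A minimal sketch (the record contents are illustrative, and the record-bearing constructor is assumed):

[source, java]
----
ConsumerRecord<String, String> failed = new ConsumerRecord<>("topic", 0, 0L, null, "not json");
ConversionException cause = new ConversionException("failed to convert", failed,
        new RuntimeException("parse error"));
DeserializationException wrapped = new DeserializationException("failed to deserialize",
        "not json".getBytes(StandardCharsets.UTF_8), false, cause);

ByteArrayOutputStream buffer = new ByteArrayOutputStream();
try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
    out.writeObject(wrapped); // succeeds now; the transient record field is simply skipped
}
----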

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+/*
+ * Copyright 2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.support.serializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
+
+import org.springframework.util.Assert;
+
+/**
+ * Delegates to a serializer based on type.
+ *
+ * @author Gary Russell
+ * @since 2.8
+ *
+ */
+public class DelegatingByTypeSerializer implements Serializer<Object> {
+
+	@SuppressWarnings("rawtypes")
+	private final Map<Class<?>, Serializer> delegates = new HashMap<>();
+
+	/**
+	 * Construct an instance with the map of delegates.
+	 * @param delegates the delegates.
+	 */
+	@SuppressWarnings("rawtypes")
+	public DelegatingByTypeSerializer(Map<Class<?>, Serializer> delegates) {
+		Assert.notNull(delegates, "'delegates' cannot be null");
+		Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null");
+		this.delegates.putAll(delegates);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void configure(Map<String, ?> configs, boolean isKey) {
+		this.delegates.values().forEach(del -> del.configure(configs, isKey));
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public byte[] serialize(String topic, Object data) {
+		Serializer delegate = findDelegate(data);
+		return delegate.serialize(topic, data);
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Override
+	public byte[] serialize(String topic, Headers headers, Object data) {
+		Serializer delegate = findDelegate(data);
+		return delegate.serialize(topic, headers, data);
+	}
+
+	@SuppressWarnings("rawtypes")
+	private Serializer findDelegate(Object data) {
+		Serializer delegate = this.delegates.get(data.getClass());
+		if (delegate == null) {
+			throw new SerializationException("No matching delegate for type: " + data.getClass().getName()
+					+ "; supported types: " + this.delegates.keySet().stream()
+							.map(clazz -> clazz.getName())
+							.collect(Collectors.toList()));
+		}
+		return delegate;
+	}
+
+}
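Note the design choice in `findDelegate(Object)`: the lookup is an exact match on `data.getClass()`, so a value whose class is a subtype of a registered class does not fall back to the supertype's delegate. A short usage sketch (topic and values are illustrative):

[source, java]
----
DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(Map.of(
        String.class, new StringSerializer(),
        byte[].class, new ByteArraySerializer()));
serializer.configure(Map.of(), false); // propagated to every delegate

byte[] fromString = serializer.serialize("topic", "hello");          // via StringSerializer
byte[] fromBytes = serializer.serialize("topic", new byte[] { 1 });  // via ByteArraySerializer
serializer.serialize("topic", 42L); // throws SerializationException: no delegate for java.lang.Long
----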

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java

Lines changed: 17 additions & 9 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2020 the original author or authors.
+ * Copyright 2018-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

@@ -188,7 +188,7 @@ public T deserialize(String topic, Headers headers, byte[] data) {
 			return this.delegate.deserialize(topic, headers, data);
 		}
 		catch (Exception e) {
-			deserializationException(headers, data, e);
+			deserializationException(headers, data, e, this.isForKey);
 			return recoverFromSupplier(topic, headers, data, e);
 		}
 	}

@@ -211,28 +211,36 @@ public void close() {
 		}
 	}
 
-	private void deserializationException(Headers headers, byte[] data, Exception e) {
+	/**
+	 * Populate the record headers with a serialized {@link DeserializationException}.
+	 * @param headers the headers.
+	 * @param data the data.
+	 * @param ex the exception.
+	 * @param isForKeyArg true if this is a key deserialization problem, otherwise value.
+	 * @since 2.8
+	 */
+	public static void deserializationException(Headers headers, byte[] data, Exception ex, boolean isForKeyArg) {
 		ByteArrayOutputStream stream = new ByteArrayOutputStream();
 		DeserializationException exception =
-				new DeserializationException("failed to deserialize", data, this.isForKey, e);
+				new DeserializationException("failed to deserialize", data, isForKeyArg, ex);
 		try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
 			oos.writeObject(exception);
 		}
-		catch (IOException ex) {
+		catch (IOException ioex) {
 			stream = new ByteArrayOutputStream();
 			try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
 				exception = new DeserializationException("failed to deserialize",
-						data, this.isForKey, new RuntimeException("Could not deserialize type "
-								+ e.getClass().getName() + " with message " + e.getMessage()
-								+ " failure: " + ex.getMessage()));
+						data, isForKeyArg, new RuntimeException("Could not deserialize type "
+								+ ex.getClass().getName() + " with message " + ex.getMessage()
+								+ " failure: " + ioex.getMessage()));
 				oos.writeObject(exception);
 			}
 			catch (IOException ex2) {
 				throw new IllegalStateException("Could not serialize a DeserializationException", ex2); // NOSONAR
 			}
 		}
 		headers.add(
-				new RecordHeader(this.isForKey
+				new RecordHeader(isForKeyArg
 						? KEY_DESERIALIZER_EXCEPTION_HEADER
 						: VALUE_DESERIALIZER_EXCEPTION_HEADER,
 						stream.toByteArray()));
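Making `deserializationException(...)` public and static is what lets `BatchMessagingMessageConverter` (above) populate the same headers without owning an `ErrorHandlingDeserializer` instance. On the consuming side, the header value is a Java-serialized `DeserializationException` that can be read back; a hand-rolled sketch for illustration (error handling and the usual caution about deserializing untrusted bytes are omitted for brevity):

[source, java]
----
Header header = record.headers().lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
if (header != null) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(header.value()))) {
        DeserializationException ex = (DeserializationException) ois.readObject();
        byte[] original = ex.getData();   // the raw value that failed
        Throwable cause = ex.getCause();  // e.g. the ConversionException from the batch converter
    }
}
----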
