
Commit b022f9a

GH-1605: DeadLetterPublishingRecoverer Improvement
Resolves #1605

When both the key and value fail deserialization, populate both original values in the record published to the DLT.

* Fix javadocs for new headers.
1 parent f9bc419 commit b022f9a

File tree: 6 files changed, +79 −34 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 30 additions & 25 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2020 the original author or authors.
+ * Copyright 2018-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -168,24 +168,21 @@ public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Heade
 	@Override
 	public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		TopicPartition tp = this.destinationResolver.apply(record, exception);
-		boolean isKey = false;
-		DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(record,
+		DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
 				ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
-		if (deserEx == null) {
-			deserEx = ListenerUtils.getExceptionFromHeader(record,
+		DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
 				ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
-			isKey = true;
+		Headers headers = new RecordHeaders(record.headers().toArray());
+		if (kDeserEx != null && !this.retainExceptionHeader) {
+			headers.remove(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER);
+			addExceptionInfoHeaders(headers, kDeserEx, true);
 		}
-		Headers headers;
-		if (deserEx == null || this.retainExceptionHeader) {
-			headers = new RecordHeaders(record.headers().toArray());
-		}
-		else {
-			headers = deserEx.getHeaders();
+		if (vDeserEx != null && !this.retainExceptionHeader) {
+			headers.remove(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
 		}
 		enhanceHeaders(headers, record, exception); // NOSONAR headers are never null
 		ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
-				deserEx == null ? null : deserEx.getData(), isKey);
+				kDeserEx == null ? null : kDeserEx.getData(), vDeserEx == null ? null : vDeserEx.getData());
 		KafkaOperations<Object, Object> kafkaTemplate = findTemplateForValue(outRecord.value());
 		if (this.transactional && !kafkaTemplate.inTransaction() && !kafkaTemplate.isAllowNonTransactional()) {
 			kafkaTemplate.executeInTransaction(t -> {
@@ -236,18 +233,18 @@ private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object va
 	 * @param topicPartition the {@link TopicPartition} returned by the destination
 	 * resolver.
	 * @param headers the headers - original record headers plus DLT headers.
-	 * @param data the value to use instead of the consumer record value.
-	 * @param isKey true if key deserialization failed.
+	 * @param key the key to use instead of the consumer record key.
+	 * @param value the value to use instead of the consumer record value.
 	 * @return the producer record to send.
 	 * @see KafkaHeaders
 	 */
 	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
-			TopicPartition topicPartition, Headers headers, @Nullable byte[] data, boolean isKey) {
+			TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
 
 		return new ProducerRecord<>(topicPartition.topic(),
 				topicPartition.partition() < 0 ? null : topicPartition.partition(),
-				isKey && data != null ? data : record.key(),
-				data == null || isKey ? record.value() : data, headers);
+				key != null ? key : record.key(),
+				value != null ? value : record.value(), headers);
 	}
 
 	/**
@@ -280,19 +277,27 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
 				ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
 		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE,
 				record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
-		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN,
+		addExceptionInfoHeaders(kafkaHeaders, exception, false);
+		Headers headers = this.headersFunction.apply(record, exception);
+		if (headers != null) {
+			headers.forEach(header -> kafkaHeaders.add(header));
+		}
+	}
+
+	private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey) {
+		kafkaHeaders.add(new RecordHeader(isKey ? KafkaHeaders.DLT_KEY_EXCEPTION_FQCN : KafkaHeaders.DLT_EXCEPTION_FQCN,
 				exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
 		String message = exception.getMessage();
 		if (message != null) {
-			kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
+			kafkaHeaders.add(new RecordHeader(isKey
+					? KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
+					: KafkaHeaders.DLT_EXCEPTION_MESSAGE,
 					exception.getMessage().getBytes(StandardCharsets.UTF_8)));
 		}
-		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
+		kafkaHeaders.add(new RecordHeader(isKey
+				? KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
+				: KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
 				getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
-		Headers headers = this.headersFunction.apply(record, exception);
-		if (headers != null) {
-			headers.forEach(header -> kafkaHeaders.add(header));
-		}
 	}
 
 	private String getStackTraceAsString(Throwable cause) {

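Note for subclasses: the protected `createProducerRecord` signature changed from `(..., @Nullable byte[] data, boolean isKey)` to `(..., @Nullable byte[] key, @Nullable byte[] value)`. A minimal sketch of an updated override, assuming a hypothetical subclass that tags DLT records with a custom header (the class name and header are illustrative, not part of this commit):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.lang.Nullable;

public class TaggingDltRecoverer extends DeadLetterPublishingRecoverer {

	public TaggingDltRecoverer(KafkaOperations<Object, Object> template) {
		super(template);
	}

	@Override
	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
			TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

		// Tag the outgoing DLT record, then delegate; before this commit the last
		// two parameters were (byte[] data, boolean isKey).
		headers.add(new RecordHeader("my-app", "orders".getBytes(StandardCharsets.UTF_8)));
		return super.createProducerRecord(record, topicPartition, headers, key, value);
	}

}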
spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2020 the original author or authors.
+ * Copyright 2017-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 22 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2020 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -159,6 +159,27 @@ public abstract class KafkaHeaders {
 	 */
 	public static final String DLT_EXCEPTION_MESSAGE = PREFIX + "dlt-exception-message";
 
+	/**
+	 * Exception stack trace for a record published to a dead-letter topic with a key
+	 * deserialization exception.
+	 * @since 2.7
+	 */
+	public static final String DLT_KEY_EXCEPTION_STACKTRACE = PREFIX + "dlt-key-exception-stacktrace";
+
+	/**
+	 * Exception message for a record published to a dead-letter topic with a key
+	 * deserialization exception.
+	 * @since 2.7
+	 */
+	public static final String DLT_KEY_EXCEPTION_MESSAGE = PREFIX + "dlt-key-exception-message";
+
+	/**
+	 * Exception class name for a record published to a dead-letter topic with a key
+	 * deserialization exception.
+	 * @since 2.7
+	 */
+	public static final String DLT_KEY_EXCEPTION_FQCN = PREFIX + "dlt-key-exception-fqcn";
+
 	/**
 	 * Original topic for a record published to a dead-letter topic.
 	 * @since 2.2

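A hedged sketch of how a consumer on the dead-letter topic might use these new constants to distinguish key failures from value failures; the topic, group, and class names are hypothetical:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;

public class DltAuditListener {

	@KafkaListener(topics = "orders.DLT", groupId = "dlt-audit")
	public void listen(ConsumerRecord<byte[], byte[]> record) {
		// Key failures are reported under the new DLT_KEY_EXCEPTION_* headers;
		// value failures keep using the existing DLT_EXCEPTION_* headers.
		Header keyEx = record.headers().lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE);
		Header valueEx = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
		if (keyEx != null) {
			System.out.println("key failed: " + new String(keyEx.value(), StandardCharsets.UTF_8));
		}
		if (valueEx != null) {
			System.out.println("value failed: " + new String(valueEx.value(), StandardCharsets.UTF_8));
		}
	}

}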
spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 13 additions & 5 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020 the original author or authors.
+ * Copyright 2020-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
 
 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
+import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.serializer.DeserializationException;
 import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 import org.springframework.util.concurrent.SettableListenableFuture;
@@ -130,13 +131,18 @@ void valueHeaderStripped() {
 		willReturn(new SettableListenableFuture<Object>()).given(template).send(any(ProducerRecord.class));
 		ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME,
 				0L, 0, 0, "bar", "baz", headers);
-		recoverer.accept(record, new RuntimeException());
+		recoverer.accept(record, new RuntimeException("testV"));
 		ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
 		verify(template).send(captor.capture());
-		headers = captor.getValue().headers();
+		ProducerRecord recovered = captor.getValue();
+		assertThat(recovered.key()).isEqualTo("key".getBytes());
+		assertThat(recovered.value()).isEqualTo("value".getBytes());
+		headers = recovered.headers();
 		assertThat(headers.lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
-		assertThat(headers.lastHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
+		assertThat(headers.lastHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
 		assertThat(headers.lastHeader("foo")).isNotNull();
+		assertThat(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value()).isEqualTo("testK".getBytes());
+		assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()).isEqualTo("testV".getBytes());
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -209,7 +215,9 @@ void tombstoneWithMultiTemplatesExplicit() {
 	private byte[] header(boolean isKey) {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		try {
-			new ObjectOutputStream(baos).writeObject(new DeserializationException("test", new byte[0], isKey, null));
+			new ObjectOutputStream(baos).writeObject(new DeserializationException(
+					isKey ? "testK" : "testV",
+					isKey ? "key".getBytes() : "value".getBytes(), isKey, null));
 		}
 		catch (IOException e) {
 			throw new UncheckedIOException(e);

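As the updated test shows, the serialized `DeserializationException` headers are stripped by default and summarized in `DLT_*` info headers. A short sketch, assuming the recoverer's existing `setRetainExceptionHeader` option, for keeping the raw headers instead (template wiring omitted):

// Retain the java-serialized DeserializationException headers on the DLT record
// rather than stripping them; with this set, the recoverer skips the header removal
// seen in accept() above.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setRetainExceptionHeader(true); // keep KEY_/VALUE_DESERIALIZER_EXCEPTION_HEADER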
src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 2 deletions
@@ -5169,6 +5169,9 @@ The record sent to the dead-letter topic is enhanced with the following headers:
 * `KafkaHeaders.DLT_EXCEPTION_FQCN`: The Exception class name.
 * `KafkaHeaders.DLT_EXCEPTION_STACKTRACE`: The Exception stack trace.
 * `KafkaHeaders.DLT_EXCEPTION_MESSAGE`: The Exception message.
+* `KafkaHeaders.DLT_KEY_EXCEPTION_FQCN`: The Exception class name (key deserialization errors only).
+* `KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE`: The Exception stack trace (key deserialization errors only).
+* `KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE`: The Exception message (key deserialization errors only).
 * `KafkaHeaders.DLT_ORIGINAL_TOPIC`: The original topic.
 * `KafkaHeaders.DLT_ORIGINAL_PARTITION`: The original partition.
 * `KafkaHeaders.DLT_ORIGINAL_OFFSET`: The original offset.
@@ -5219,8 +5222,8 @@ By default, the exception type is not considered.
 Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.
 
 The `ErrorHandlingDeserializer` adds the deserialization exception(s) in headers `ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER` and `ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER` (using java serialization).
-By default, these headers are not retained in the message published to the dead letter topic, unless both the key and value fail deserialization.
-In that case, the `DLT_*` headers are based on the value deserialization and the key `DeserializationException` is retained in the header.
+By default, these headers are not retained in the message published to the dead letter topic.
+Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT.
 
 If incoming records are dependent on each other, but may arrive out of order, it may be useful to republish a failed record to the tail of the original topic (for some number of times), instead of sending it directly to the dead letter topic.
 See https://stackoverflow.com/questions/64646996[this Stack Overflow Question] for an example.

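For context, a hedged wiring sketch (bean and delegate deserializer types assumed, not part of this diff) showing how the recoverer is typically combined with `ErrorHandlingDeserializer` so the headers above get populated:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DltConfig {

	// Consumer properties (assumed delegates) route deserialization through
	// ErrorHandlingDeserializer so failures arrive as headers, not thrown errors:
	// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
	// props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
	// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
	// props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);

	@Bean
	public SeekToCurrentErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
		// Publish the failed record to the DLT after retries are exhausted.
		return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),
				new FixedBackOff(0L, 2L));
	}

}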
src/reference/asciidoc/whats-new.adoc

Lines changed: 8 additions & 0 deletions
@@ -22,3 +22,11 @@ See <<container-props>> for more information.
 
 You can now validate the payload parameter of `@KafkaHandler` methods (class-level listeners).
 See <<kafka-validation>> for more information.
+
+[[27-dlt]]
+==== `DeadLetterPublishingRecoverer` Changes
+
+Now, if both the key and value fail deserialization, the original values are published to the DLT.
+Previously, the value was populated but the key `DeserializationException` remained in the headers.
+This is a breaking API change if you subclassed the recoverer and overrode the `createProducerRecord` method.
+See <<dead-letters>> for more information.
