Skip to content

Commit 27f2f65

Browse files
committed
GH-1494: Fix NPE in DeadLetterPublishingRecoverer
Resolves #1494
1 parent a3c8fc0 commit 27f2f65

File tree

3 files changed

+95
-6
lines changed

3 files changed

+95
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -98,7 +98,9 @@ public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends O
9898
* original topic. The templates map keys are classes and the value the corresponding
9999
* template to use for objects (producer record values) of that type. A
100100
* {@link java.util.LinkedHashMap} is recommended when there is more than one
101-
* template, to ensure the map is traversed in order.
101+
* template, to ensure the map is traversed in order. To send records with a null
102+
* value, add a template with the {@link Void} class as a key; otherwise the first
103+
* template from the map values iterator will be used.
102104
* @param templates the {@link KafkaTemplate}s to use for publishing.
103105
*/
104106
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates) {
@@ -112,7 +114,9 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Objec
112114
* 0, no partition is set when publishing to the topic. The templates map keys are
113115
* classes and the value the corresponding template to use for objects (producer
114116
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
115-
* there is more than one template, to ensure the map is traversed in order.
117+
* there is more than one template, to ensure the map is traversed in order. To send
118+
* records with a null value, add a template with the {@link Void} class as a key;
119+
* otherwise the first template from the map values iterator will be used.
116120
* @param templates the {@link KafkaTemplate}s to use for publishing.
117121
* @param destinationResolver the resolving function.
118122
*/
@@ -161,10 +165,19 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
161165
}
162166

163167
@SuppressWarnings("unchecked")
164-
private KafkaTemplate<Object, Object> findTemplateForValue(Object value) {
168+
private KafkaTemplate<Object, Object> findTemplateForValue(@Nullable Object value) {
165169
if (this.template != null) {
166170
return this.template;
167171
}
172+
if (value == null) {
173+
KafkaOperations<?, ?> operations = this.templates.get(Void.class);
174+
if (operations == null) {
175+
return (KafkaTemplate<Object, Object>) this.templates.values().iterator().next();
176+
}
177+
else {
178+
return (KafkaTemplate<Object, Object>) operations;
179+
}
180+
}
168181
Optional<Class<?>> key = this.templates.keySet()
169182
.stream()
170183
.filter((k) -> k.isAssignableFrom(value.getClass()))
@@ -235,8 +248,11 @@ private void enhanceHeaders(RecordHeaders kafkaHeaders, ConsumerRecord<?, ?> rec
235248
record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
236249
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN,
237250
exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
238-
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
239-
exception.getMessage().getBytes(StandardCharsets.UTF_8)));
251+
String message = exception.getMessage();
252+
if (message != null) {
253+
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
254+
exception.getMessage().getBytes(StandardCharsets.UTF_8)));
255+
}
240256
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
241257
getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
242258
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.BDDMockito.given;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.verify;
23+
24+
import java.util.LinkedHashMap;
25+
import java.util.Map;
26+
27+
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.junit.jupiter.api.Test;
30+
31+
import org.springframework.kafka.core.KafkaTemplate;
32+
import org.springframework.util.concurrent.SettableListenableFuture;
33+
34+
/**
35+
* @author Gary Russell
36+
* @since 2.3.9
37+
*
38+
*/
39+
public class DeadLetterPublishingRecovererTests {
40+
41+
@SuppressWarnings({ "unchecked", "rawtypes" })
42+
@Test
43+
void tombstoneWithMultiTemplates() {
44+
KafkaTemplate<?, ?> template1 = mock(KafkaTemplate.class);
45+
given(template1.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
46+
KafkaTemplate<?, ?> template2 = mock(KafkaTemplate.class);
47+
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
48+
templates.put(String.class, template1);
49+
templates.put(Integer.class, template2);
50+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
51+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
52+
recoverer.accept(record, new RuntimeException());
53+
verify(template1).send(any(ProducerRecord.class));
54+
}
55+
56+
@SuppressWarnings({ "unchecked", "rawtypes" })
57+
@Test
58+
void tombstoneWithMultiTemplatesExplicit() {
59+
KafkaTemplate<?, ?> template1 = mock(KafkaTemplate.class);
60+
KafkaTemplate<?, ?> template2 = mock(KafkaTemplate.class);
61+
given(template2.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
62+
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
63+
templates.put(String.class, template1);
64+
templates.put(Void.class, template2);
65+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
66+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
67+
recoverer.accept(record, new RuntimeException());
68+
verify(template2).send(any(ProducerRecord.class));
69+
}
70+
71+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,6 +3830,8 @@ public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplat
38303830
The publisher uses the map keys to locate a template that is suitable for the `value()` about to be published.
38313831
A `LinkedHashMap` is recommended so that the keys are examined in order.
38323832

3833+
When publishing `null` values and there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used.
3834+
38333835
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
38343836

38353837
Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.

0 commit comments

Comments
 (0)