Skip to content

Commit 5aff841

Browse files
committed
GH-1494: Fix NPE in DeadLetterPublishingRecoverer
Resolves #1494
1 parent 96b4369 commit 5aff841

File tree

3 files changed

+59
-6
lines changed

3 files changed

+59
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -98,7 +98,9 @@ public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends O
9898
* original topic. The templates map keys are classes and the value the corresponding
9999
* template to use for objects (producer record values) of that type. A
100100
* {@link java.util.LinkedHashMap} is recommended when there is more than one
101-
* template, to ensure the map is traversed in order.
101+
* template, to ensure the map is traversed in order. To send records with a null
102+
* value, add a template with the {@link Void} class as a key; otherwise the first
103+
* template from the map values iterator will be used.
102104
* @param templates the {@link KafkaTemplate}s to use for publishing.
103105
*/
104106
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates) {
@@ -112,7 +114,9 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Objec
112114
* 0, no partition is set when publishing to the topic. The templates map keys are
113115
* classes and the value the corresponding template to use for objects (producer
114116
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
115-
* there is more than one template, to ensure the map is traversed in order.
117+
* there is more than one template, to ensure the map is traversed in order. To send
118+
* records with a null value, add a template with the {@link Void} class as a key;
119+
* otherwise the first template from the map values iterator will be used.
116120
* @param templates the {@link KafkaTemplate}s to use for publishing.
117121
* @param destinationResolver the resolving function.
118122
*/
@@ -161,10 +165,19 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
161165
}
162166

163167
@SuppressWarnings("unchecked")
164-
private KafkaTemplate<Object, Object> findTemplateForValue(Object value) {
168+
private KafkaTemplate<Object, Object> findTemplateForValue(@Nullable Object value) {
165169
if (this.template != null) {
166170
return this.template;
167171
}
172+
if (value == null) {
173+
KafkaOperations<?, ?> operations = this.templates.get(Void.class);
174+
if (operations == null) {
175+
return (KafkaTemplate<Object, Object>) this.templates.values().iterator().next();
176+
}
177+
else {
178+
return (KafkaTemplate<Object, Object>) operations;
179+
}
180+
}
168181
Optional<Class<?>> key = this.templates.keySet()
169182
.stream()
170183
.filter((k) -> k.isAssignableFrom(value.getClass()))

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 41 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -23,12 +23,16 @@
2323
import static org.mockito.Mockito.never;
2424
import static org.mockito.Mockito.verify;
2525

26+
import java.util.LinkedHashMap;
27+
import java.util.Map;
28+
2629
import org.apache.kafka.clients.consumer.ConsumerRecord;
2730
import org.apache.kafka.clients.producer.ProducerRecord;
2831
import org.junit.jupiter.api.Test;
2932

3033
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
3134
import org.springframework.kafka.core.KafkaTemplate;
35+
import org.springframework.util.concurrent.SettableListenableFuture;
3236

3337
/**
3438
* @author Gary Russell
@@ -37,38 +41,41 @@
3741
*/
3842
public class DeadLetterPublishingRecovererTests {
3943

40-
@SuppressWarnings("unchecked")
44+
@SuppressWarnings({ "unchecked", "rawtypes" })
4145
@Test
4246
void testTxNoTx() {
4347
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
4448
given(template.isTransactional()).willReturn(true);
4549
given(template.inTransaction()).willReturn(false);
4650
given(template.isAllowNonTransactional()).willReturn(true);
51+
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
4752
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
4853
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
4954
recoverer.accept(record, new RuntimeException());
5055
verify(template, never()).executeInTransaction(any());
5156
verify(template).send(any(ProducerRecord.class));
5257
}
5358

54-
@SuppressWarnings("unchecked")
59+
@SuppressWarnings({ "unchecked", "rawtypes" })
5560
@Test
5661
void testTxExisting() {
5762
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
5863
given(template.isTransactional()).willReturn(true);
5964
given(template.inTransaction()).willReturn(true);
65+
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
6066
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
6167
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
6268
recoverer.accept(record, new RuntimeException());
6369
verify(template, never()).executeInTransaction(any());
6470
verify(template).send(any(ProducerRecord.class));
6571
}
6672

67-
@SuppressWarnings("unchecked")
73+
@SuppressWarnings({ "unchecked", "rawtypes" })
6874
@Test
6975
void testNonTx() {
7076
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
7177
given(template.isTransactional()).willReturn(false);
78+
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
7279
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
7380
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
7481
recoverer.accept(record, new RuntimeException());
@@ -88,11 +95,42 @@ void testTxNewTx() {
8895
((OperationsCallback) inv.getArgument(0)).doInOperations(template);
8996
return null;
9097
}).given(template).executeInTransaction(any());
98+
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
9199
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
92100
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
93101
recoverer.accept(record, new RuntimeException());
94102
verify(template).executeInTransaction(any());
95103
verify(template).send(any(ProducerRecord.class));
96104
}
97105

106+
@SuppressWarnings({ "unchecked", "rawtypes" })
107+
@Test
108+
void tombstoneWithMultiTemplates() {
109+
KafkaTemplate<?, ?> template1 = mock(KafkaTemplate.class);
110+
given(template1.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
111+
KafkaTemplate<?, ?> template2 = mock(KafkaTemplate.class);
112+
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
113+
templates.put(String.class, template1);
114+
templates.put(Integer.class, template2);
115+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
116+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
117+
recoverer.accept(record, new RuntimeException());
118+
verify(template1).send(any(ProducerRecord.class));
119+
}
120+
121+
@SuppressWarnings({ "unchecked", "rawtypes" })
122+
@Test
123+
void tombstoneWithMultiTemplatesExplicit() {
124+
KafkaTemplate<?, ?> template1 = mock(KafkaTemplate.class);
125+
KafkaTemplate<?, ?> template2 = mock(KafkaTemplate.class);
126+
given(template2.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
127+
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
128+
templates.put(String.class, template1);
129+
templates.put(Void.class, template2);
130+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
131+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
132+
recoverer.accept(record, new RuntimeException());
133+
verify(template2).send(any(ProducerRecord.class));
134+
}
135+
98136
}

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3944,6 +3944,8 @@ public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplat
39443944
The publisher uses the map keys to locate a template that is suitable for the `value()` about to be published.
39453945
A `LinkedHashMap` is recommended so that the keys are examined in order.
39463946

3947+
When publishing `null` values, when there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used.
3948+
39473949
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
39483950

39493951
Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.

0 commit comments

Comments (0)