Skip to content

Commit f536406

Browse files
garyrussellartembilan
authored andcommitted
GH-1388: DeadLetterPR: Don't start a Tx if allowed
Resolves #1388 (comment) Don't start a new transaction in the `DeadLetterPublishingRecoverer` if the `KafkaTemplate` allows it.
1 parent 1372405 commit f536406

File tree

4 files changed

+120
-5
lines changed

4 files changed

+120
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -217,6 +217,15 @@ public interface KafkaOperations<K, V> {
217217
*/
218218
boolean isTransactional();
219219

220+
/**
221+
* Return true if this template, when transactional, allows non-transactional operations.
222+
* @return true to allow.
223+
* @since 2.4.3
224+
*/
225+
default boolean isAllowNonTransactional() {
226+
return false;
227+
}
228+
220229
/**
221230
* A callback for executing arbitrary operations on the {@link Producer}.
222231
* @param <K> the key type.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ public void setAllowNonTransactional(boolean allowNonTransactional) {
192192
this.allowNonTransactional = allowNonTransactional;
193193
}
194194

195+
@Override
196+
public boolean isAllowNonTransactional() {
197+
return this.allowNonTransactional;
198+
}
199+
195200
/**
196201
* Return the producer factory used by this template.
197202
* @return the factory.

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -149,7 +149,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
149149
ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
150150
deserEx == null ? null : deserEx.getData(), isKey);
151151
KafkaTemplate<Object, Object> kafkaTemplate = findTemplateForValue(outRecord.value());
152-
if (this.transactional && !kafkaTemplate.inTransaction()) {
152+
if (this.transactional && !kafkaTemplate.inTransaction() && !kafkaTemplate.isAllowNonTransactional()) {
153153
kafkaTemplate.executeInTransaction(t -> {
154154
publish(outRecord, t);
155155
return null;
@@ -235,8 +235,11 @@ private void enhanceHeaders(RecordHeaders kafkaHeaders, ConsumerRecord<?, ?> rec
235235
record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
236236
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN,
237237
exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
238-
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
239-
exception.getMessage().getBytes(StandardCharsets.UTF_8)));
238+
String message = exception.getMessage();
239+
if (message != null) {
240+
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
241+
exception.getMessage().getBytes(StandardCharsets.UTF_8)));
242+
}
240243
kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
241244
getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
242245
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.BDDMockito.given;
21+
import static org.mockito.BDDMockito.willAnswer;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.never;
24+
import static org.mockito.Mockito.verify;
25+
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.apache.kafka.clients.producer.ProducerRecord;
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
31+
import org.springframework.kafka.core.KafkaTemplate;
32+
33+
/**
34+
* @author Gary Russell
35+
* @since 2.4.3
36+
*
37+
*/
38+
public class DeadLetterPublishingRecovererTests {
39+
40+
@SuppressWarnings("unchecked")
41+
@Test
42+
void testTxNoTx() {
43+
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
44+
given(template.isTransactional()).willReturn(true);
45+
given(template.inTransaction()).willReturn(false);
46+
given(template.isAllowNonTransactional()).willReturn(true);
47+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
48+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
49+
recoverer.accept(record, new RuntimeException());
50+
verify(template, never()).executeInTransaction(any());
51+
verify(template).send(any(ProducerRecord.class));
52+
}
53+
54+
@SuppressWarnings("unchecked")
55+
@Test
56+
void testTxExisting() {
57+
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
58+
given(template.isTransactional()).willReturn(true);
59+
given(template.inTransaction()).willReturn(true);
60+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
61+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
62+
recoverer.accept(record, new RuntimeException());
63+
verify(template, never()).executeInTransaction(any());
64+
verify(template).send(any(ProducerRecord.class));
65+
}
66+
67+
@SuppressWarnings("unchecked")
68+
@Test
69+
void testNonTx() {
70+
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
71+
given(template.isTransactional()).willReturn(false);
72+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
73+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
74+
recoverer.accept(record, new RuntimeException());
75+
verify(template, never()).inTransaction();
76+
verify(template, never()).executeInTransaction(any());
77+
verify(template).send(any(ProducerRecord.class));
78+
}
79+
80+
@SuppressWarnings({ "unchecked", "rawtypes" })
81+
@Test
82+
void testTxNewTx() {
83+
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
84+
given(template.isTransactional()).willReturn(true);
85+
given(template.inTransaction()).willReturn(false);
86+
given(template.isAllowNonTransactional()).willReturn(false);
87+
willAnswer(inv -> {
88+
((OperationsCallback) inv.getArgument(0)).doInOperations(template);
89+
return null;
90+
}).given(template).executeInTransaction(any());
91+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
92+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
93+
recoverer.accept(record, new RuntimeException());
94+
verify(template).executeInTransaction(any());
95+
verify(template).send(any(ProducerRecord.class));
96+
}
97+
98+
}

0 commit comments

Comments
 (0)