
Commit 6dd162c

DeadLetterPublishingRecoverer Improvements

- make it easier to enhance headers
- also polish the `ThreadLocal` in `KafkaUtils`

1 parent 7004a26 commit 6dd162c

File tree: 4 files changed, +39 -13 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 24 additions & 8 deletions

@@ -53,8 +53,7 @@
  */
 public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {
 
-	private static final LogAccessor LOGGER =
-			new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecoverer.class));
+	protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
 
 	private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
 			DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
@@ -69,6 +68,8 @@ public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {
 
 	private boolean retainExceptionHeader;
 
+	private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = (rec, ex) -> null;
+
 	/**
 	 * Create an instance with the provided template and a default destination resolving
 	 * function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -182,15 +183,26 @@ public void setRetainExceptionHeader(boolean retainExceptionHeader) {
 		this.retainExceptionHeader = retainExceptionHeader;
 	}
 
+	/**
+	 * Set a function which will be called to obtain additional headers to add to the
+	 * published record.
+	 * @param headersFunction the headers function.
+	 * @since 2.5.4
+	 */
+	public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction) {
+		Assert.notNull(headersFunction, "'headersFunction' cannot be null");
+		this.headersFunction = headersFunction;
+	}
+
 	@Override
 	public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		TopicPartition tp = this.destinationResolver.apply(record, exception);
 		boolean isKey = false;
 		DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(record,
-				ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
+				ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
 		if (deserEx == null) {
 			deserEx = ListenerUtils.getExceptionFromHeader(record,
-					ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
+					ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
 			isKey = true;
 		}
 		Headers headers;
@@ -236,7 +248,7 @@ private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object va
 		if (key.isPresent()) {
 			return (KafkaOperations<Object, Object>) this.templates.get(key.get());
 		}
-		LOGGER.warn(() -> "Failed to find a template for " + value.getClass() + " attempting to use the last entry");
+		this.logger.warn(() -> "Failed to find a template for " + value.getClass() + " attempting to use the last entry");
 		return (KafkaOperations<Object, Object>) this.templates.values()
 				.stream()
 				.reduce((first, second) -> second)
@@ -276,13 +288,13 @@ protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?,
 	protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
 		try {
 			kafkaTemplate.send(outRecord).addCallback(result -> {
-				LOGGER.debug(() -> "Successful dead-letter publication: " + result);
+				this.logger.debug(() -> "Successful dead-letter publication: " + result);
 			}, ex -> {
-				LOGGER.error(ex, () -> "Dead-letter publication failed for: " + outRecord);
+				this.logger.error(ex, () -> "Dead-letter publication failed for: " + outRecord);
 			});
 		}
 		catch (Exception e) {
-			LOGGER.error(e, () -> "Dead-letter publication failed for: " + outRecord);
+			this.logger.error(e, () -> "Dead-letter publication failed for: " + outRecord);
 		}
 	}
@@ -306,6 +318,10 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
 		}
 		kafkaHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
 				getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
+		Headers headers = this.headersFunction.apply(record, exception);
+		if (headers != null) {
+			headers.forEach(header -> kafkaHeaders.add(header));
+		}
 	}
 
 	private String getStackTraceAsString(Throwable cause) {
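
The new `setHeadersFunction()` hook above can be wired without subclassing. A minimal sketch, assuming a `KafkaOperations` bean is available; the configuration class and the `x-failure-cause` header name are illustrative, not part of this commit:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class RecovererConfig {

	DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
		// add an application-specific header to every dead-letter record;
		// per enhanceHeaders() above, the returned headers are appended
		// after the standard DLT_* headers
		recoverer.setHeadersFunction((record, exception) -> {
			Headers extra = new RecordHeaders();
			extra.add(new RecordHeader("x-failure-cause",
					exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
			return extra;
		});
		return recoverer;
	}

}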

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java

Lines changed: 4 additions & 4 deletions

@@ -40,7 +40,7 @@ public final class KafkaUtils {
 	public static final boolean MICROMETER_PRESENT = ClassUtils.isPresent(
 			"io.micrometer.core.instrument.MeterRegistry", KafkaUtils.class.getClassLoader());
 
-	private static ThreadLocal<String> groupIds = new ThreadLocal<>();
+	private static final ThreadLocal<String> GROUP_IDS = new ThreadLocal<>();
 
 	/**
 	 * Return true if the method return type is {@link Message} or
@@ -78,7 +78,7 @@ public static boolean returnTypeMessageOrCollectionOf(Method method) {
 	 * @since 2.3
 	 */
 	public static void setConsumerGroupId(String groupId) {
-		KafkaUtils.groupIds.set(groupId);
+		KafkaUtils.GROUP_IDS.set(groupId);
 	}
 
 	/**
@@ -87,15 +87,15 @@ public static void setConsumerGroupId(String groupId) {
 	 * @since 2.3
 	 */
 	public static String getConsumerGroupId() {
-		return KafkaUtils.groupIds.get();
+		return KafkaUtils.GROUP_IDS.get();
 	}
 
 	/**
 	 * Clear the group id for the consumer bound to this thread.
	 * @since 2.3
 	 */
 	public static void clearConsumerGroupId() {
-		KafkaUtils.groupIds.remove();
+		KafkaUtils.GROUP_IDS.remove();
 	}
 
 	private KafkaUtils() {
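
The polish here is a rename to constant style (`GROUP_IDS`, now `final`); the thread-bound contract is unchanged. A sketch of that contract, for illustration only (the framework normally performs the set/clear around each listener invocation, and the wrapper class and group name here are assumptions):

import org.springframework.kafka.support.KafkaUtils;

public class GroupIdScope {

	void runWithGroupId(Runnable listenerCall) {
		KafkaUtils.setConsumerGroupId("myGroup"); // bind the group id to this thread
		try {
			listenerCall.run(); // code here may call KafkaUtils.getConsumerGroupId()
		}
		finally {
			KafkaUtils.clearConsumerGroupId(); // avoid leaking state across pooled threads
		}
	}

}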

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 4 additions & 0 deletions

@@ -124,6 +124,9 @@ void valueHeaderStripped() {
 		Headers headers = new RecordHeaders();
 		headers.add(new RecordHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false)));
 		headers.add(new RecordHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
+		Headers custom = new RecordHeaders();
+		custom.add(new RecordHeader("foo", "bar".getBytes()));
+		recoverer.setHeadersFunction((rec, ex) -> custom);
 		willReturn(new SettableListenableFuture<Object>()).given(template).send(any(ProducerRecord.class));
 		ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME,
 				0L, 0, 0, "bar", "baz", headers);
@@ -133,6 +136,7 @@
 		headers = captor.getValue().headers();
 		assertThat(headers.lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
 		assertThat(headers.lastHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
+		assertThat(headers.lastHeader("foo")).isNotNull();
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })

src/reference/asciidoc/kafka.adoc

Lines changed: 7 additions & 1 deletion

@@ -4868,6 +4868,12 @@ The record sent to the dead-letter topic is enhanced with the following headers:
 * `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP`: The original timestamp.
 * `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE`: The original timestamp type.
 
+There are two mechanisms to add more headers.
+
+1. Subclass the recoverer and override `createProducerRecord()` - call `super.createProducerRecord()` and add more headers.
+2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record.
+
+The second is simpler to implement but the first has more information, including the already assembled standard headers.
 
 Starting with version 2.3, when used in conjunction with an `ErrorHandlingDeserializer`, the publisher will restore the record `value()`, in the dead-letter producer record, to the original value that failed to be deserialized.
 Previously, the `value()` was null and user code had to decode the `DeserializationException` from the message headers.
@@ -4903,7 +4909,7 @@ By default, these headers are not retained in the message published to the dead
 In that case, the `DLT_*` headers are based on the value deserialization and the key `DeserializationException` is retained in the header.
 
 [[kerberos]]
-==== Kerberos
+==== JAAS and Kerberos
 
 Starting with version 2.0, a `KafkaJaasLoginModuleInitializer` class has been added to assist with Kerberos configuration.
 You can add this bean, with the desired configuration, to your application context.
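
The first mechanism in the documentation above (subclassing) can be sketched as follows. The exact `createProducerRecord()` parameter list is truncated in the hunk header earlier in this diff, so the signature shown here is an assumption, as are the subclass name and header name:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class EnhancingRecoverer extends DeadLetterPublishingRecoverer {

	public EnhancingRecoverer(KafkaOperations<Object, Object> template) {
		super(template);
	}

	@Override
	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
			TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) { // assumed signature

		// per the docs above, the standard DLT_* headers are already assembled here
		headers.add(new RecordHeader("x-recovered-by",
				getClass().getName().getBytes(StandardCharsets.UTF_8)));
		return super.createProducerRecord(record, topicPartition, headers, key, value);
	}

}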
