Skip to content

Commit 56b7756

Browse files
garyrussellartembilan
authored andcommitted
GH-2268: Fix DLPR Record Logging
Resolves #2268 2.7.x Only The DLPR was incorrectly logging the full `ProducerRecord`; later versions log the original `ConsumerRecord` using `KafkaUtils.format()`. Backport the logging changes, but use a `ThreadLocal` for the input record to avoid API changes in a patch release.
1 parent a1c2092 commit 56b7756

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class DeadLetterPublishingRecoverer implements ConsumerAwareRecordRecover
7575

7676
private static final long THIRTY = 30L;
7777

78+
private static final ThreadLocal<ConsumerRecord<?, ?>> IN_RECORD = new ThreadLocal<>();
79+
7880
private final HeaderNames headerNames = getHeaderNames();
7981

8082
private final boolean transactional;
@@ -378,7 +380,13 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
378380
kDeserEx == null ? null : kDeserEx.getData(), vDeserEx == null ? null : vDeserEx.getData());
379381
KafkaOperations<Object, Object> kafkaTemplate =
380382
(KafkaOperations<Object, Object>) this.templateResolver.apply(outRecord);
381-
sendOrThrow(outRecord, kafkaTemplate);
383+
try {
384+
IN_RECORD.set(record);
385+
sendOrThrow(outRecord, kafkaTemplate);
386+
}
387+
finally {
388+
IN_RECORD.remove();
389+
}
382390
}
383391

384392
private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
@@ -528,11 +536,11 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
528536
sendResult.addCallback(result -> {
529537
this.logger.debug(() -> "Successful dead-letter publication: " + result);
530538
}, ex -> {
531-
this.logger.error(ex, () -> "Dead-letter publication failed for: " + outRecord);
539+
this.logger.error(ex, () -> pubFailMessage(outRecord));
532540
});
533541
}
534542
catch (Exception e) {
535-
this.logger.error(e, () -> "Dead-letter publication failed for: " + outRecord);
543+
this.logger.error(e, () -> pubFailMessage(outRecord));
536544
}
537545
if (this.failIfSendResultIsError) {
538546
verifySendResult(kafkaTemplate, outRecord, sendResult);
@@ -551,20 +559,25 @@ protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
551559

552560
Duration sendTimeout = determineSendTimeout(kafkaTemplate);
553561
if (sendResult == null) {
554-
throw new KafkaException("Dead-letter publication failed for: " + outRecord);
562+
throw new KafkaException(pubFailMessage(outRecord));
555563
}
556564
try {
557565
sendResult.get(sendTimeout.toMillis(), TimeUnit.MILLISECONDS);
558566
}
559567
catch (InterruptedException e) {
560568
Thread.currentThread().interrupt();
561-
throw new KafkaException("Publication failed for: " + outRecord, e);
569+
throw new KafkaException(pubFailMessage(outRecord));
562570
}
563571
catch (ExecutionException | TimeoutException e) {
564-
throw new KafkaException("Publication failed for: " + outRecord, e);
572+
throw new KafkaException(pubFailMessage(outRecord));
565573
}
566574
}
567575

576+
private String pubFailMessage(ProducerRecord<Object, Object> outRecord) {
577+
return "Dead-letter publication to "
578+
+ outRecord.topic() + " failed for: " + KafkaUtils.format(IN_RECORD.get());
579+
}
580+
568581
/**
569582
* Determine the send timeout based on the template's producer factory and
570583
* {@link #setWaitForSendResultTimeout(Duration)}.

0 commit comments

Comments
 (0)