Skip to content

Commit 29146ba

Browse files
authored
GH-3424: Customizing DeadLetterPublishingRecovererFactory logging (#3461)
* GH-3424: Customizing DeadLetterPublishingRecovererFactory logging Fixes: #3424 Provide the ability to customize logging in `DeadLetterPublishingRecovererFactory`. * PR review addressing Adding whats-new
1 parent 45a8167 commit 29146ba

File tree

2 files changed

+36
-22
lines changed

2 files changed

+36
-22
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,8 @@ For more details, see xref:kafka/events.adoc[Application Events] and `Concurrent
4040

4141
When using `ReplyingKafkaTemplate`, if the original record from the request contains a key, then that same key will be part of the reply as well.
4242
For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section of the reference docs.
43+
44+
[[x33-customize-logging-in-DeadLetterPublishingRecovererFactory]]
45+
=== Customizing Logging in DeadLetterPublishingRecovererFactory
46+
47+
When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,6 +54,7 @@
5454
*
5555
* @author Tomaz Fernandes
5656
* @author Gary Russell
57+
* @author Soby Chacko
5758
* @since 2.7
5859
*
5960
*/
@@ -237,6 +238,35 @@ public DeadLetterPublishingRecoverer create(String mainListenerId) {
237238
return recoverer;
238239
}
239240

241+
/**
242+
* Log the exception before sending the record in error to the retry topic.
243+
* This method can be overridden by downstream applications to customize how the error is logged.
244+
* @param exception the exception that caused the error
245+
* @param consumerRecord the original consumer record
246+
* @param nextDestination the next topic where the record goes
247+
* @since 3.3.0
248+
*/
249+
protected void maybeLogListenerException(Exception exception, ConsumerRecord<?, ?> consumerRecord, DestinationTopic nextDestination) {
250+
if (nextDestination.isDltTopic()
251+
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
252+
LOGGER.error(exception, () -> getErrorMessage(consumerRecord) + " and won't be retried. "
253+
+ "Sending to DLT with name " + nextDestination.getDestinationName() + ".");
254+
}
255+
else if (nextDestination.isNoOpsTopic()
256+
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
257+
LOGGER.error(exception, () -> getErrorMessage(consumerRecord) + " and won't be retried. "
258+
+ "No further action will be taken with this record.");
259+
}
260+
else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrategy)) {
261+
LOGGER.error(exception, () -> getErrorMessage(consumerRecord) + ". "
262+
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
263+
}
264+
else {
265+
LOGGER.debug(exception, () -> getErrorMessage(consumerRecord) + ". "
266+
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
267+
}
268+
}
269+
240270
private DeadLetterPublishingRecoverer create(
241271
Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
242272
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
@@ -271,27 +301,6 @@ private DeadLetterPublishingRecoverer create(
271301
};
272302
}
273303

274-
private void maybeLogListenerException(Exception e, ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
275-
if (nextDestination.isDltTopic()
276-
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
277-
LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
278-
+ "Sending to DLT with name " + nextDestination.getDestinationName() + ".");
279-
}
280-
else if (nextDestination.isNoOpsTopic()
281-
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
282-
LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
283-
+ "No further action will be taken with this record.");
284-
}
285-
else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrategy)) {
286-
LOGGER.error(e, () -> getErrorMessage(cr) + ". "
287-
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
288-
}
289-
else {
290-
LOGGER.debug(e, () -> getErrorMessage(cr) + ". "
291-
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
292-
}
293-
}
294-
295304
private static String getErrorMessage(ConsumerRecord<?, ?> cr) {
296305
return "Record: " + getRecordInfo(cr) + " threw an error at topic " + cr.topic();
297306
}

0 commit comments

Comments
 (0)