|
48 | 48 | */ |
49 | 49 | public class DeadLetterPublishingRecovererFactory { |
50 | 50 |
|
51 | | - private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class)); |
| 51 | + private static final LogAccessor LOGGER = |
| 52 | + new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class)); |
52 | 53 |
|
53 | 54 | private final DestinationTopicResolver destinationTopicResolver; |
54 | 55 |
|
55 | 56 | private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = recoverer -> { }; |
56 | 57 |
|
| 58 | + private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED; |
| 59 | + |
57 | 60 | public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) { |
58 | 61 | this.destinationTopicResolver = destinationTopicResolver; |
59 | 62 | } |
60 | 63 |
|
| 64 | + /** |
| 65 | + * Never logs the listener exception. |
| 66 | + * The default is logging only after retries are exhausted. |
| 67 | + * @since 2.7.13 |
| 68 | + */ |
| 69 | + public void neverLogListenerException() { |
| 70 | + this.loggingStrategy = ListenerExceptionLoggingStrategy.NEVER; |
| 71 | + } |
| 72 | + |
| 73 | + /** |
| 74 | + * Logs the listener exception at each attempt. |
| 75 | + * The default is logging only after retries are exhausted. |
| 76 | + * @since 2.7.13 |
| 77 | + */ |
| 78 | + public void alwaysLogListenerException() { |
| 79 | + this.loggingStrategy = ListenerExceptionLoggingStrategy.EACH_ATTEMPT; |
| 80 | + } |
| 81 | + |
61 | 82 | @SuppressWarnings("unchecked") |
62 | 83 | public DeadLetterPublishingRecoverer create() { |
63 | 84 | DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size |
@@ -114,11 +135,45 @@ private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) |
114 | 135 | ? "none" |
115 | 136 | : nextDestination.getDestinationName())); |
116 | 137 |
|
| 138 | + maybeLogListenerException(e, cr, nextDestination); |
| 139 | + |
117 | 140 | return nextDestination.isNoOpsTopic() |
118 | 141 | ? null |
119 | 142 | : resolveTopicPartition(cr, nextDestination); |
120 | 143 | } |
121 | 144 |
|
| 145 | + private void maybeLogListenerException(Exception e, ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) { |
| 146 | + if (nextDestination.isDltTopic() |
| 147 | + && !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) { |
| 148 | + LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. " |
| 149 | + + "Sending to DLT with name " + nextDestination.getDestinationName() + "."); |
| 150 | + } |
| 151 | + else if (nextDestination.isNoOpsTopic() |
| 152 | + && !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) { |
| 153 | + LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. " |
| 154 | + + "No further action will be taken with this record."); |
| 155 | + } |
| 156 | + else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrategy)) { |
| 157 | + LOGGER.error(e, () -> getErrorMessage(cr) + ". " |
| 158 | + + "Sending to retry topic " + nextDestination.getDestinationName() + "."); |
| 159 | + } |
| 160 | + else { |
| 161 | + LOGGER.debug(e, () -> getErrorMessage(cr) + ". " |
| 162 | + + "Sending to retry topic " + nextDestination.getDestinationName() + "."); |
| 163 | + } |
| 164 | + } |
| 165 | + |
| 166 | + private static String getErrorMessage(ConsumerRecord<?, ?> cr) { |
| 167 | + return "Record: " + getRecordInfo(cr) + " threw an error at topic " + cr.topic(); |
| 168 | + } |
| 169 | + |
| 170 | + private static String getRecordInfo(ConsumerRecord<?, ?> cr) { |
| 171 | + Header originalTopicHeader = cr.headers().lastHeader(KafkaHeaders.ORIGINAL_TOPIC); |
| 172 | + return String.format("topic = %s, partition = %s, offset = %s, main topic = %s", |
| 173 | + cr.topic(), cr.partition(), cr.offset(), |
| 174 | + originalTopicHeader != null ? new String(originalTopicHeader.value()) : cr.topic()); |
| 175 | + } |
| 176 | + |
122 | 177 | /** |
123 | 178 | * Creates and returns the {@link TopicPartition}, where the original record should be forwarded. |
124 | 179 | * By default, it will use the partition same as original record's partition, in the next destination topic. |
@@ -213,6 +268,23 @@ private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) { |
213 | 268 | return consumerRecord.headers() |
214 | 269 | .lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP); |
215 | 270 | } |
216 | | -} |
217 | 271 |
|
| 272 | + private enum ListenerExceptionLoggingStrategy { |
| 273 | + |
| 274 | + /** |
| 275 | + * Never log the listener exception. |
| 276 | + */ |
| 277 | + NEVER, |
| 278 | + |
| 279 | + /** |
| 280 | + * Log the listener exception after each attempt. |
| 281 | + */ |
| 282 | + EACH_ATTEMPT, |
218 | 283 |
|
| 284 | + /** |
| 285 | + * Log the listener only after retries are exhausted. |
| 286 | + */ |
| 287 | + AFTER_RETRIES_EXHAUSTED |
| 288 | + |
| 289 | + } |
| 290 | +} |
0 commit comments