Skip to content

Commit ce506ea

Browse files
committed
GH-1499: Option to suppress ConsumerRecord logging
Resolves #1499 **I will do the backports because I expect many conflicts.**
1 parent 959f0f5 commit ce506ea

File tree

9 files changed

+66
-9
lines changed

9 files changed

+66
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public class ConsumerProperties {
9797

9898
private LogIfLevelEnabled.Level commitLogLevel = LogIfLevelEnabled.Level.DEBUG;
9999

100+
private boolean onlyLogRecordMetadata;
101+
100102
private Properties kafkaConsumerProperties = new Properties();
101103

102104
private Duration authorizationExceptionRetryInterval;
@@ -350,6 +352,20 @@ public void setCommitRetries(int commitRetries) {
350352
this.commitRetries = commitRetries;
351353
}
352354

355+
public boolean isOnlyLogRecordMetadata() {
356+
return this.onlyLogRecordMetadata;
357+
}
358+
359+
/**
360+
* Set to true to only log {@code topic-partition@offset} in log messages instead
361+
* of {@code record.toString()}.
362+
* @param onlyLogRecordMetadata true to only log the topic/partition/offset.
363+
* @since 2.2.14
364+
*/
365+
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
366+
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
367+
}
368+
353369
@Override
354370
public String toString() {
355371
return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
183183
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
184184
}
185185
catch (Exception ex) {
186-
this.logger.error(ex, () -> "Recovery of record (" + records.get(0) + ") failed");
186+
if (records.size() > 0) {
187+
this.logger.error(ex, () -> "Recovery of record ("
188+
+ ListenerUtils.recordToString(records.get(0)) + ") failed");
189+
}
187190
return NEVER_SKIP_PREDICATE;
188191
}
189192
return ALWAYS_SKIP_PREDICATE;

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class FailedRecordTracker {
6161
+ (failedRecord == null
6262
? "none"
6363
: failedRecord.getBackOffExecution())
64-
+ " exhausted for " + rec);
64+
+ " exhausted for " + ListenerUtils.recordToString(rec));
6565
};
6666
}
6767
else {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,7 @@ public boolean isLongLived() {
944944

945945
@Override
946946
public void run() {
947+
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
947948
publishConsumerStartingEvent();
948949
this.consumerThread = Thread.currentThread();
949950
if (this.consumerSeekAwareListener != null) {
@@ -1248,7 +1249,7 @@ record = this.acks.poll();
12481249
}
12491250

12501251
private void traceAck(ConsumerRecord<K, V> record) {
1251-
this.logger.trace(() -> "Ack: " + record);
1252+
this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record));
12521253
}
12531254

12541255
private void processAck(ConsumerRecord<K, V> record) {
@@ -1589,7 +1590,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
15891590
if (record == null) {
15901591
continue;
15911592
}
1592-
this.logger.trace(() -> "Processing " + record);
1593+
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
15931594
try {
15941595
TransactionSupport
15951596
.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
@@ -1669,7 +1670,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
16691670
if (record == null) {
16701671
continue;
16711672
}
1672-
this.logger.trace(() -> "Processing " + record);
1673+
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
16731674
doInvokeRecordListener(record, null, iterator);
16741675
if (this.nackSleep >= 0) {
16751676
handleNack(records, record);
@@ -1683,7 +1684,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
16831684
if (this.earlyRecordInterceptor != null) {
16841685
next = this.earlyRecordInterceptor.intercept(next);
16851686
if (next == null && this.logger.isDebugEnabled()) {
1686-
this.logger.debug("RecordInterceptor returned null, skipping: " + nextArg);
1687+
this.logger.debug("RecordInterceptor returned null, skipping: "
1688+
+ ListenerUtils.recordToString(nextArg));
16871689
}
16881690
}
16891691
return next;
@@ -1788,7 +1790,8 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
17881790
record = this.recordInterceptor.intercept(record);
17891791
}
17901792
if (record == null) {
1791-
this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
1793+
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
1794+
+ ListenerUtils.recordToString(recordArg));
17921795
}
17931796
else {
17941797
switch (this.listenerType) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public final class ListenerUtils {
4545
private ListenerUtils() {
4646
}
4747

48+
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
49+
4850
public static ListenerType determineListenerType(Object listener) {
4951
Assert.notNull(listener, "Listener cannot be null");
5052
ListenerType listenerType;
@@ -101,4 +103,32 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
101103
return null;
102104
}
103105

106+
/**
107+
* Set to true to only log record metadata.
108+
* @param onlyMeta true to only log record metadata.
109+
* @since 2.2.14
110+
* @see #recordToString(ConsumerRecord)
111+
*/
112+
public static void setLogOnlyMetadata(boolean onlyMeta) {
113+
LOG_METADATA_ONLY.set(onlyMeta);
114+
}
115+
116+
/**
117+
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
118+
* {@code topic-partition@offset}.
119+
* @param record the record.
120+
* @return the rendered record.
121+
* @since 2.2.14
122+
* @see #setLogOnlyMetadata(boolean)
123+
*/
124+
public static String recordToString(ConsumerRecord<?, ?> record) {
125+
if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
126+
return record.topic() + "-" + record.partition() + "@" + record.offset();
127+
}
128+
else {
129+
return record.toString();
130+
}
131+
}
132+
104133
}
134+

spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.common.TopicPartition;
2828

2929
import org.springframework.core.log.LogAccessor;
30+
import org.springframework.kafka.listener.ListenerUtils;
3031
import org.springframework.util.backoff.FixedBackOff;
3132

3233
/**
@@ -75,12 +76,12 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
7576
skipped.set(test);
7677
}
7778
catch (Exception ex) {
78-
logger.error(ex, "Failed to determine if this record (" + record
79+
logger.error(ex, "Failed to determine if this record (" + ListenerUtils.recordToString(record)
7980
+ ") should be recovered, including in seeks");
8081
skipped.set(false);
8182
}
8283
if (skipped.get()) {
83-
logger.debug(() -> "Skipping seek of: " + record);
84+
logger.debug(() -> "Skipping seek of: " + ListenerUtils.recordToString(record));
8485
}
8586
}
8687
if (!recoverable || !first.get() || !skipped.get()) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ public void testRecordAckAfterRecoveryMock() throws Exception {
657657
containerProps.setGroupId("grp");
658658
containerProps.setAckMode(AckMode.RECORD);
659659
containerProps.setMissingTopicsFatal(false);
660+
containerProps.setOnlyLogRecordMetadata(true);
660661
final CountDownLatch latch = new CountDownLatch(2);
661662
MessageListener<Integer, String> messageListener = spy(
662663
new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class SeekToCurrentErrorHandlerTests {
4949

5050
@Test
5151
public void testClassifier() {
52+
ListenerUtils.setLogOnlyMetadata(true);
5253
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
5354
AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
5455
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ See its JavaDocs and <<kafka-container>> for more information.
4747
Static group membership is now supported.
4848
See <<message-listener-container>> for more information.
4949

50+
You can now suppress logging entire `ConsumerRecord` s in error, debug logs etc., by setting the `onlyLogRecordMetadata` container property to `true`.
51+
5052
==== @KafkaListener
5153

5254
The `@KafkaListener` annotation has a new property `splitIterables`; default true.

0 commit comments

Comments
 (0)