
Commit 803725d

GH-1499: Option to suppress ConsumerRecord logging
Resolves #1499

**I will do the backports because I expect many conflicts.**
1 parent 7f793f4

File tree: 9 files changed, +65 -9 lines changed
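
For orientation, here is a minimal, hypothetical usage sketch (not part of this commit; the class and topic names are illustrative). It assumes the option is set through `ContainerProperties`, which inherits the new `ConsumerProperties` setter, as the test change below also does:

import org.springframework.kafka.listener.ContainerProperties;

public class OnlyLogRecordMetadataExample {

	public static void main(String[] args) {
		// ContainerProperties extends ConsumerProperties, so the new option is set on
		// the properties used to build a listener container (topic name is illustrative).
		ContainerProperties containerProps = new ContainerProperties("some-topic");
		containerProps.setGroupId("grp");
		// With this flag, container log messages render records as topic-partition@offset
		// instead of the full ConsumerRecord.toString().
		containerProps.setOnlyLogRecordMetadata(true);
	}
}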

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 16 additions & 0 deletions
@@ -97,6 +97,8 @@ public class ConsumerProperties {
 
 	private LogIfLevelEnabled.Level commitLogLevel = LogIfLevelEnabled.Level.DEBUG;
 
+	private boolean onlyLogRecordMetadata;
+
 	private Properties kafkaConsumerProperties = new Properties();
 
 	private Duration authorizationExceptionRetryInterval;
@@ -350,6 +352,20 @@ public void setCommitRetries(int commitRetries) {
 		this.commitRetries = commitRetries;
 	}
 
+	public boolean isOnlyLogRecordMetadata() {
+		return this.onlyLogRecordMetadata;
+	}
+
+	/**
+	 * Set to true to only log {@code topic-partition@offset} in log messages instead
+	 * of {@code record.toString()}.
+	 * @param onlyLogRecordMetadata true to only log the topic/partition/offset.
+	 * @since 2.2.14
+	 */
+	public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
+		this.onlyLogRecordMetadata = onlyLogRecordMetadata;
+	}
+
 	@Override
 	public String toString() {
 		return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 4 additions & 1 deletion
@@ -183,7 +183,10 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
 				this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
 			}
 			catch (Exception ex) {
-				this.logger.error(ex, () -> "Recovery of record (" + records.get(0) + ") failed");
+				if (records.size() > 0) {
+					this.logger.error(ex, () -> "Recovery of record ("
+							+ ListenerUtils.recordToString(records.get(0)) + ") failed");
+				}
 				return NEVER_SKIP_PREDICATE;
 			}
 			return ALWAYS_SKIP_PREDICATE;

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ class FailedRecordTracker {
 					+ (failedRecord == null
 							? "none"
 							: failedRecord.getBackOffExecution())
-					+ " exhausted for " + rec);
+					+ " exhausted for " + ListenerUtils.recordToString(rec));
 		};
 	}
 	else {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 5 deletions
@@ -942,6 +942,7 @@ public boolean isLongLived() {
 
 		@Override
 		public void run() {
+			ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
 			publishConsumerStartingEvent();
 			this.consumerThread = Thread.currentThread();
 			if (this.consumerSeekAwareListener != null) {
@@ -1245,7 +1246,7 @@ record = this.acks.poll();
 		}
 
 		private void traceAck(ConsumerRecord<K, V> record) {
-			this.logger.trace(() -> "Ack: " + record);
+			this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record));
 		}
 
 		private void processAck(ConsumerRecord<K, V> record) {
@@ -1586,7 +1587,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
 				if (record == null) {
 					continue;
 				}
-				this.logger.trace(() -> "Processing " + record);
+				this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
 				try {
 					TransactionSupport
 							.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
@@ -1666,7 +1667,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
 				if (record == null) {
 					continue;
 				}
-				this.logger.trace(() -> "Processing " + record);
+				this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
 				doInvokeRecordListener(record, null, iterator);
 				if (this.nackSleep >= 0) {
 					handleNack(records, record);
@@ -1680,7 +1681,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
 			if (this.earlyRecordInterceptor != null) {
 				next = this.earlyRecordInterceptor.intercept(next);
 				if (next == null && this.logger.isDebugEnabled()) {
-					this.logger.debug("RecordInterceptor returned null, skipping: " + nextArg);
+					this.logger.debug("RecordInterceptor returned null, skipping: "
+							+ ListenerUtils.recordToString(nextArg));
 				}
 			}
 			return next;
@@ -1785,7 +1787,8 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
 				record = this.recordInterceptor.intercept(record);
 			}
 			if (record == null) {
-				this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
+				this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
+						+ ListenerUtils.recordToString(recordArg));
 			}
 			else {
 				switch (this.listenerType) {
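
Because the flag is stored in a ThreadLocal and set at the top of run(), it only affects logging performed on the container's consumer thread. A small illustrative sketch (class name hypothetical, not from this commit) of that per-thread behavior:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ListenerUtils;

public class PerThreadFlagSketch {

	public static void main(String[] args) throws InterruptedException {
		ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 0, 0L, "key", "value");
		Thread consumerLike = new Thread(() -> {
			// mirrors what ListenerConsumer.run() does with the container property
			ListenerUtils.setLogOnlyMetadata(true);
			System.out.println(ListenerUtils.recordToString(record)); // topic-0@0
		});
		consumerLike.start();
		consumerLike.join();
		// other threads still see the default full toString() rendering
		System.out.println(ListenerUtils.recordToString(record));
	}
}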

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 29 additions & 0 deletions
@@ -46,6 +46,8 @@ private ListenerUtils() {
 		super();
 	}
 
+	private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
+
 	public static ListenerType determineListenerType(Object listener) {
 		Assert.notNull(listener, "Listener cannot be null");
 		ListenerType listenerType;
@@ -102,4 +104,31 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
 		return null;
 	}
 
+	/**
+	 * Set to true to only log record metadata.
+	 * @param onlyMeta true to only log record metadata.
+	 * @since 2.2.14
+	 * @see #recordToString(ConsumerRecord)
+	 */
+	public static void setLogOnlyMetadata(boolean onlyMeta) {
+		LOG_METADATA_ONLY.set(onlyMeta);
+	}
+
+	/**
+	 * Return the {@link ConsumerRecord} as a String; either {@code toString()} or
+	 * {@code topic-partition@offset}.
+	 * @param record the record.
+	 * @return the rendered record.
+	 * @since 2.2.14
+	 * @see #setLogOnlyMetadata(boolean)
+	 */
+	public static String recordToString(ConsumerRecord<?, ?> record) {
+		if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
+			return record.topic() + "-" + record.partition() + "@" + record.offset();
+		}
+		else {
+			return record.toString();
+		}
+	}
+
 }
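
A short illustrative sketch (not from this commit; the topic, partition, and offset are made up) of the two rendering modes the new recordToString() helper provides:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ListenerUtils;

public class RecordToStringDemo {

	public static void main(String[] args) {
		ConsumerRecord<String, String> record = new ConsumerRecord<>("some-topic", 1, 42L, "key", "value");
		// default: full ConsumerRecord.toString()
		System.out.println(ListenerUtils.recordToString(record));
		ListenerUtils.setLogOnlyMetadata(true);
		// metadata only: some-topic-1@42
		System.out.println(ListenerUtils.recordToString(record));
	}
}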

spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java

Lines changed: 3 additions & 2 deletions
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.core.log.LogAccessor;
+import org.springframework.kafka.listener.ListenerUtils;
 import org.springframework.util.backoff.FixedBackOff;
 
 /**
@@ -76,12 +77,12 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
 				skipped.set(test);
 			}
 			catch (Exception ex) {
-				logger.error(ex, "Failed to determine if this record (" + record
+				logger.error(ex, "Failed to determine if this record (" + ListenerUtils.recordToString(record)
						+ ") should be recovered, including in seeks");
 				skipped.set(false);
 			}
 			if (skipped.get()) {
-				logger.debug(() -> "Skipping seek of: " + record);
+				logger.debug(() -> "Skipping seek of: " + ListenerUtils.recordToString(record));
 			}
 		}
 		if (!recoverable || !first.get() || !skipped.get()) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 1 addition & 0 deletions
@@ -654,6 +654,7 @@ public void testRecordAckAfterRecoveryMock() throws Exception {
 		containerProps.setGroupId("grp");
 		containerProps.setAckMode(AckMode.RECORD);
 		containerProps.setMissingTopicsFatal(false);
+		containerProps.setOnlyLogRecordMetadata(true);
 		final CountDownLatch latch = new CountDownLatch(2);
 		MessageListener<Integer, String> messageListener = spy(
 				new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 1 addition & 0 deletions
@@ -49,6 +49,7 @@ public class SeekToCurrentErrorHandlerTests {
 
 	@Test
 	public void testClassifier() {
+		ListenerUtils.setLogOnlyMetadata(true);
 		AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
 		AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
 		SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
@@ -81,6 +81,8 @@ See its JavaDocs and <<kafka-container>> for more information.
 Static group membership is now supported.
 See <<message-listener-container>> for more information.
 
+You can now suppress logging entire `ConsumerRecord` s in error and debug logs, etc., by setting the `onlyLogRecordMetadata` container property to `true`.
+
 ==== @KafkaListener
 
 The `@KafkaListener` annotation has a new property `splitIterables`; default true.
