Skip to content

Commit 3294f6c

Browse files
committed
GH-1499: Option to suppress ConsumerRecord logging
Resolves #1499 **I will do the backports because I expect many conflicts.**
1 parent 423deb0 commit 3294f6c

File tree

6 files changed

+63
-7
lines changed

6 files changed

+63
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ public enum AckMode {
222222

223223
private Properties consumerProperties;
224224

225+
private boolean onlyLogRecordMetadata;
226+
225227
/**
226228
* Create properties for a container that will subscribe to the specified topics.
227229
* @param topics the topics.
@@ -646,6 +648,20 @@ public void setConsumerProperties(Properties consumerProperties) {
646648
this.consumerProperties = consumerProperties;
647649
}
648650

651+
public boolean isOnlyLogRecordMetadata() {
652+
return this.onlyLogRecordMetadata;
653+
}
654+
655+
/**
656+
* Set to true to only log {@code topic-partition@offset} in log messages instead
657+
* of {@code record.toString()}.
658+
* @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
659+
* @since 2.2.14
660+
*/
661+
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
662+
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
663+
}
664+
649665
@Override
650666
public String toString() {
651667
return "ContainerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ class FailedRecordTracker {
4848

4949
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures, Log logger) {
5050
if (recoverer == null) {
51-
this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: " + r, t);
51+
this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: "
52+
+ ListenerUtils.recordToString(r), t);
5253
}
5354
else {
5455
this.recoverer = recoverer;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ public boolean isLongLived() {
692692

693693
@Override
694694
public void run() {
695+
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
695696
this.consumerThread = Thread.currentThread();
696697
if (this.genericListener instanceof ConsumerSeekAware) {
697698
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
@@ -911,7 +912,7 @@ private void handleAcks() {
911912
ConsumerRecord<K, V> record = this.acks.poll();
912913
while (record != null) {
913914
if (this.logger.isTraceEnabled()) {
914-
this.logger.trace("Ack: " + record);
915+
this.logger.trace("Ack: " + ListenerUtils.recordToString(record));
915916
}
916917
processAck(record);
917918
record = this.acks.poll();
@@ -1172,7 +1173,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
11721173
while (iterator.hasNext()) {
11731174
final ConsumerRecord<K, V> record = iterator.next();
11741175
if (this.logger.isTraceEnabled()) {
1175-
this.logger.trace("Processing " + record);
1176+
this.logger.trace("Processing " + ListenerUtils.recordToString(record));
11761177
}
11771178
try {
11781179
TransactionSupport
@@ -1242,7 +1243,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
12421243
while (iterator.hasNext()) {
12431244
final ConsumerRecord<K, V> record = iterator.next();
12441245
if (this.logger.isTraceEnabled()) {
1245-
this.logger.trace("Processing " + record);
1246+
this.logger.trace("Processing " + ListenerUtils.recordToString(record));
12461247
}
12471248
doInvokeRecordListener(record, null, iterator);
12481249
}
@@ -1316,7 +1317,8 @@ record = this.recordInterceptor.intercept(record);
13161317
}
13171318
if (record == null) {
13181319
if (this.logger.isDebugEnabled()) {
1319-
this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);
1320+
this.logger.debug("RecordInterceptor returned null, skipping: "
1321+
+ ListenerUtils.recordToString(recordArg));
13201322
}
13211323
}
13221324
else {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
1921
import org.springframework.util.Assert;
2022

2123
/**
@@ -31,6 +33,8 @@ private ListenerUtils() {
3133
super();
3234
}
3335

36+
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
37+
3438
public static ListenerType determineListenerType(Object listener) {
3539
Assert.notNull(listener, "Listener cannot be null");
3640
ListenerType listenerType;
@@ -55,4 +59,31 @@ else if (listener instanceof GenericMessageListener) {
5559
return listenerType;
5660
}
5761

62+
/**
63+
* Set to true to only log record metadata.
64+
* @param onlyMeta true to only log record metadata.
65+
* @since 2.2.14
66+
* @see #recordToString(ConsumerRecord)
67+
*/
68+
public static void setLogOnlyMetadata(boolean onlyMeta) {
69+
LOG_METADATA_ONLY.set(onlyMeta);
70+
}
71+
72+
/**
73+
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
74+
* {@code topic-partition@offset}.
75+
* @param record the record.
76+
* @return the rendered record.
77+
* @since 2.2.14
78+
* @see #setLogOnlyMetadata(boolean)
79+
*/
80+
public static String recordToString(ConsumerRecord<?, ?> record) {
81+
if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
82+
return record.topic() + "-" + record.partition() + "@" + record.offset();
83+
}
84+
else {
85+
return record.toString();
86+
}
87+
}
88+
5889
}

spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
2828
import org.apache.kafka.common.TopicPartition;
2929

30+
import org.springframework.kafka.listener.ListenerUtils;
31+
3032
/**
3133
* Seek utilities.
3234
*
@@ -68,11 +70,13 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
6870
skipped.set(test);
6971
}
7072
catch (Exception ex) {
71-
logger.error("Failed to determine if this record should be recovererd, including in seeks", ex);
73+
logger.error("Failed to determine if this record ("
74+
+ ListenerUtils.recordToString(record)
75+
+ ") should be recovererd, including in seeks", ex);
7276
skipped.set(false);
7377
}
7478
if (skipped.get() && logger.isDebugEnabled()) {
75-
logger.debug("Skipping seek of: " + record);
79+
logger.debug("Skipping seek of: " + ListenerUtils.recordToString(record));
7680
}
7781
}
7882
if (!recoverable || !first.get() || !skipped.get()) {

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ See <<events>> for more information.
4848
The `SeekToCurrentErrorHandler` can now be configured to commit the offset of a recovered record when the container is configured with `AckMode.MANUAL_IMMEDIATE` (since 2.2.4).
4949
See <<seek-to-current>> for more information.
5050

51+
You can now suppress logging entire `ConsumerRecord` s in error, debug logs etc., by setting the `onlyLogRecordMetadata` container property to `true`.
52+
5153
==== @KafkaListener Changes
5254

5355
You can now override the `concurrency` and `autoStartup` properties of the listener container factory by setting properties on the annotation.

0 commit comments

Comments
 (0)