Skip to content

Commit af83f60

Browse files
garyrussellartembilan
authored andcommitted
GH-1499: Option to suppress ConsumerRecord logging
Resolves #1499 **I will do the backports because I expect many conflicts.**
1 parent b226d6c commit af83f60

File tree

10 files changed

+69
-9
lines changed

10 files changed

+69
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public class ConsumerProperties {
9797

9898
private LogIfLevelEnabled.Level commitLogLevel = LogIfLevelEnabled.Level.DEBUG;
9999

100+
private boolean onlyLogRecordMetadata;
101+
100102
private Properties kafkaConsumerProperties = new Properties();
101103

102104
private Duration authorizationExceptionRetryInterval;
@@ -374,6 +376,20 @@ public void setCommitRetries(int commitRetries) {
374376
this.commitRetries = commitRetries;
375377
}
376378

379+
public boolean isOnlyLogRecordMetadata() {
380+
return this.onlyLogRecordMetadata;
381+
}
382+
383+
/**
384+
* Set to true to only log {@code topic-partition@offset} in log messages instead
385+
* of {@code record.toString()}.
386+
* @param onlyLogRecordMetadata true to only log the topic/parrtition/offset.
387+
* @since 2.2.14
388+
*/
389+
public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
390+
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
391+
}
392+
377393
@Override
378394
public String toString() {
379395
return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
168168
}
169169
catch (Exception ex) {
170170
if (records.size() > 0) {
171-
this.logger.error(ex, () -> "Recovery of record (" + records.get(0) + ") failed");
171+
this.logger.error(ex, () -> "Recovery of record ("
172+
+ ListenerUtils.recordToString(records.get(0)) + ") failed");
172173
}
173174
return NEVER_SKIP_PREDICATE;
174175
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class FailedRecordTracker {
6363
+ (failedRecord == null
6464
? "none"
6565
: failedRecord.getBackOffExecution())
66-
+ " exhausted for " + rec);
66+
+ " exhausted for " + ListenerUtils.recordToString(rec));
6767
};
6868
}
6969
else {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,7 @@ public boolean isLongLived() {
958958

959959
@Override
960960
public void run() {
961+
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
961962
publishConsumerStartingEvent();
962963
this.consumerThread = Thread.currentThread();
963964
setupSeeks();
@@ -1280,7 +1281,7 @@ record = this.acks.poll();
12801281
}
12811282

12821283
private void traceAck(ConsumerRecord<K, V> record) {
1283-
this.logger.trace(() -> "Ack: " + record);
1284+
this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record));
12841285
}
12851286

12861287
private void processAck(ConsumerRecord<K, V> record) {
@@ -1629,7 +1630,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
16291630
if (record == null) {
16301631
continue;
16311632
}
1632-
this.logger.trace(() -> "Processing " + record);
1633+
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
16331634
try {
16341635
if (this.producerPerConsumerPartition) {
16351636
TransactionSupport
@@ -1714,7 +1715,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
17141715
if (record == null) {
17151716
continue;
17161717
}
1717-
this.logger.trace(() -> "Processing " + record);
1718+
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
17181719
doInvokeRecordListener(record, iterator);
17191720
if (this.nackSleep >= 0) {
17201721
handleNack(records, record);
@@ -1728,7 +1729,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
17281729
if (this.earlyRecordInterceptor != null) {
17291730
next = this.earlyRecordInterceptor.intercept(next);
17301731
if (next == null && this.logger.isDebugEnabled()) {
1731-
this.logger.debug("RecordInterceptor returned null, skipping: " + nextArg);
1732+
this.logger.debug("RecordInterceptor returned null, skipping: "
1733+
+ ListenerUtils.recordToString(nextArg));
17321734
}
17331735
}
17341736
return next;
@@ -1843,7 +1845,8 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
18431845
record = this.recordInterceptor.intercept(record);
18441846
}
18451847
if (record == null) {
1846-
this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg);
1848+
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
1849+
+ ListenerUtils.recordToString(recordArg));
18471850
}
18481851
else {
18491852
switch (this.listenerType) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public final class ListenerUtils {
4242
private ListenerUtils() {
4343
}
4444

45+
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
46+
4547
public static ListenerType determineListenerType(Object listener) {
4648
Assert.notNull(listener, "Listener cannot be null");
4749
ListenerType listenerType;
@@ -96,4 +98,32 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
9698
return null;
9799
}
98100

101+
/**
102+
* Set to true to only log record metadata.
103+
* @param onlyMeta true to only log record metadata.
104+
* @since 2.2.14
105+
* @see #recordToString(ConsumerRecord)
106+
*/
107+
public static void setLogOnlyMetadata(boolean onlyMeta) {
108+
LOG_METADATA_ONLY.set(onlyMeta);
109+
}
110+
111+
/**
112+
* Return the {@link ConsumerRecord} as a String; either {@code toString()} or
113+
* {@code topic-partition@offset}.
114+
* @param record the record.
115+
* @return the rendered record.
116+
* @since 2.2.14
117+
* @see #setLogOnlyMetadata(boolean)
118+
*/
119+
public static String recordToString(ConsumerRecord<?, ?> record) {
120+
if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
121+
return record.topic() + "-" + record.partition() + "@" + record.offset();
122+
}
123+
else {
124+
return record.toString();
125+
}
126+
}
127+
99128
}
129+

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
8585
skipped.set(test);
8686
}
8787
catch (Exception ex) {
88-
logger.error(ex, "Failed to determine if this record (" + record
88+
logger.error(ex, "Failed to determine if this record (" + ListenerUtils.recordToString(record)
8989
+ ") should be recovererd, including in seeks");
9090
skipped.set(false);
9191
}
9292
if (skipped.get()) {
93-
logger.debug(() -> "Skipping seek of: " + record);
93+
logger.debug(() -> "Skipping seek of: " + ListenerUtils.recordToString(record));
9494
}
9595
}
9696
if (!recoverable || !first.get() || !skipped.get()) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,7 @@ public void testRecordAckAfterRecoveryMock() throws Exception {
655655
containerProps.setGroupId("grp");
656656
containerProps.setAckMode(AckMode.RECORD);
657657
containerProps.setMissingTopicsFatal(false);
658+
containerProps.setOnlyLogRecordMetadata(true);
658659
final CountDownLatch latch = new CountDownLatch(2);
659660
MessageListener<Integer, String> messageListener = spy(
660661
new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class SeekToCurrentErrorHandlerTests {
4848

4949
@Test
5050
public void testClassifier() {
51+
ListenerUtils.setLogOnlyMetadata(true);
5152
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
5253
AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
5354
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2241,6 +2241,11 @@ See `noPollThreshold` and `pollTimeout`.
22412241
|Multiplied by `pollTimeOut` to determine whether to publish a `NonResponsiveConsumerEvent`.
22422242
See `monitorInterval`.
22432243

2244+
|onlyLogRecord
2245+
Metadata
2246+
|`false`
2247+
|Set to true to show only the `topic-partition@offset` for a record instead of the whole consumer record (in error, debug logs etc).
2248+
22442249
|pollTimeout
22452250
|5000
22462251
|The timeout passed into `Consumer.poll()`.

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ See <<error-handlers>> for more information.
6767
The `getAssignmentsByClientId()` method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
6868
See <<container-props>> for more information.
6969

70+
You can now suppress logging entire `ConsumerRecord` s in error, debug logs etc.
71+
See `onlyLogRecordMetadata` in <<container-props>>.
72+
7073
[[x25-template]]
7174
==== KafkaTemplate Changes
7275

0 commit comments

Comments
 (0)