Skip to content

Commit b178176

Browse files
garyrussellartembilan
authored andcommitted
GH-537: Configurable log level for offset commits
Resolves #537 Use a `Supplier` to avoid early string concatenation. Docs Polishing
1 parent 63b2bcb commit b178176

File tree

6 files changed

+205
-16
lines changed

6 files changed

+205
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
5656
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
5757
import org.springframework.kafka.listener.config.ContainerProperties;
5858
import org.springframework.kafka.support.Acknowledgment;
59+
import org.springframework.kafka.support.LogIfLevelEnabled;
5960
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6061
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
6162
import org.springframework.kafka.transaction.KafkaTransactionManager;
@@ -371,6 +372,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
371372

372373
private final ScheduledFuture<?> monitorTask;
373374

375+
private final LogIfLevelEnabled commitLogger = new LogIfLevelEnabled(this.logger,
376+
this.containerProperties.getCommitLogLevel());
377+
374378
private volatile Map<TopicPartition, OffsetMetadata> definedPartitions;
375379

376380
private volatile Collection<TopicPartition> assignedPartitions;
@@ -529,9 +533,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
529533
return;
530534
}
531535
}
532-
if (ListenerConsumer.this.logger.isDebugEnabled()) {
533-
ListenerConsumer.this.logger.debug("Committing on assignment: " + offsets);
534-
}
536+
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsets);
535537
if (ListenerConsumer.this.transactionTemplate != null &&
536538
ListenerConsumer.this.kafkaTxManager != null) {
537539
ListenerConsumer.this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@@ -755,15 +757,12 @@ private void ackImmediate(ConsumerRecord<K, V> record) {
755757
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
756758
new TopicPartition(record.topic(), record.partition()),
757759
new OffsetAndMetadata(record.offset() + 1));
758-
if (ListenerConsumer.this.logger.isDebugEnabled()) {
759-
ListenerConsumer.this.logger.debug("Committing: " + commits);
760-
}
760+
this.commitLogger.log(() -> "Committing: " + commits);
761761
if (this.containerProperties.isSyncCommits()) {
762-
ListenerConsumer.this.consumer.commitSync(commits);
762+
this.consumer.commitSync(commits);
763763
}
764764
else {
765-
ListenerConsumer.this.consumer.commitAsync(commits,
766-
ListenerConsumer.this.commitCallback);
765+
this.consumer.commitAsync(commits, this.commitCallback);
767766
}
768767
}
769768

@@ -995,6 +994,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
995994
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
996995
new OffsetAndMetadata(record.offset() + 1));
997996
if (producer == null) {
997+
this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
998998
if (this.containerProperties.isSyncCommits()) {
999999
this.consumer.commitSync(offsetsToCommit);
10001000
}
@@ -1019,6 +1019,7 @@ else if (!this.isAnyManualAck && !this.autoCommit) {
10191019
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
10201020
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
10211021
new OffsetAndMetadata(record.offset() + 1));
1022+
this.commitLogger.log(() -> "Committing: " + offsetsToCommit);
10221023
if (this.containerProperties.isSyncCommits()) {
10231024
this.consumer.commitSync(offsetsToCommit);
10241025
}
@@ -1072,6 +1073,7 @@ else if (!this.isAnyManualAck) {
10721073
private void sendOffsetsToTransaction(Producer producer) {
10731074
handleAcks();
10741075
Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
1076+
this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);
10751077
producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
10761078
}
10771079

@@ -1209,9 +1211,7 @@ private void commitIfNecessary() {
12091211
this.logger.debug("Commit list: " + commits);
12101212
}
12111213
if (!commits.isEmpty()) {
1212-
if (this.logger.isDebugEnabled()) {
1213-
this.logger.debug("Committing: " + commits);
1214-
}
1214+
this.commitLogger.log(() -> "Committing: " + commits);
12151215
try {
12161216
if (this.containerProperties.isSyncCommits()) {
12171217
this.consumer.commitSync(commits);

spring-kafka/src/main/java/org/springframework/kafka/listener/config/ContainerProperties.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.listener.BatchErrorHandler;
3030
import org.springframework.kafka.listener.ErrorHandler;
3131
import org.springframework.kafka.listener.GenericErrorHandler;
32+
import org.springframework.kafka.support.LogIfLevelEnabled;
3233
import org.springframework.kafka.support.TopicPartitionInitialOffset;
3334
import org.springframework.scheduling.TaskScheduler;
3435
import org.springframework.transaction.PlatformTransactionManager;
@@ -162,6 +163,8 @@ public class ContainerProperties {
162163

163164
private boolean logContainerConfig;
164165

166+
private LogIfLevelEnabled.Level commitLogLevel = LogIfLevelEnabled.Level.DEBUG;
167+
165168
public ContainerProperties(String... topics) {
166169
Assert.notEmpty(topics, "An array of topicPartitions must be provided");
167170
this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
@@ -490,7 +493,7 @@ public void setClientId(String clientId) {
490493
/**
491494
* Log the container configuration if true (INFO).
492495
* @return true to log.
493-
* @since 2.0.1
496+
* @since 2.1.1
494497
*/
495498
public boolean isLogContainerConfig() {
496499
return this.logContainerConfig;
@@ -505,6 +508,26 @@ public void setLogContainerConfig(boolean logContainerConfig) {
505508
this.logContainerConfig = logContainerConfig;
506509
}
507510

511+
/**
512+
* The level at which to log offset commits.
513+
* @return the level.
514+
* @since 2.1.2
515+
*/
516+
public LogIfLevelEnabled.Level getCommitLogLevel() {
517+
return this.commitLogLevel;
518+
}
519+
520+
/**
521+
* Set the level at which to log offset commits.
522+
* Default: DEBUG.
523+
* @param commitLogLevel the level.
524+
* @since 2.1.2
525+
*/
526+
public void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel) {
527+
Assert.notNull(commitLogLevel, "'commitLogLevel' cannot be nul");
528+
this.commitLogLevel = commitLogLevel;
529+
}
530+
508531
@Override
509532
public String toString() {
510533
return "ContainerProperties ["
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import java.util.function.Supplier;
20+
21+
import org.apache.commons.logging.Log;
22+
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* Wrapper for a commons-logging Log supporting configurable
27+
* logging levels.
28+
*
29+
* @author Gary Russell
30+
* @since 2.1.2
31+
*
32+
*/
33+
public final class LogIfLevelEnabled {
34+
35+
private final Log logger;
36+
37+
private final Level level;
38+
39+
public LogIfLevelEnabled(Log logger, Level level) {
40+
Assert.notNull(logger, "'logger' cannot be null");
41+
Assert.notNull(level, "'level' cannot be null");
42+
this.logger = logger;
43+
this.level = level;
44+
}
45+
46+
/**
47+
* Logging levels.
48+
*/
49+
public enum Level {
50+
51+
/**
52+
* Fatal.
53+
*/
54+
FATAL,
55+
56+
/**
57+
* Error.
58+
*/
59+
ERROR,
60+
61+
/**
62+
* Warn.
63+
*/
64+
WARN,
65+
66+
/**
67+
* Info.
68+
*/
69+
INFO,
70+
71+
/**
72+
* Debug.
73+
*/
74+
DEBUG,
75+
76+
/**
77+
* Trace.
78+
*/
79+
TRACE
80+
81+
}
82+
83+
public void log(Supplier<Object> messageSupplier) {
84+
switch (this.level) {
85+
case FATAL:
86+
if (this.logger.isFatalEnabled()) {
87+
this.logger.fatal(messageSupplier.get());
88+
}
89+
break;
90+
case ERROR:
91+
if (this.logger.isErrorEnabled()) {
92+
this.logger.error(messageSupplier.get());
93+
}
94+
break;
95+
case WARN:
96+
if (this.logger.isWarnEnabled()) {
97+
this.logger.warn(messageSupplier.get());
98+
}
99+
break;
100+
case INFO:
101+
if (this.logger.isInfoEnabled()) {
102+
this.logger.info(messageSupplier.get());
103+
}
104+
break;
105+
case DEBUG:
106+
if (this.logger.isDebugEnabled()) {
107+
this.logger.debug(messageSupplier.get());
108+
}
109+
break;
110+
case TRACE:
111+
if (this.logger.isTraceEnabled()) {
112+
this.logger.trace(messageSupplier.get());
113+
}
114+
break;
115+
}
116+
}
117+
118+
public void log(Supplier<Object> messageSupplier, Throwable t) {
119+
switch (this.level) {
120+
case FATAL:
121+
if (this.logger.isFatalEnabled()) {
122+
this.logger.fatal(messageSupplier.get(), t);
123+
}
124+
break;
125+
case ERROR:
126+
if (this.logger.isErrorEnabled()) {
127+
this.logger.error(messageSupplier.get(), t);
128+
}
129+
break;
130+
case WARN:
131+
if (this.logger.isWarnEnabled()) {
132+
this.logger.warn(messageSupplier.get(), t);
133+
}
134+
break;
135+
case INFO:
136+
if (this.logger.isInfoEnabled()) {
137+
this.logger.info(messageSupplier.get(), t);
138+
}
139+
break;
140+
case DEBUG:
141+
if (this.logger.isDebugEnabled()) {
142+
this.logger.debug(messageSupplier.get(), t);
143+
}
144+
break;
145+
case TRACE:
146+
if (this.logger.isTraceEnabled()) {
147+
this.logger.trace(messageSupplier.get(), t);
148+
}
149+
break;
150+
}
151+
}
152+
153+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -441,6 +441,7 @@ public void testRecordAck() throws Exception {
441441
containerProps.setSyncCommits(true);
442442
containerProps.setAckMode(AckMode.RECORD);
443443
containerProps.setAckOnError(false);
444+
// containerProps.setCommitLogLevel(LogIfLevelEnabled.Level.WARN);
444445

445446
CountDownLatch stubbingComplete = new CountDownLatch(1);
446447
KafkaMessageListenerContainer<Integer, String> container = spyOnContainer(

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ The `KafkaMessageListenerContainer` receives all message from all topics/partiti
441441
The `ConcurrentMessageListenerContainer` delegates to 1 or more `KafkaMessageListenerContainer` s to provide
442442
multi-threaded consumption.
443443

444+
[[kafka-container]]
444445
====== KafkaMessageListenerContainer
445446

446447
The following constructors are available.
@@ -498,6 +499,10 @@ Refer to the JavaDocs for `ContainerProperties` for more information about the v
498499

499500
Since version _2.1.1_, a new property `logContainerConfig` is available; when true, and INFO logging is enabled, each listener container will write a log message summarizing its configuration properties.
500501

502+
By default, logging of topic offset commits is performed with the DEBUG logging level.
503+
Starting with _version 2.1.2_, there is a new property in `ContainerProperties` called `commitLogLevel` which allows you to specify the log level for these messages.
504+
For example, to change the log level to INFO, use `containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);`.
505+
501506
====== ConcurrentMessageListenerContainer
502507

503508
The single constructor is similar to the first `KafkaListenerContainer` constructor:

src/reference/asciidoc/whats-new.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ Starting with _version 2.1.1_, it is now possible to set the `client.id` prefix
2222
Previously, to customize the client id, you would need a separate consumer factory (and container factory) per listener.
2323
The prefix is suffixed with `-n` to provide unique client ids when using concurrency.
2424

25+
26+
==== Logging Offset Commits
27+
28+
By default, logging of topic offset commits is performed with the DEBUG logging level.
29+
Starting with _version 2.1.2_, there is a new property in `ContainerProperties` called `commitLogLevel` which allows you to specify the log level for these messages.
30+
See <<kafka-container>> for more information.
31+
2532
==== Migration Guide from 2.0
2633

2734
https://github.com/spring-projects/spring-kafka/wiki/Spring-for-Apache-Kafka-2.0-to-2.1-Migration-Guide[2.0 to 2.1 Migration].

0 commit comments

Comments
 (0)