Skip to content

Commit 2f61ebf

Browse files
garyrussellartembilan
authored andcommitted
1 parent a39780c commit 2f61ebf

File tree

4 files changed

+75
-1
lines changed

4 files changed

+75
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
364364

365365
private final String consumerGroupId = this.containerProperties.getGroupId() == null
366366
? (String) KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()
367-
.get(ConsumerConfig.GROUP_ID_CONFIG)
367+
.get(ConsumerConfig.GROUP_ID_CONFIG)
368368
: this.containerProperties.getGroupId();
369369

370370
private final TaskScheduler taskScheduler;
@@ -463,6 +463,9 @@ else if (listener instanceof MessageListener) {
463463
}
464464
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(() -> checkConsumer(),
465465
this.containerProperties.getMonitorInterval() * 1000);
466+
if (this.containerProperties.isLogContainerConfig() && this.logger.isInfoEnabled()) {
467+
this.logger.info(this);
468+
}
466469
}
467470

468471
protected void checkConsumer() {
@@ -1260,6 +1263,19 @@ public void seekToEnd(String topic, int partition) {
12601263
this.seeks.add(new TopicPartitionInitialOffset(topic, partition, SeekPosition.END));
12611264
}
12621265

1266+
@Override
1267+
public String toString() {
1268+
return "KafkaMessageListenerContainer.ListenerConsumer ["
1269+
+ "containerProperties=" + this.containerProperties
1270+
+ ", listenerType=" + this.listenerType
1271+
+ ", isConsumerAwareListener=" + this.isConsumerAwareListener
1272+
+ ", isBatchListener=" + this.isBatchListener
1273+
+ ", autoCommit=" + this.autoCommit
1274+
+ ", consumerGroupId=" + this.consumerGroupId
1275+
+ ", clientIdSuffix=" + KafkaMessageListenerContainer.this.clientIdSuffix
1276+
+ "]";
1277+
}
1278+
12631279
private final class ConsumerAcknowledgment implements Acknowledgment {
12641280

12651281
private final ConsumerRecord<K, V> record;

spring-kafka/src/main/java/org/springframework/kafka/listener/config/ContainerProperties.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.scheduling.TaskScheduler;
3434
import org.springframework.transaction.PlatformTransactionManager;
3535
import org.springframework.util.Assert;
36+
import org.springframework.util.StringUtils;
3637

3738
/**
3839
* Contains runtime properties for a listener container.
@@ -159,6 +160,8 @@ public class ContainerProperties {
159160

160161
private String clientId = "";
161162

163+
private boolean logContainerConfig;
164+
162165
public ContainerProperties(String... topics) {
163166
Assert.notEmpty(topics, "An array of topicPartitions must be provided");
164167
this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
@@ -204,6 +207,7 @@ public void setMessageListener(Object messageListener) {
204207
* @param ackMode the {@link AckMode}; default BATCH.
205208
*/
206209
public void setAckMode(AbstractMessageListenerContainer.AckMode ackMode) {
210+
Assert.notNull(ackMode, "'ackMode' cannot be null");
207211
this.ackMode = ackMode;
208212
}
209213

@@ -483,4 +487,55 @@ public void setClientId(String clientId) {
483487
this.clientId = clientId;
484488
}
485489

490+
/**
491+
* Log the container configuration if true (INFO).
492+
* @return true to log.
493+
* @since 2.0.1
494+
*/
495+
public boolean isLogContainerConfig() {
496+
return this.logContainerConfig;
497+
}
498+
499+
/**
500+
* Set to true to instruct each container to log this configuration.
501+
* @param logContainerConfig true to log.
502+
* @since 2.1.1
503+
*/
504+
public void setLogContainerConfig(boolean logContainerConfig) {
505+
this.logContainerConfig = logContainerConfig;
506+
}
507+
508+
@Override
509+
public String toString() {
510+
return "ContainerProperties ["
511+
+ (this.topics != null ? "topics=" + Arrays.toString(this.topics) : "")
512+
+ (this.topicPattern != null ? ", topicPattern=" + this.topicPattern : "")
513+
+ (this.topicPartitions != null
514+
? ", topicPartitions=" + Arrays.toString(this.topicPartitions) : "")
515+
+ ", ackMode=" + this.ackMode
516+
+ ", ackCount=" + this.ackCount
517+
+ ", ackTime=" + this.ackTime
518+
+ ", messageListener=" + this.messageListener
519+
+ ", pollTimeout=" + this.pollTimeout
520+
+ (this.consumerTaskExecutor != null
521+
? ", consumerTaskExecutor=" + this.consumerTaskExecutor : "")
522+
+ (this.errorHandler != null ? ", errorHandler=" + this.errorHandler : "")
523+
+ ", shutdownTimeout=" + this.shutdownTimeout
524+
+ (this.consumerRebalanceListener != null
525+
? ", consumerRebalanceListener=" + this.consumerRebalanceListener : "")
526+
+ (this.commitCallback != null ? ", commitCallback=" + this.commitCallback : "")
527+
+ ", syncCommits=" + this.syncCommits
528+
+ ", ackOnError=" + this.ackOnError
529+
+ ", idleEventInterval="
530+
+ (this.idleEventInterval == null ? "not enabled" : this.idleEventInterval)
531+
+ (this.groupId != null ? ", groupId=" + this.groupId : "")
532+
+ (this.transactionManager != null
533+
? ", transactionManager=" + this.transactionManager : "")
534+
+ ", monitorInterval=" + this.monitorInterval
535+
+ (this.scheduler != null ? ", scheduler=" + this.scheduler : "")
536+
+ ", noPollThreshold=" + this.noPollThreshold
537+
+ (StringUtils.hasText(this.clientId) ? ", clientId=" + this.clientId : "")
538+
+ "]";
539+
}
540+
486541
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void testAutoCommit() throws Exception {
102102
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
103103
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
104104
ContainerProperties containerProps = new ContainerProperties(topic1);
105+
containerProps.setLogContainerConfig(true);
105106

106107
final CountDownLatch latch = new CountDownLatch(4);
107108
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,8 @@ return container;
496496

497497
Refer to the JavaDocs for `ContainerProperties` for more information about the various properties that can be set.
498498

499+
Since version _2.1.1_, a new property `logContainerConfig` is available; when true, and INFO logging is enabled, each listener container will write a log message summarizing its configuration properties.
500+
499501
====== ConcurrentMessageListenerContainer
500502

501503
The single constructor is similar to the first `KafkaListenerContainer` constructor:

0 commit comments

Comments
 (0)