Skip to content

Commit ff174e0

Browse files
garyrussellartembilan
authored andcommitted
@KafkaListener Consumer Property Overrides
Add the `properties` attribute to the `@KafkaListener` annotation to enhance or override the consumer factory properties.
1 parent c5b2ee4 commit ff174e0

26 files changed

+244
-48
lines changed

.editorconfig

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
root=true
2+
3+
[*.java]
4+
indent_style = tab
5+
indent_size = 4
6+
continuation_indent_size = 8
7+
8+
[*.xml]
9+
indent_style = tab
10+
indent_size = 4
11+
continuation_indent_size = 8

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,4 +220,25 @@
220220
*/
221221
String autoStartup() default "";
222222

223+
/**
224+
* Kafka consumer properties; they will supersede any properties with the same name
225+
* defined in the consumer factory (if the consumer factory supports property overrides).
226+
* <h3>Supported Syntax</h3>
227+
* <p>The supported syntax for key-value pairs is the same as the
228+
* syntax defined for entries in a Java
229+
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
230+
* <ul>
231+
* <li>{@code key=value}</li>
232+
* <li>{@code key:value}</li>
233+
* <li>{@code key value}</li>
234+
* </ul>
235+
* {@code group.id} and {@code client.id} are ignored.
236+
* @return the properties.
237+
* @since 2.2.4
238+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
239+
* @see #groupId()
240+
* @see #clientIdPrefix()
241+
*/
242+
String[] properties() default {};
243+
223244
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2018 the original author or authors.
2+
* Copyright 2014-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19+
import java.io.IOException;
20+
import java.io.StringReader;
1921
import java.lang.reflect.Method;
2022
import java.nio.charset.Charset;
2123
import java.nio.charset.StandardCharsets;
@@ -27,6 +29,7 @@
2729
import java.util.HashSet;
2830
import java.util.List;
2931
import java.util.Map;
32+
import java.util.Properties;
3033
import java.util.Set;
3134
import java.util.concurrent.ConcurrentHashMap;
3235
import java.util.concurrent.atomic.AtomicInteger;
@@ -439,6 +442,19 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
439442
if (StringUtils.hasText(autoStartup)) {
440443
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
441444
}
445+
String[] propertyStrings = kafkaListener.properties();
446+
if (propertyStrings.length > 0) {
447+
Properties properties = new Properties();
448+
for (String property : propertyStrings) {
449+
try {
450+
properties.load(new StringReader(resolveExpressionAsString(property, "property")));
451+
}
452+
catch (IOException e) {
453+
this.logger.error("Failed to load property " + property + ", continuing...", e);
454+
}
455+
}
456+
endpoint.setConsumerProperties(properties);
457+
}
442458

443459
KafkaListenerContainerFactory<?> factory = null;
444460
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,6 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
296296

297297
endpoint.setupListenerContainer(instance, this.messageConverter);
298298
initializeContainer(instance, endpoint);
299-
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
300-
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
301299

302300
return instance;
303301
}
@@ -371,6 +369,11 @@ else if (this.autoStartup != null) {
371369
if (this.applicationEventPublisher != null) {
372370
instance.setApplicationEventPublisher(this.applicationEventPublisher);
373371
}
372+
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
373+
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
374+
if (endpoint.getConsumerProperties() != null) {
375+
instance.getContainerProperties().setConsumerProperties(endpoint.getConsumerProperties());
376+
}
374377
}
375378

376379
@Override

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Collection;
2222
import java.util.Collections;
23+
import java.util.Properties;
2324
import java.util.regex.Pattern;
2425

2526
import org.apache.commons.logging.Log;
@@ -46,6 +47,7 @@
4647
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
4748
import org.springframework.kafka.support.TopicPartitionInitialOffset;
4849
import org.springframework.kafka.support.converter.MessageConverter;
50+
import org.springframework.lang.Nullable;
4951
import org.springframework.retry.RecoveryCallback;
5052
import org.springframework.retry.support.RetryTemplate;
5153
import org.springframework.util.Assert;
@@ -109,6 +111,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
109111

110112
private ReplyHeadersConfigurer replyHeadersConfigurer;
111113

114+
private Properties consumerProperties;
115+
112116
@Override
113117
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
114118
this.beanFactory = beanFactory;
@@ -386,6 +390,27 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu
386390
this.replyHeadersConfigurer = replyHeadersConfigurer;
387391
}
388392

393+
@Override
394+
@Nullable
395+
public Properties getConsumerProperties() {
396+
return this.consumerProperties;
397+
}
398+
399+
/**
400+
* Set the consumer properties that will be merged with the consumer properties
401+
* provided by the consumer factory; properties here will supersede any with the same
402+
* name(s) in the consumer factory.
403+
* {@code group.id} and {@code client.id} are ignored.
404+
* @param consumerProperties the properties.
405+
* @since 2.1.4
406+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
407+
* @see #setGroupId(String)
408+
* @see #setClientIdPrefix(String)
409+
*/
410+
public void setConsumerProperties(Properties consumerProperties) {
411+
this.consumerProperties = consumerProperties;
412+
}
413+
389414
@Override
390415
public void afterPropertiesSet() {
391416
boolean topicsEmpty = getTopics().isEmpty();

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,11 +17,13 @@
1717
package org.springframework.kafka.config;
1818

1919
import java.util.Collection;
20+
import java.util.Properties;
2021
import java.util.regex.Pattern;
2122

2223
import org.springframework.kafka.listener.MessageListenerContainer;
2324
import org.springframework.kafka.support.TopicPartitionInitialOffset;
2425
import org.springframework.kafka.support.converter.MessageConverter;
26+
import org.springframework.lang.Nullable;
2527

2628
/**
2729
* Model for a Kafka listener endpoint. Can be used against a
@@ -97,6 +99,22 @@ public interface KafkaListenerEndpoint {
9799
*/
98100
Boolean getAutoStartup();
99101

102+
/**
103+
* Get the consumer properties that will be merged with the consumer properties
104+
* provided by the consumer factory; properties here will supersede any with the same
105+
* name(s) in the consumer factory.
106+
* {@code group.id} and {@code client.id} are ignored.
107+
* @return the properties.
108+
* @since 2.1.4
109+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
110+
* @see #getGroupId()
111+
* @see #getClientIdPrefix()
112+
*/
113+
@Nullable
114+
default Properties getConsumerProperties() {
115+
return null;
116+
}
117+
100118
/**
101119
* Setup the specified message listener container with the model
102120
* defined by this endpoint.

spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import java.util.Map;
20+
import java.util.Properties;
2021

2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.common.serialization.Deserializer;
@@ -70,8 +71,6 @@ default Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String
7071
* Create a consumer with an explicit group id; in addition, the
7172
* client id suffix is appended to the clientIdPrefix which overrides the
7273
* {@code client.id} property, if present.
73-
* If a factory does not implement this method, {@link #createConsumer(String, String)}
74-
* is invoked, ignoring the prefix.
7574
* @param groupId the group id.
7675
* @param clientIdPrefix the prefix.
7776
* @param clientIdSuffix the suffix.
@@ -81,6 +80,24 @@ default Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String
8180
Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
8281
@Nullable String clientIdSuffix);
8382

83+
/**
84+
* Create a consumer with an explicit group id; in addition, the
85+
* client id suffix is appended to the clientIdPrefix which overrides the
86+
* {@code client.id} property, if present. In addition, consumer properties can
87+
* be overridden if the factory implementation supports it.
88+
* @param groupId the group id.
89+
* @param clientIdPrefix the prefix.
90+
* @param clientIdSuffix the suffix.
91+
* @param properties the properties to override.
92+
* @return the consumer.
93+
* @since 2.2.4
94+
*/
95+
default Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
96+
@Nullable String clientIdSuffix, @Nullable Properties properties) {
97+
98+
return createConsumer(groupId, clientIdPrefix, clientIdSuffix);
99+
}
100+
84101
/**
85102
* Return true if consumers created by this factory use auto commit.
86103
* @return true if auto commit.

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.util.Collections;
2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.Properties;
2223

2324
import org.apache.kafka.clients.consumer.Consumer;
2425
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -98,20 +99,34 @@ public Deserializer<V> getValueDeserializer() {
9899
public Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
99100
@Nullable String clientIdSuffix) {
100101

101-
return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffix);
102+
return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffix, null);
102103
}
103104

105+
@Override
106+
public Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
107+
@Nullable final String clientIdSuffixArg, @Nullable Properties properties) {
108+
109+
return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
110+
}
111+
112+
@Deprecated
104113
protected KafkaConsumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
105114
@Nullable final String clientIdSuffixArg) {
106115

116+
return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, null);
117+
}
118+
119+
protected KafkaConsumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
120+
@Nullable final String clientIdSuffixArg, @Nullable Properties properties) {
121+
107122
boolean overrideClientIdPrefix = StringUtils.hasText(clientIdPrefix);
108123
String clientIdSuffix = clientIdSuffixArg;
109124
if (clientIdSuffix == null) {
110125
clientIdSuffix = "";
111126
}
112127
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
113128
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
114-
if (groupId == null && !shouldModifyClientId) {
129+
if (groupId == null && properties == null && !shouldModifyClientId) {
115130
return createKafkaConsumer(this.configs);
116131
}
117132
else {
@@ -124,6 +139,13 @@ protected KafkaConsumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nul
124139
(overrideClientIdPrefix ? clientIdPrefix
125140
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
126141
}
142+
if (properties != null) {
143+
properties.forEach((k, v) -> {
144+
if (!k.equals(ConsumerConfig.CLIENT_ID_CONFIG) && !k.equals(ConsumerConfig.GROUP_ID_CONFIG)) {
145+
modifiedConfigs.put((String) k, v);
146+
}
147+
});
148+
}
127149
return createKafkaConsumer(modifiedConfigs);
128150
}
129151
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Arrays;
2020
import java.util.LinkedHashSet;
21+
import java.util.Properties;
2122
import java.util.regex.Pattern;
2223

2324
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -26,6 +27,7 @@
2627
import org.springframework.core.task.AsyncListenableTaskExecutor;
2728
import org.springframework.kafka.support.LogIfLevelEnabled;
2829
import org.springframework.kafka.support.TopicPartitionInitialOffset;
30+
import org.springframework.lang.Nullable;
2931
import org.springframework.scheduling.TaskScheduler;
3032
import org.springframework.transaction.PlatformTransactionManager;
3133
import org.springframework.util.Assert;
@@ -220,6 +222,8 @@ public enum AckMode {
220222

221223
private boolean missingTopicsFatal = true;
222224

225+
private Properties consumerProperties;
226+
223227
/**
224228
* Create properties for a container that will subscribe to the specified topics.
225229
* @param topics the topics.
@@ -609,6 +613,37 @@ public void setMissingTopicsFatal(boolean missingTopicsFatal) {
609613
this.missingTopicsFatal = missingTopicsFatal;
610614
}
611615

616+
/**
617+
* Get the consumer properties that will be merged with the consumer properties
618+
* provided by the consumer factory; properties here will supersede any with the same
619+
* name(s) in the consumer factory.
620+
* {@code group.id} and {@code client.id} are ignored.
621+
* @return the properties.
622+
* @since 2.1.4
623+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
624+
* @see #setGroupId(String)
625+
* @see #setClientId(String)
626+
*/
627+
@Nullable
628+
public Properties getConsumerProperties() {
629+
return this.consumerProperties;
630+
}
631+
632+
/**
633+
* Set the consumer properties that will be merged with the consumer properties
634+
* provided by the consumer factory; properties here will supersede any with the same
635+
* name(s) in the consumer factory.
636+
* {@code group.id} and {@code client.id} are ignored.
637+
* @param consumerProperties the properties.
638+
* @since 2.1.4
639+
* @see org.apache.kafka.clients.consumer.ConsumerConfig
640+
* @see #setGroupId(String)
641+
* @see #setClientId(String)
642+
*/
643+
public void setConsumerProperties(Properties consumerProperties) {
644+
this.consumerProperties = consumerProperties;
645+
}
646+
612647
@Override
613648
public String toString() {
614649
return "ContainerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
497497
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
498498
this.consumerGroupId,
499499
this.containerProperties.getClientId(),
500-
KafkaMessageListenerContainer.this.clientIdSuffix);
500+
KafkaMessageListenerContainer.this.clientIdSuffix,
501+
this.containerProperties.getConsumerProperties());
501502

502503
if (this.transactionManager != null) {
503504
this.transactionTemplate = new TransactionTemplate(this.transactionManager);

0 commit comments

Comments
 (0)