Skip to content

Commit a39780c

Browse files
garyrussellartembilan
authored andcommitted
GH-518: Add clientIdPrefix to @KafkaListener
Resolves #518 * Polishing - PR Comments
1 parent 7d1aa4e commit a39780c

23 files changed

+163
-29
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,13 @@
165165
*/
166166
boolean idIsGroup() default true;
167167

168+
/**
169+
* When provided, overrides the client id property in the consumer factory
170+
* configuration. A suffix ('-n') is added for each container instance to ensure
171+
* uniqueness when concurrency is used.
172+
* @return the client id prefix.
173+
* @since 2.1.1
174+
*/
175+
String clientIdPrefix() default "";
176+
168177
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,8 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
383383
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
384384
endpoint.setTopics(resolveTopics(kafkaListener));
385385
endpoint.setTopicPattern(resolvePattern(kafkaListener));
386+
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(),
387+
"clientIdPrefix"));
386388
String group = kafkaListener.containerGroup();
387389
if (StringUtils.hasText(group)) {
388390
Object resolvedGroup = resolveExpression(group);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
227227
endpoint.setupListenerContainer(instance, this.messageConverter);
228228
initializeContainer(instance);
229229
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
230+
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
230231

231232
return instance;
232233
}

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9292

9393
private KafkaTemplate<K, V> replyTemplate;
9494

95+
private String clientIdPrefix;
96+
9597
@Override
9698
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
9799
this.beanFactory = beanFactory;
@@ -301,6 +303,21 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
301303
this.recoveryCallback = recoveryCallback;
302304
}
303305

306+
@Override
307+
public String getClientIdPrefix() {
308+
return this.clientIdPrefix;
309+
}
310+
311+
/**
312+
* Set the client id prefix; overrides the client id in the consumer configuration
313+
* properties.
314+
* @param clientIdPrefix the prefix.
315+
* @since 2.1.1
316+
*/
317+
public void setClientIdPrefix(String clientIdPrefix) {
318+
this.clientIdPrefix = clientIdPrefix;
319+
}
320+
304321
@Override
305322
public void afterPropertiesSet() {
306323
boolean topicsEmpty = getTopics().isEmpty();

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ public interface KafkaListenerEndpoint {
7474
*/
7575
Pattern getTopicPattern();
7676

77+
78+
/**
79+
* Return the client id prefix for the container; it will be suffixed by
80+
* '-n' to provide a unique id when concurrency is used.
81+
* @return the client id prefix.
82+
* @since 2.1.1
83+
*/
84+
String getClientIdPrefix();
85+
7786
/**
7887
* Setup the specified message listener container with the model
7988
* defined by this endpoint.

spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,23 @@ public interface ConsumerFactory<K, V> {
5757
*/
5858
Consumer<K, V> createConsumer(String groupId, String clientIdSuffix);
5959

60+
/**
61+
* Create a consumer with an explicit group id; in addition, the
62+
* client id suffix is appended to the clientIdPrefix which overrides the
63+
* {@code client.id} property, if present.
64+
* If a factory does not implement this method, {@link #createConsumer(String, String)}
65+
* is invoked, ignoring the prefix.
66+
* TODO: remove default in 2.2
67+
* @param groupId the group id.
68+
* @param clientIdPrefix the prefix.
69+
* @param clientIdSuffix the suffix.
70+
* @return the consumer.
71+
* @since 2.1.1
72+
*/
73+
default Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
74+
return createConsumer(groupId, clientIdSuffix);
75+
}
76+
6077
/**
6178
* Return true if consumers created by this factory use auto commit.
6279
* @return true if auto commit.

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.kafka.clients.consumer.KafkaConsumer;
2626
import org.apache.kafka.common.serialization.Deserializer;
2727

28+
import org.springframework.util.StringUtils;
29+
2830
/**
2931
* The {@link ConsumerFactory} implementation to produce a new {@link Consumer} instance
3032
* for provided {@link Map} {@code configs} and optional {@link Deserializer} {@code keyDeserializer},
@@ -95,13 +97,27 @@ public Consumer<K, V> createConsumer(String groupId, String clientIdSuffix) {
9597
return createKafkaConsumer(groupId, clientIdSuffix);
9698
}
9799

100+
@Override
101+
public Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
102+
return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffix);
103+
}
104+
98105
protected KafkaConsumer<K, V> createKafkaConsumer() {
99106
return createKafkaConsumer(this.configs);
100107
}
101108

102109
protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientIdSuffix) {
103-
boolean shouldModifyClientId = this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
104-
&& clientIdSuffix != null;
110+
return createKafkaConsumer(groupId, null, clientIdSuffix);
111+
}
112+
113+
protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientIdPrefix,
114+
String clientIdSuffix) {
115+
boolean overrideClientIdPrefix = StringUtils.hasText(clientIdPrefix);
116+
if (clientIdSuffix == null) {
117+
clientIdSuffix = "";
118+
}
119+
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
120+
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
105121
if (groupId == null && !shouldModifyClientId) {
106122
return createKafkaConsumer();
107123
}
@@ -112,7 +128,8 @@ protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientI
112128
}
113129
if (shouldModifyClientId) {
114130
modifiedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
115-
modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
131+
(overrideClientIdPrefix ? clientIdPrefix
132+
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
116133
}
117134
return createKafkaConsumer(modifiedConfigs);
118135
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,8 +387,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
387387
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
388388
Assert.state(!this.isAnyManualAck || !this.autoCommit,
389389
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
390-
final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
391-
this.consumerGroupId, KafkaMessageListenerContainer.this.clientIdSuffix);
390+
final Consumer<K, V> consumer =
391+
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
392+
this.consumerGroupId,
393+
this.containerProperties.getClientId(),
394+
KafkaMessageListenerContainer.this.clientIdSuffix);
392395
this.consumer = consumer;
393396

394397
ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

spring-kafka/src/main/java/org/springframework/kafka/listener/config/ContainerProperties.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ public class ContainerProperties {
157157

158158
private float noPollThreshold = DEFAULT_NO_POLL_THRESHOLD;
159159

160+
private String clientId = "";
161+
160162
public ContainerProperties(String... topics) {
161163
Assert.notEmpty(topics, "An array of topicPartitions must be provided");
162164
this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
@@ -460,4 +462,25 @@ public void setNoPollThreshold(float noPollThreshold) {
460462
this.noPollThreshold = noPollThreshold;
461463
}
462464

465+
/**
466+
* Return the client id.
467+
* @return the client id.
468+
* @since 2.1.1
469+
* @see #setClientId(String)
470+
*/
471+
public String getClientId() {
472+
return this.clientId;
473+
}
474+
475+
/**
476+
* Set the client id; overrides the consumer factory client.id property.
477+
* When used in a concurrent container, will be suffixed with '-n' to
478+
* provide a unique value for each consumer.
479+
* @param clientId the client id.
480+
* @since 2.1.1
481+
*/
482+
public void setClientId(String clientId) {
483+
this.clientId = clientId;
484+
}
485+
463486
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ public void testSimple() throws Exception {
218218
List<?> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
219219
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
220220
.isEqualTo("qux");
221+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.clientId"))
222+
.isEqualTo("clientIdViaProps3-0");
221223

222224
template.send("annotated5", 0, 0, "foo");
223225
template.send("annotated5", 1, 0, "bar");
@@ -237,6 +239,8 @@ public void testSimple() throws Exception {
237239
assertThat(offset.isRelativeToCurrent()).isTrue();
238240
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.coordinator.groupId"))
239241
.isEqualTo("fiz");
242+
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId"))
243+
.isEqualTo("clientIdViaAnnotation-0");
240244

241245
template.send("annotated11", 0, "foo");
242246
template.flush();
@@ -251,6 +255,13 @@ public void testSimple() throws Exception {
251255
.getPropertyValue(rebalanceConcurrentContainer, "containers", List.class).get(0);
252256
assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.coordinator.groupId"))
253257
.isNotEqualTo("rebalanceListener");
258+
String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId",
259+
String.class);
260+
assertThat(
261+
clientId)
262+
.startsWith("consumer-");
263+
assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-'));
264+
254265
}
255266

256267
@Test
@@ -673,7 +684,8 @@ public KafkaListenerContainerFactory<?> batchSpyFactory() {
673684
ConsumerFactory spiedCf = mock(ConsumerFactory.class);
674685
willAnswer(i -> {
675686
Consumer<Integer, String> spy =
676-
spy(consumerFactory().createConsumer(i.getArgument(0), i.getArgument(1)));
687+
spy(consumerFactory().createConsumer(i.getArgument(0), i.getArgument(1),
688+
i.getArgument(2)));
677689
willAnswer(invocation -> {
678690

679691
try {
@@ -685,7 +697,7 @@ public KafkaListenerContainerFactory<?> batchSpyFactory() {
685697

686698
}).given(spy).commitSync(anyMap());
687699
return spy;
688-
}).given(spiedCf).createConsumer(anyString(), anyString());
700+
}).given(spiedCf).createConsumer(anyString(), anyString(), anyString());
689701
factory.setConsumerFactory(spiedCf);
690702
factory.setBatchListener(true);
691703
factory.setRecordFilterStrategy(recordFilter());
@@ -698,7 +710,18 @@ public KafkaListenerContainerFactory<?> batchSpyFactory() {
698710
public KafkaListenerContainerFactory<?> batchManualFactory() {
699711
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
700712
new ConcurrentKafkaListenerContainerFactory<>();
701-
factory.setConsumerFactory(manualConsumerFactory());
713+
factory.setConsumerFactory(manualConsumerFactory("clientIdViaProps1"));
714+
ContainerProperties props = factory.getContainerProperties();
715+
props.setAckMode(AckMode.MANUAL_IMMEDIATE);
716+
factory.setBatchListener(true);
717+
return factory;
718+
}
719+
720+
@Bean
721+
public KafkaListenerContainerFactory<?> batchManualFactory2() {
722+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
723+
new ConcurrentKafkaListenerContainerFactory<>();
724+
factory.setConsumerFactory(manualConsumerFactory("clientIdViaProps2"));
702725
ContainerProperties props = factory.getContainerProperties();
703726
props.setAckMode(AckMode.MANUAL_IMMEDIATE);
704727
factory.setBatchListener(true);
@@ -710,7 +733,7 @@ public KafkaListenerContainerFactory<?> batchManualFactory() {
710733
kafkaManualAckListenerContainerFactory() {
711734
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
712735
new ConcurrentKafkaListenerContainerFactory<>();
713-
factory.setConsumerFactory(manualConsumerFactory());
736+
factory.setConsumerFactory(manualConsumerFactory("clientIdViaProps3"));
714737
ContainerProperties props = factory.getContainerProperties();
715738
props.setAckMode(AckMode.MANUAL_IMMEDIATE);
716739
props.setIdleEventInterval(100L);
@@ -751,10 +774,10 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
751774
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
752775
}
753776

754-
@Bean
755-
public ConsumerFactory<Integer, String> manualConsumerFactory() {
777+
private ConsumerFactory<Integer, String> manualConsumerFactory(String clientId) {
756778
Map<String, Object> configs = consumerConfigs();
757779
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
780+
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
758781
return new DefaultKafkaConsumerFactory<>(configs);
759782
}
760783

@@ -1090,7 +1113,7 @@ public void eventHandler(ListenerContainerIdleEvent event) {
10901113
@TopicPartition(topic = "annotated6", partitions = "0",
10911114
partitionOffsets = @PartitionOffset(partition = "${xxx:1}", initialOffset = "${yyy:0}",
10921115
relativeToCurrent = "${zzz:true}"))
1093-
})
1116+
}, clientIdPrefix = "${foo.xxx:clientIdViaAnnotation}")
10941117
public void listen5(ConsumerRecord<?, ?> record) {
10951118
this.record = record;
10961119
this.latch5.countDown();
@@ -1183,7 +1206,7 @@ public void listen14(List<Message<?>> list) {
11831206
this.latch14.countDown();
11841207
}
11851208

1186-
@KafkaListener(id = "list6", topics = "annotated19", containerFactory = "batchManualFactory")
1209+
@KafkaListener(id = "list6", topics = "annotated19", containerFactory = "batchManualFactory2")
11871210
public void listen15(List<Message<?>> list, Acknowledgment ack) {
11881211
this.payload = list;
11891212
this.ack = ack;

0 commit comments

Comments
 (0)