Skip to content

Commit 902e9b3

Browse files
garyrussellartembilan
authored andcommitted
GH-1283: Unique client.id for each producer
Resolves #1283 Avoid `InstanceAlreadyExistsException` s when the user supplies a custom `client.id`. **cherry-pick to all supported branches** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
1 parent 54d83f3 commit 902e9b3

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
8686

8787
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
8888

89+
private final AtomicInteger clientIdCounter = new AtomicInteger();
90+
8991
private volatile CloseSafeProducer<K, V> producer;
9092

9193
private Serializer<K> keySerializer;
@@ -100,6 +102,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
100102

101103
private boolean producerPerConsumerPartition = true;
102104

105+
private String clientIdPrefix;
103106
/**
104107
* Construct a factory with the provided configuration.
105108
* @param configs the configuration.
@@ -110,9 +113,13 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
110113

111114
public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
112115
Serializer<V> valueSerializer) {
116+
113117
this.configs = new HashMap<>(configs);
114118
this.keySerializer = keySerializer;
115119
this.valueSerializer = valueSerializer;
120+
if (configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
121+
this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
122+
}
116123
}
117124

118125
public void setKeySerializer(Serializer<K> keySerializer) {
@@ -256,7 +263,15 @@ public Producer<K, V> createProducer() {
256263
* @return the producer.
257264
*/
258265
protected Producer<K, V> createKafkaProducer() {
259-
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
266+
if (this.clientIdPrefix == null) {
267+
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
268+
}
269+
else {
270+
Map<String, Object> newConfigs = new HashMap<>(this.configs);
271+
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
272+
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
273+
return new KafkaProducer<>(newConfigs, this.keySerializer, this.valueSerializer);
274+
}
260275
}
261276

262277
Producer<K, V> createTransactionalProducerForPartition() {
@@ -298,6 +313,10 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, boolean isCons
298313
Producer<K, V> producer;
299314
Map<String, Object> configs = new HashMap<>(this.configs);
300315
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
316+
if (this.clientIdPrefix != null) {
317+
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
318+
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
319+
}
301320
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
302321
producer.initTransactions();
303322
return new CloseSafeProducer<K, V>(producer, this.cache,

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public class KafkaTemplateTransactionTests {
104104
public void testLocalTransaction() throws Exception {
105105
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
106106
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
107+
senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientId");
107108
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
108109
pf.setKeySerializer(new StringSerializer());
109110
pf.setTransactionIdPrefix("my.transaction.");

0 commit comments

Comments
 (0)