Skip to content

Commit ec446d4

Browse files
garyrussellartembilan
authored andcommitted
GH-1283: Unique client.id for each producer
Resolves #1283 Avoid `InstanceAlreadyExistsException` s when the user supplies a custom `client.id`. **cherry-pick to all supported branches**
1 parent ae0dec7 commit ec446d4

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
123123

124124
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
125125

126+
private final AtomicInteger clientIdCounter = new AtomicInteger();
127+
126128
private Supplier<Serializer<K>> keySerializerSupplier;
127129

128130
private Supplier<Serializer<V>> valueSerializerSupplier;
@@ -139,6 +141,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
139141

140142
private ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers;
141143

144+
private String clientIdPrefix;
145+
142146
private volatile CloseSafeProducer<K, V> producer;
143147

144148
/**
@@ -182,6 +186,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
182186
this.configs = new HashMap<>(configs);
183187
this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
184188
this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;
189+
if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
190+
this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
191+
}
185192

186193
String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
187194
if (StringUtils.hasText(txId)) {
@@ -393,7 +400,17 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
393400
* @return the producer.
394401
*/
395402
protected Producer<K, V> createKafkaProducer() {
396-
return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
403+
if (this.clientIdPrefix == null) {
404+
return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(),
405+
this.valueSerializerSupplier.get());
406+
}
407+
else {
408+
Map<String, Object> newConfigs = new HashMap<>(this.configs);
409+
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
410+
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
411+
return new KafkaProducer<>(newConfigs, this.keySerializerSupplier.get(),
412+
this.valueSerializerSupplier.get());
413+
}
397414
}
398415

399416
protected Producer<K, V> createTransactionalProducerForPartition() {
@@ -460,6 +477,10 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
460477
Producer<K, V> newProducer;
461478
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
462479
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, prefix + suffix);
480+
if (this.clientIdPrefix != null) {
481+
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
482+
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
483+
}
463484
newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializerSupplier
464485
.get(), this.valueSerializerSupplier.get());
465486
newProducer.initTransactions();

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void testLocalTransaction() {
102102
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
103103
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
104104
senderProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my.transaction.");
105+
senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientId");
105106
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
106107
pf.setKeySerializer(new StringSerializer());
107108
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
@@ -116,6 +117,7 @@ public void testLocalTransaction() {
116117
template.executeInTransaction(kt -> kt.send(LOCAL_TX_IN_TOPIC, "one"));
117118
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_TX_IN_TOPIC);
118119
template.executeInTransaction(t -> {
120+
pf.createProducer("testCustomClientIdIsUnique").close();
119121
t.sendDefault("foo", "bar");
120122
t.sendDefault("baz", "qux");
121123
t.sendOffsetsToTransaction(Collections.singletonMap(
@@ -144,7 +146,7 @@ record = iterator.next();
144146
template.executeInTransaction(t -> {
145147
assertThat(KafkaTestUtils.getPropertyValue(
146148
KafkaTestUtils.getPropertyValue(template, "producers", ThreadLocal.class).get(),
147-
"delegate.transactionManager.transactionalId")).isEqualTo("tx.template.override.1");
149+
"delegate.transactionManager.transactionalId")).isEqualTo("tx.template.override.2");
148150
return null;
149151
});
150152
assertThat(pf.getCache("tx.template.override.")).hasSize(1);

0 commit comments

Comments
 (0)