11/*
2- * Copyright 2016-2018 the original author or authors.
2+ * Copyright 2016-2019 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
@@ -93,6 +93,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
9393
9494 private final Map <String , CloseSafeProducer <K , V >> consumerProducers = new HashMap <>();
9595
96+ private final AtomicInteger clientIdCounter = new AtomicInteger ();
97+
9698 private volatile CloseSafeProducer <K , V > producer ;
9799
98100 private Serializer <K > keySerializer ;
@@ -107,6 +109,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
107109
108110 private boolean producerPerConsumerPartition = true ;
109111
112+ private String clientIdPrefix ;
110113 /**
111114 * Construct a factory with the provided configuration.
112115 * @param configs the configuration.
@@ -117,9 +120,13 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
117120
118121 public DefaultKafkaProducerFactory (Map <String , Object > configs , Serializer <K > keySerializer ,
119122 Serializer <V > valueSerializer ) {
123+
120124 this .configs = new HashMap <>(configs );
121125 this .keySerializer = keySerializer ;
122126 this .valueSerializer = valueSerializer ;
127+ if (configs .get (ProducerConfig .CLIENT_ID_CONFIG ) instanceof String ) {
128+ this .clientIdPrefix = (String ) configs .get (ProducerConfig .CLIENT_ID_CONFIG );
129+ }
123130 }
124131
125132 public void setKeySerializer (Serializer <K > keySerializer ) {
@@ -274,7 +281,15 @@ public Producer<K, V> createProducer() {
274281 * @return the producer.
275282 */
276283 protected Producer <K , V > createKafkaProducer () {
277- return new KafkaProducer <K , V >(this .configs , this .keySerializer , this .valueSerializer );
284+ if (this .clientIdPrefix == null ) {
285+ return new KafkaProducer <K , V >(this .configs , this .keySerializer , this .valueSerializer );
286+ }
287+ else {
288+ Map <String , Object > newConfigs = new HashMap <>(this .configs );
289+ newConfigs .put (ProducerConfig .CLIENT_ID_CONFIG ,
290+ this .clientIdPrefix + "-" + this .clientIdCounter .incrementAndGet ());
291+ return new KafkaProducer <>(newConfigs , this .keySerializer , this .valueSerializer );
292+ }
278293 }
279294
280295 Producer <K , V > createTransactionalProducerForPartition () {
@@ -328,6 +343,10 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<Close
328343 Producer <K , V > newProducer ;
329344 Map <String , Object > newProducerConfigs = new HashMap <>(this .configs );
330345 newProducerConfigs .put (ProducerConfig .TRANSACTIONAL_ID_CONFIG , this .transactionIdPrefix + suffix );
346+ if (this .clientIdPrefix != null ) {
347+ newProducerConfigs .put (ProducerConfig .CLIENT_ID_CONFIG ,
348+ this .clientIdPrefix + "-" + this .clientIdCounter .incrementAndGet ());
349+ }
331350 newProducer = new KafkaProducer <K , V >(newProducerConfigs , this .keySerializer , this .valueSerializer );
332351 newProducer .initTransactions ();
333352 return new CloseSafeProducer <K , V >(newProducer , this .cache , remover ,
0 commit comments