Skip to content

Commit 01154a9

Browse files
artembilangaryrussell
authored andcommitted
GH-1008: Use transactional.id config as a prefix (#1065)
* GH-1008: Use `transactional.id` config as a prefix Fixes #1008 * If provided `configs` for the `DefaultKafkaProducerFactory` contains a `ProducerConfig.TRANSACTIONAL_ID_CONFIG` value, treat it as a `prefix` and override for the target `Producer` configs. Also log an `INFO` that this value is going to be prefix the target `Producer` * An explicit `setTransactionIdPrefix()` will override this value any way * Polishing for `KafkaTemplateTransactionTests` together with an explicit `ProducerConfig.TRANSACTIONAL_ID_CONFIG` config in the `testLocalTransaction()` test * * Add `DefaultKafkaProducerFactory.getTransactionIdPrefix()`
1 parent b00f542 commit 01154a9

File tree

2 files changed

+127
-130
lines changed

2 files changed

+127
-130
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 55 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.springframework.kafka.support.TransactionSupport;
5656
import org.springframework.lang.Nullable;
5757
import org.springframework.util.Assert;
58+
import org.springframework.util.StringUtils;
5859

5960
/**
6061
* The {@link ProducerFactory} implementation for a {@code singleton} shared
@@ -92,7 +93,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
9293
*/
9394
public static final Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT = Duration.ofSeconds(30);
9495

95-
private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class); // NOSONAR
96+
private static final Log LOGGER = LogFactory.getLog(DefaultKafkaProducerFactory.class);
9697

9798
private final Map<String, Object> configs;
9899

@@ -102,8 +103,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
102103

103104
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
104105

105-
private volatile CloseSafeProducer<K, V> producer;
106-
107106
private Serializer<K> keySerializer;
108107

109108
private Serializer<V> valueSerializer;
@@ -116,6 +115,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
116115

117116
private boolean producerPerConsumerPartition = true;
118117

118+
private volatile CloseSafeProducer<K, V> producer;
119+
119120
/**
120121
* Construct a factory with the provided configuration.
121122
* @param configs the configuration.
@@ -126,16 +127,30 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
126127

127128
/**
128129
* Construct a factory with the provided configuration and {@link Serializer}s.
130+
* Also configures a {@link #transactionIdPrefix} as a value from the
131+
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
132+
* This config is going to be overridden with a suffix for target {@link Producer} instance.
129133
* @param configs the configuration.
130134
* @param keySerializer the key {@link Serializer}.
131135
* @param valueSerializer the value {@link Serializer}.
132136
*/
133137
public DefaultKafkaProducerFactory(Map<String, Object> configs,
134138
@Nullable Serializer<K> keySerializer,
135139
@Nullable Serializer<V> valueSerializer) {
140+
136141
this.configs = new HashMap<>(configs);
137142
this.keySerializer = keySerializer;
138143
this.valueSerializer = valueSerializer;
144+
145+
String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
146+
if (StringUtils.hasText(txId)) {
147+
setTransactionIdPrefix(txId);
148+
if (LOGGER.isInfoEnabled()) {
149+
LOGGER.info("If 'setTransactionIdPrefix()' is not going to be configured, " +
150+
"an existing 'transactional.id' config with value: '" + txId +
151+
"' will be suffixed with the number for concurrent transactions support.");
152+
}
153+
}
139154
}
140155

141156
@Override
@@ -152,7 +167,7 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
152167
}
153168

154169
/**
155-
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked).
170+
* The time to wait when physically closing the producer (when {@link #reset()} or {@link #destroy()} is invoked).
156171
* Specified in seconds; default {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
157172
* @param physicalCloseTimeout the timeout in seconds.
158173
* @since 1.0.7
@@ -162,23 +177,29 @@ public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
162177
}
163178

164179
/**
165-
* Set the transactional.id prefix.
180+
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config.
181+
* By default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used as a prefix
182+
* in the target producer configs.
166183
* @param transactionIdPrefix the prefix.
167184
* @since 1.3
168185
*/
169-
public void setTransactionIdPrefix(String transactionIdPrefix) {
186+
public final void setTransactionIdPrefix(String transactionIdPrefix) {
170187
Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
171188
this.transactionIdPrefix = transactionIdPrefix;
172189
enableIdempotentBehaviour();
173190
}
174191

192+
protected String getTransactionIdPrefix() {
193+
return this.transactionIdPrefix;
194+
}
195+
175196
/**
176197
* When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream.
177198
*/
178199
private void enableIdempotentBehaviour() {
179200
Object previousValue = this.configs.putIfAbsent(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
180-
if (logger.isDebugEnabled() && Boolean.FALSE.equals(previousValue)) {
181-
logger.debug("The '" + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +
201+
if (LOGGER.isDebugEnabled() && Boolean.FALSE.equals(previousValue)) {
202+
LOGGER.debug("The '" + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +
182203
"' is set to false, may result in duplicate messages");
183204
}
184205
}
@@ -233,13 +254,13 @@ public void destroy() {
233254
producerToClose.delegate.close(this.physicalCloseTimeout);
234255
}
235256
catch (Exception e) {
236-
logger.error("Exception while closing producer", e);
257+
LOGGER.error("Exception while closing producer", e);
237258
}
238259
producerToClose = this.cache.poll();
239260
}
240261
synchronized (this.consumerProducers) {
241262
this.consumerProducers.forEach(
242-
(k, v) -> v.delegate.close(this.physicalCloseTimeout));
263+
(k, v) -> v.delegate.close(this.physicalCloseTimeout));
243264
this.consumerProducers.clear();
244265
}
245266
}
@@ -251,47 +272,13 @@ public void onApplicationEvent(ContextStoppedEvent event) {
251272
}
252273
}
253274

254-
/**
255-
* NoOp.
256-
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
257-
*/
258-
@Deprecated
259-
public void start() {
260-
// NOSONAR
261-
}
262-
263-
/**
264-
* NoOp.
265-
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented;
266-
* use {@link #reset()} to close the {@link Producer}(s).
267-
*/
268-
@Deprecated
269-
public void stop() {
270-
reset();
271-
}
272-
273275
/**
274276
* Close the {@link Producer}(s) and clear the cache of transactional
275277
* {@link Producer}(s).
276278
* @since 2.2
277279
*/
278280
public void reset() {
279-
try {
280-
destroy();
281-
}
282-
catch (Exception e) {
283-
logger.error("Exception while closing producer", e);
284-
}
285-
}
286-
287-
/**
288-
* NoOp.
289-
* @return always true.
290-
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
291-
*/
292-
@Deprecated
293-
public boolean isRunning() {
294-
return true;
281+
destroy();
295282
}
296283

297284
@Override
@@ -307,7 +294,7 @@ public Producer<K, V> createProducer() {
307294
if (this.producer == null) {
308295
synchronized (this) {
309296
if (this.producer == null) {
310-
this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
297+
this.producer = new CloseSafeProducer<>(createKafkaProducer());
311298
}
312299
}
313300
}
@@ -320,7 +307,7 @@ public Producer<K, V> createProducer() {
320307
* @return the producer.
321308
*/
322309
protected Producer<K, V> createKafkaProducer() {
323-
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
310+
return new KafkaProducer<>(this.configs, this.keySerializer, this.valueSerializer);
324311
}
325312

326313
Producer<K, V> createTransactionalProducerForPartition() {
@@ -370,13 +357,15 @@ protected Producer<K, V> createTransactionalProducer() {
370357
}
371358
}
372359

373-
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
360+
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix,
361+
@Nullable Consumer<CloseSafeProducer<K, V>> remover) {
362+
374363
Producer<K, V> newProducer;
375364
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
376365
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
377-
newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
366+
newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializer, this.valueSerializer);
378367
newProducer.initTransactions();
379-
return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
368+
return new CloseSafeProducer<>(newProducer, this.cache, remover,
380369
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
381370
}
382371

@@ -471,15 +460,15 @@ public void initTransactions() {
471460

472461
@Override
473462
public void beginTransaction() throws ProducerFencedException {
474-
if (logger.isDebugEnabled()) {
475-
logger.debug("beginTransaction: " + this);
463+
if (LOGGER.isDebugEnabled()) {
464+
LOGGER.debug("beginTransaction: " + this);
476465
}
477466
try {
478467
this.delegate.beginTransaction();
479468
}
480469
catch (RuntimeException e) {
481-
if (logger.isErrorEnabled()) {
482-
logger.error("beginTransaction failed: " + this, e);
470+
if (LOGGER.isErrorEnabled()) {
471+
LOGGER.error("beginTransaction failed: " + this, e);
483472
}
484473
this.txFailed = true;
485474
throw e;
@@ -495,15 +484,15 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
495484

496485
@Override
497486
public void commitTransaction() throws ProducerFencedException {
498-
if (logger.isDebugEnabled()) {
499-
logger.debug("commitTransaction: " + this);
487+
if (LOGGER.isDebugEnabled()) {
488+
LOGGER.debug("commitTransaction: " + this);
500489
}
501490
try {
502491
this.delegate.commitTransaction();
503492
}
504493
catch (RuntimeException e) {
505-
if (logger.isErrorEnabled()) {
506-
logger.error("commitTransaction failed: " + this, e);
494+
if (LOGGER.isErrorEnabled()) {
495+
LOGGER.error("commitTransaction failed: " + this, e);
507496
}
508497
this.txFailed = true;
509498
throw e;
@@ -512,15 +501,15 @@ public void commitTransaction() throws ProducerFencedException {
512501

513502
@Override
514503
public void abortTransaction() throws ProducerFencedException {
515-
if (logger.isDebugEnabled()) {
516-
logger.debug("abortTransaction: " + this);
504+
if (LOGGER.isDebugEnabled()) {
505+
LOGGER.debug("abortTransaction: " + this);
517506
}
518507
try {
519508
this.delegate.abortTransaction();
520509
}
521510
catch (RuntimeException e) {
522-
if (logger.isErrorEnabled()) {
523-
logger.error("Abort failed: " + this, e);
511+
if (LOGGER.isErrorEnabled()) {
512+
LOGGER.error("Abort failed: " + this, e);
524513
}
525514
this.txFailed = true;
526515
throw e;
@@ -543,9 +532,10 @@ public void close(long timeout, @Nullable TimeUnit unit) {
543532
public void close(@Nullable Duration timeout) {
544533
if (this.cache != null) {
545534
if (this.txFailed) {
546-
if (logger.isWarnEnabled()) {
547-
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
548-
+ "broker restarted during transaction: " + this);
535+
if (LOGGER.isWarnEnabled()) {
536+
LOGGER.warn("Error during transactional operation; producer removed from cache; possible " +
537+
"cause: "
538+
+ "broker restarted during transaction: " + this);
549539
}
550540
if (timeout == null) {
551541
this.delegate.close();

0 commit comments

Comments
 (0)