|
108 | 108 | * @author Nakul Mishra |
109 | 109 | * @author Artem Bilan |
110 | 110 | * @author Chris Gilbert |
| 111 | + * @author Thomas Strauß |
111 | 112 | */ |
112 | 113 | public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory |
113 | 114 | implements ProducerFactory<K, V>, ApplicationContextAware, |
114 | | - BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean { |
| 115 | + BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean { |
115 | 116 |
|
116 | 117 | private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class)); |
117 | 118 |
|
@@ -361,6 +362,63 @@ public void setMaxAge(Duration maxAge) { |
361 | 362 | this.maxAge = maxAge.toMillis(); |
362 | 363 | } |
363 | 364 |
|
| 365 | + /** |
| 366 | + * Copy properties of the instance and the given properties to create a new producer factory. |
| 367 | + * <p>If the {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} makes a |
| 368 | + * copy of itself, the transaction id prefix is recovered from the properties. If |
| 369 | + * you want to change the ID config, add a new |
| 370 | + * {@link org.apache.kafka.clients.producer.ProducerConfig#TRANSACTIONAL_ID_CONFIG} |
| 371 | + * key to the override config.</p> |
| 372 | + * @param overrideProperties the properties to be applied to the new factory |
| 373 | + * @return {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} with |
| 374 | + * properties applied |
| 375 | + */ |
| 376 | + @Override |
| 377 | + public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> overrideProperties) { |
| 378 | + Map<String, Object> producerProperties = new HashMap<>(getConfigurationProperties()); |
| 379 | + producerProperties.putAll(overrideProperties); |
| 380 | + producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties); |
| 381 | + DefaultKafkaProducerFactory<K, V> newFactory = |
| 382 | + new DefaultKafkaProducerFactory<>(producerProperties, |
| 383 | + getKeySerializerSupplier(), |
| 384 | + getValueSerializerSupplier()); |
| 385 | + newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds()); |
| 386 | + newFactory.setProducerPerConsumerPartition(isProducerPerConsumerPartition()); |
| 387 | + newFactory.setProducerPerThread(isProducerPerThread()); |
| 388 | + for (ProducerPostProcessor<K, V> templatePostProcessor : getPostProcessors()) { |
| 389 | + newFactory.addPostProcessor(templatePostProcessor); |
| 390 | + } |
| 391 | + for (ProducerFactory.Listener<K, V> templateListener : getListeners()) { |
| 392 | + newFactory.addListener(templateListener); |
| 393 | + } |
| 394 | + return newFactory; |
| 395 | + } |
| 396 | + |
| 397 | + |
| 398 | + /** |
| 399 | + * Ensures that the returned properties map contains a transaction id prefix. |
| 400 | + * The {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} |
| 401 | + * modifies the local properties copy, the txn key is removed and |
| 402 | + * stored locally in a property. To make a proper copy of the properties in a |
| 403 | + * new factory, the transactionId has to be reinserted prior use. |
| 404 | + * The incoming properties are checked for a transactionId key. If none is |
| 405 | + * there, the one existing in the factory is added. |
| 406 | + * @param producerProperties the properties to be used for the new factory |
| 407 | + * @return the producerProperties or a copy with the transaction ID set |
| 408 | + */ |
| 409 | + private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<String, Object> producerProperties) { |
| 410 | + String transactionIdPrefix = getTransactionIdPrefix(); |
| 411 | + if (StringUtils.hasText(transactionIdPrefix)) { |
| 412 | + if (!producerProperties.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) { |
| 413 | + Map<String, Object> producerPropertiesWithTxnId = new HashMap<>(producerProperties); |
| 414 | + producerPropertiesWithTxnId.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix); |
| 415 | + return producerPropertiesWithTxnId; |
| 416 | + } |
| 417 | + } |
| 418 | + |
| 419 | + return producerProperties; |
| 420 | + } |
| 421 | + |
364 | 422 | /** |
365 | 423 | * Add a listener. |
366 | 424 | * @param listener the listener. |
@@ -418,8 +476,8 @@ public void updateConfigs(Map<String, Object> updates) { |
418 | 476 | Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG |
419 | 477 | + "' must be a String, not a " + entry.getClass().getName()); |
420 | 478 | Assert.isTrue(this.transactionIdPrefix != null |
421 | | - ? entry.getValue() != null |
422 | | - : entry.getValue() == null, |
| 479 | + ? entry.getValue() != null |
| 480 | + : entry.getValue() == null, |
423 | 481 | "Cannot change transactional capability"); |
424 | 482 | this.transactionIdPrefix = (String) entry.getValue(); |
425 | 483 | } |
@@ -695,7 +753,7 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout |
695 | 753 | BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix); |
696 | 754 | if (producerToRemove.epoch != this.epoch.get() |
697 | 755 | || (txIdCache != null && !txIdCache.contains(producerToRemove) |
698 | | - && !txIdCache.offer(producerToRemove))) { |
| 756 | + && !txIdCache.offer(producerToRemove))) { |
699 | 757 | producerToRemove.closeDelegate(timeout, this.listeners); |
700 | 758 | return true; |
701 | 759 | } |
@@ -943,7 +1001,7 @@ public void abortTransaction() throws ProducerFencedException { |
943 | 1001 | LOGGER.debug(() -> toString() + " abortTransaction()"); |
944 | 1002 | if (this.producerFailed != null) { |
945 | 1003 | LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.producerFailed.getMessage() |
946 | | - + ": " + this); |
| 1004 | + + ": " + this); |
947 | 1005 | } |
948 | 1006 | else { |
949 | 1007 | try { |
|
0 commit comments