Skip to content

Commit 7b214f8

Browse files
garyrussell authored and artembilan committed
GH-834: Remove transactional producers
Resolves #834.

To solve the zombie fencing problem, there is a producer for each group/topic/partition. Close these producers when a partition is revoked or the container is stopped.

**cherry-pick to 2.1.x, 2.0.x**

I will backport to 1.3.x (without lambdas, etc.) after review/merge.

* checkstyle
1 parent 1783e8c commit 7b214f8

File tree

4 files changed

+61
-8
lines changed

4 files changed

+61
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
9595

9696
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
9797

98-
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
98+
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
9999

100100
private volatile CloseSafeProducer<K, V> producer;
101101

@@ -224,7 +224,7 @@ public void destroy() throws Exception { //NOSONAR
224224
}
225225
synchronized (this.consumerProducers) {
226226
this.consumerProducers.forEach(
227-
(k, v) -> ((CloseSafeProducer<K, V>) v).delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
227+
(k, v) -> v.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
228228
this.consumerProducers.clear();
229229
}
230230
}
@@ -316,7 +316,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
316316
else {
317317
synchronized (this.consumerProducers) {
318318
if (!this.consumerProducers.containsKey(suffix)) {
319-
Producer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
319+
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
320320
this.consumerProducers.put(suffix, newProducer);
321321
return newProducer;
322322
}
@@ -329,7 +329,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
329329

330330
private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
331331
synchronized (this.consumerProducers) {
332-
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
332+
Iterator<Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
333333
while (iterator.hasNext()) {
334334
if (iterator.next().getValue().equals(producer)) {
335335
iterator.remove();
@@ -355,7 +355,7 @@ protected Producer<K, V> createTransactionalProducer() {
355355
}
356356
}
357357

358-
private Producer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
358+
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
359359
Producer<K, V> producer;
360360
Map<String, Object> configs = new HashMap<>(this.configs);
361361
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
@@ -368,6 +368,18 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
368368
return this.cache;
369369
}
370370

371+
@Override
372+
public void closeProducerFor(String transactionIdSuffix) {
373+
if (this.producerPerConsumerPartition) {
374+
synchronized (this.consumerProducers) {
375+
CloseSafeProducer<K, V> removed = this.consumerProducers.remove(transactionIdSuffix);
376+
if (removed != null) {
377+
removed.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
378+
}
379+
}
380+
}
381+
}
382+
371383
/**
372384
* A wrapper class for the delegate.
373385
*

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,27 @@
2828
*/
2929
public interface ProducerFactory<K, V> {
3030

31+
/**
32+
* Create a producer.
33+
* @return the producer.
34+
*/
3135
Producer<K, V> createProducer();
3236

37+
/**
38+
* Return true if the factory supports transactions.
39+
* @return true if transactional.
40+
*/
3341
default boolean transactionCapable() {
3442
return false;
3543
}
3644

45+
/**
46+
* Remove the specified producer from the cache and close it.
47+
* @param transactionIdSuffix the producer's transaction id suffix.
48+
* @since 1.3.8
49+
*/
50+
default void closeProducerFor(String transactionIdSuffix) {
51+
// NOSONAR
52+
}
53+
3754
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.springframework.kafka.KafkaException;
5656
import org.springframework.kafka.core.ConsumerFactory;
5757
import org.springframework.kafka.core.KafkaResourceHolder;
58+
import org.springframework.kafka.core.ProducerFactory;
5859
import org.springframework.kafka.core.ProducerFactoryUtils;
5960
import org.springframework.kafka.event.ConsumerPausedEvent;
6061
import org.springframework.kafka.event.ConsumerResumedEvent;
@@ -562,6 +563,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
562563
if (this.consumerAwareListener != null) {
563564
this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, partitions);
564565
}
566+
if (ListenerConsumer.this.kafkaTxManager != null) {
567+
closeProducers(partitions);
568+
}
565569
}
566570

567571
@Override
@@ -786,6 +790,9 @@ public void run() {
786790
// No-op. Continue process
787791
}
788792
}
793+
else {
794+
closeProducers(getAssignedPartitions());
795+
}
789796
}
790797
else {
791798
ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
@@ -1064,8 +1071,8 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
10641071
this.logger.trace("Processing " + record);
10651072
}
10661073
try {
1067-
TransactionSupport.setTransactionIdSuffix(
1068-
this.consumerGroupId + "." + record.topic() + "." + record.partition());
1074+
TransactionSupport
1075+
.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
10691076
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
10701077

10711078
@Override
@@ -1447,6 +1454,23 @@ public String toString() {
14471454
+ "]";
14481455
}
14491456

1457+
private void closeProducers(Collection<TopicPartition> partitions) {
1458+
ProducerFactory<?, ?> producerFactory = this.kafkaTxManager.getProducerFactory();
1459+
partitions.forEach(tp -> {
1460+
try {
1461+
producerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
1462+
}
1463+
catch (Exception e) {
1464+
this.logger.error("Failed to close producer with transaction id suffix: "
1465+
+ zombieFenceTxIdSuffix(tp.topic(), tp.partition()), e);
1466+
}
1467+
});
1468+
}
1469+
1470+
private String zombieFenceTxIdSuffix(String topic, int partition) {
1471+
return this.consumerGroupId + "." + topic + "." + partition;
1472+
}
1473+
14501474
private final class ConsumerAcknowledgment implements Acknowledgment {
14511475

14521476
private final ConsumerRecord<K, V> record;

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,9 +473,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
473473
// depending on timing, the position might include the offset representing the commit in the log
474474
assertThat(consumer.position(new TopicPartition(topic1, 0))).isGreaterThanOrEqualTo(1L);
475475
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
476+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
476477
logger.info("Stop testRollbackRecord");
477478
pf.destroy();
478-
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
479479
consumer.close();
480480
}
481481

0 commit comments

Comments
 (0)