Skip to content

Commit a7c453b

Browse files
garyrussellartembilan
authored andcommitted
GH-834: Remove transactional producers
Resolves #834 To solve the zombie fencing problem there is a producer for each group/topic/partition. Close these producers when a partition is revoked or the container stopped. **cherry-pick to 2.1.x, 2.0.x** I will backport to 1.3.x (without lambdas etc) after review/merge. * checkstyle # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
1 parent d415392 commit a7c453b

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
8585

8686
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
8787

88-
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
88+
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
8989

9090
private volatile CloseSafeProducer<K, V> producer;
9191

@@ -202,8 +202,7 @@ public void destroy() throws Exception { //NOSONAR
202202
}
203203
synchronized (this.consumerProducers) {
204204
this.consumerProducers.forEach(
205-
(k, v) -> ((CloseSafeProducer<K, V>) v).delegate
206-
.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
205+
(k, v) -> v.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
207206
this.consumerProducers.clear();
208207
}
209208
}
@@ -268,7 +267,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
268267
else {
269268
synchronized (this.consumerProducers) {
270269
if (!this.consumerProducers.containsKey(suffix)) {
271-
Producer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
270+
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
272271
this.consumerProducers.put(suffix, newProducer);
273272
return newProducer;
274273
}
@@ -281,7 +280,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
281280

282281
private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
283282
synchronized (this.consumerProducers) {
284-
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
283+
Iterator<Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
285284
while (iterator.hasNext()) {
286285
if (iterator.next().getValue().equals(producer)) {
287286
iterator.remove();
@@ -307,7 +306,7 @@ protected Producer<K, V> createTransactionalProducer() {
307306
}
308307
}
309308

310-
private Producer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
309+
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
311310
Producer<K, V> producer;
312311
Map<String, Object> configs = new HashMap<>(this.configs);
313312
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
@@ -320,6 +319,18 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
320319
return this.cache;
321320
}
322321

322+
@Override
323+
public void closeProducerFor(String transactionIdSuffix) {
324+
if (this.producerPerConsumerPartition) {
325+
synchronized (this.consumerProducers) {
326+
CloseSafeProducer<K, V> removed = this.consumerProducers.remove(transactionIdSuffix);
327+
if (removed != null) {
328+
removed.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
329+
}
330+
}
331+
}
332+
}
333+
323334
/**
324335
* A wrapper class for the delegate.
325336
*

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,27 @@
2828
*/
2929
public interface ProducerFactory<K, V> {
3030

31+
/**
32+
* Create a producer.
33+
* @return the producer.
34+
*/
3135
Producer<K, V> createProducer();
3236

37+
/**
38+
* Return true if the factory supports transactions.
39+
* @return true if transactional.
40+
*/
3341
default boolean transactionCapable() {
3442
return false;
3543
}
3644

45+
/**
46+
* Remove the specified producer from the cache and close it.
47+
* @param transactionIdSuffix the producer's transaction id suffix.
48+
* @since 1.3.8
49+
*/
50+
default void closeProducerFor(String transactionIdSuffix) {
51+
// NOSONAR
52+
}
53+
3754
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.kafka.KafkaException;
5555
import org.springframework.kafka.core.ConsumerFactory;
5656
import org.springframework.kafka.core.KafkaResourceHolder;
57+
import org.springframework.kafka.core.ProducerFactory;
5758
import org.springframework.kafka.core.ProducerFactoryUtils;
5859
import org.springframework.kafka.event.ConsumerPausedEvent;
5960
import org.springframework.kafka.event.ConsumerResumedEvent;
@@ -544,6 +545,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
544545
if (this.consumerAwareListener != null) {
545546
this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, partitions);
546547
}
548+
if (ListenerConsumer.this.kafkaTxManager != null) {
549+
closeProducers(partitions);
550+
}
547551
}
548552

549553
@Override
@@ -768,6 +772,9 @@ public void run() {
768772
// No-op. Continue process
769773
}
770774
}
775+
else {
776+
closeProducers(getAssignedPartitions());
777+
}
771778
}
772779
else {
773780
ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
@@ -1019,8 +1026,8 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
10191026
this.logger.trace("Processing " + record);
10201027
}
10211028
try {
1022-
TransactionSupport.setTransactionIdSuffix(
1023-
this.consumerGroupId + "." + record.topic() + "." + record.partition());
1029+
TransactionSupport
1030+
.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
10241031
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
10251032

10261033
@Override
@@ -1395,6 +1402,23 @@ public String toString() {
13951402
+ "]";
13961403
}
13971404

1405+
private void closeProducers(Collection<TopicPartition> partitions) {
1406+
ProducerFactory<?, ?> producerFactory = this.kafkaTxManager.getProducerFactory();
1407+
partitions.forEach(tp -> {
1408+
try {
1409+
producerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
1410+
}
1411+
catch (Exception e) {
1412+
this.logger.error("Failed to close producer with transaction id suffix: "
1413+
+ zombieFenceTxIdSuffix(tp.topic(), tp.partition()), e);
1414+
}
1415+
});
1416+
}
1417+
1418+
private String zombieFenceTxIdSuffix(String topic, int partition) {
1419+
return this.consumerGroupId + "." + topic + "." + partition;
1420+
}
1421+
13981422
private final class ConsumerAcknowledgment implements Acknowledgment {
13991423

14001424
private final ConsumerRecord<K, V> record;

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
453453
assertThat(records.count()).isEqualTo(0);
454454
assertThat(consumer.position(new TopicPartition(topic1, 0))).isEqualTo(1);
455455
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
456+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
456457
logger.info("Stop testRollbackRecord");
457458
pf.destroy();
458-
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
459459
consumer.close();
460460
}
461461

0 commit comments

Comments
 (0)