Skip to content

Commit 9c9a200

Browse files
garyrussellartembilan
authored andcommitted
GH-834: Remove transactional producers
Resolves #834 To solve the zombie fencing problem there is a producer for each group/topic/partition. Close these producers when a partition is revoked or the container stopped.
1 parent 7517916 commit 9c9a200

File tree

4 files changed

+52
-12
lines changed

4 files changed

+52
-12
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ bin
1111
build
1212
out
1313
target
14+
.DS_Store

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
8383

8484
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
8585

86-
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
86+
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
8787

8888
private volatile CloseSafeProducer<K, V> producer;
8989

@@ -187,8 +187,8 @@ public void destroy() throws Exception { //NOSONAR
187187
producer = this.cache.poll();
188188
}
189189
synchronized (this.consumerProducers) {
190-
for (Entry<String, Producer<K, V>> entry : this.consumerProducers.entrySet()) {
191-
((CloseSafeProducer<K, V>) entry.getValue()).delegate
190+
for (Entry<String, CloseSafeProducer<K, V>> entry : this.consumerProducers.entrySet()) {
191+
entry.getValue().delegate
192192
.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
193193
}
194194
this.consumerProducers.clear();
@@ -255,7 +255,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
255255
else {
256256
synchronized (this.consumerProducers) {
257257
if (!this.consumerProducers.containsKey(suffix)) {
258-
Producer<K, V> newProducer = doCreateTxProducer(suffix);
258+
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix);
259259
this.consumerProducers.put(suffix, newProducer);
260260
return newProducer;
261261
}
@@ -282,7 +282,7 @@ protected Producer<K, V> createTransactionalProducer() {
282282
}
283283
}
284284

285-
private Producer<K, V> doCreateTxProducer(String suffix) {
285+
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix) {
286286
Producer<K, V> producer;
287287
Map<String, Object> configs = new HashMap<>(this.configs);
288288
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
@@ -295,6 +295,17 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
295295
return this.cache;
296296
}
297297

298+
public void closeProducerFor(String transactionIdSuffix) {
299+
if (this.producerPerConsumerPartition) {
300+
synchronized (this.consumerProducers) {
301+
CloseSafeProducer<K, V> removed = this.consumerProducers.remove(transactionIdSuffix);
302+
if (removed != null) {
303+
removed.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
304+
}
305+
}
306+
}
307+
}
308+
298309
/**
299310
* A wrapper class for the delegate.
300311
*
@@ -308,7 +319,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
308319

309320
private final BlockingQueue<CloseSafeProducer<K, V>> cache;
310321

311-
private final Map<String, Producer<K, V>> consumerProducers;
322+
private final Map<String, CloseSafeProducer<K, V>> consumerProducers;
312323

313324
private volatile boolean txFailed;
314325

@@ -322,7 +333,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
322333
}
323334

324335
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
325-
Map<String, Producer<K, V>> consumerProducers) {
336+
Map<String, CloseSafeProducer<K, V>> consumerProducers) {
326337
this.delegate = delegate;
327338
this.cache = cache;
328339
this.consumerProducers = consumerProducers;
@@ -426,7 +437,8 @@ public void close(long timeout, TimeUnit unit) {
426437

427438
private void removeConsumerProducer() {
428439
synchronized (this.consumerProducers) {
429-
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
440+
Iterator<Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet()
441+
.iterator();
430442
while (iterator.hasNext()) {
431443
if (iterator.next().getValue().equals(this)) {
432444
iterator.remove();

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,7 @@
5050
import org.springframework.core.task.SimpleAsyncTaskExecutor;
5151
import org.springframework.kafka.KafkaException;
5252
import org.springframework.kafka.core.ConsumerFactory;
53+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
5354
import org.springframework.kafka.core.KafkaResourceHolder;
5455
import org.springframework.kafka.core.ProducerFactoryUtils;
5556
import org.springframework.kafka.event.ListenerContainerIdleEvent;
@@ -489,6 +490,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
489490
getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
490491
// Wait until now to commit, in case the user listener added acks
491492
commitPendingAcks();
493+
if (ListenerConsumer.this.kafkaTxManager != null) {
494+
closeProducers(partitions);
495+
}
492496
}
493497

494498
@Override
@@ -680,6 +684,9 @@ public void run() {
680684
// No-op. Continue process
681685
}
682686
}
687+
else {
688+
closeProducers(getAssignedPartitions());
689+
}
683690
}
684691
else {
685692
ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
@@ -895,8 +902,8 @@ private void innvokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
895902
this.logger.trace("Processing " + record);
896903
}
897904
try {
898-
TransactionSupport.setTransactionIdSuffix(
899-
this.consumerGroupId + "." + record.topic() + "." + record.partition());
905+
TransactionSupport
906+
.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
900907
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
901908

902909
@Override
@@ -1228,6 +1235,26 @@ public void seekToEnd(String topic, int partition) {
12281235
this.seeks.add(new TopicPartitionInitialOffset(topic, partition, SeekPosition.END));
12291236
}
12301237

1238+
private void closeProducers(Collection<TopicPartition> partitions) {
1239+
if (this.kafkaTxManager.getProducerFactory() instanceof DefaultKafkaProducerFactory) {
1240+
DefaultKafkaProducerFactory<?, ?> producerFactory =
1241+
(DefaultKafkaProducerFactory<?, ?>) this.kafkaTxManager.getProducerFactory();
1242+
for (TopicPartition tp : partitions) {
1243+
try {
1244+
producerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
1245+
}
1246+
catch (Exception e) {
1247+
this.logger.error("Failed to close producer with transaction id suffix: "
1248+
+ zombieFenceTxIdSuffix(tp.topic(), tp.partition()), e);
1249+
}
1250+
}
1251+
}
1252+
}
1253+
1254+
private String zombieFenceTxIdSuffix(String topic, int partition) {
1255+
return this.consumerGroupId + "." + topic + "." + partition;
1256+
}
1257+
12311258
private final class ConsumerAcknowledgment implements Acknowledgment {
12321259

12331260
private final ConsumerRecord<K, V> record;

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
443443
assertThat(records.count()).isEqualTo(0);
444444
assertThat(consumer.position(new TopicPartition(topic1, 0))).isEqualTo(1);
445445
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
446+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
446447
logger.info("Stop testRollbackRecord");
447448
pf.destroy();
448-
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
449449
consumer.close();
450450
}
451451

0 commit comments

Comments
 (0)