Skip to content

Commit 0a140eb

Browse files
garyrussellartembilan
authored andcommitted
GH-908: Don't cache dedicated consumer producers
Fixes #908 Zombie-fenced producers, running on container threads with topic/partition/group transactional ids must not be added to he general producer cache for use by other arbitrary producer operations. These producers are maintained in their own cache, keyed by the transactional id. Add tests to verify these producers are not cached and that a producer used within a nested transaction is added to the cache. **cherry-pick to 2.1.x, 2.0.x; backport PR will be published for 1.3.x after review/merge**
1 parent 9e3bd8c commit 0a140eb

File tree

2 files changed

+179
-19
lines changed

2 files changed

+179
-19
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ Producer<K, V> createTransactionalProducerForPartition() {
264264
else {
265265
synchronized (this.consumerProducers) {
266266
if (!this.consumerProducers.containsKey(suffix)) {
267-
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix);
267+
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix, true);
268268
this.consumerProducers.put(suffix, newProducer);
269269
return newProducer;
270270
}
@@ -284,20 +284,22 @@ Producer<K, V> createTransactionalProducerForPartition() {
284284
protected Producer<K, V> createTransactionalProducer() {
285285
Producer<K, V> producer = this.cache.poll();
286286
if (producer == null) {
287-
return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement());
287+
return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement(), false);
288288
}
289289
else {
290290
return producer;
291291
}
292292
}
293293

294-
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix) {
294+
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, boolean isConsumerProducer) {
295295
Producer<K, V> producer;
296296
Map<String, Object> configs = new HashMap<>(this.configs);
297297
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
298298
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
299299
producer.initTransactions();
300-
return new CloseSafeProducer<K, V>(producer, this.cache, this.consumerProducers);
300+
return new CloseSafeProducer<K, V>(producer, this.cache,
301+
isConsumerProducer ? this.consumerProducers : null,
302+
(String) configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
301303
}
302304

303305
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
@@ -330,6 +332,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
330332

331333
private final Map<String, CloseSafeProducer<K, V>> consumerProducers;
332334

335+
private final String txId;
336+
333337
private volatile boolean txFailed;
334338

335339
CloseSafeProducer(Producer<K, V> delegate) {
@@ -343,9 +347,17 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
343347

344348
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
345349
Map<String, CloseSafeProducer<K, V>> consumerProducers) {
350+
351+
this(delegate, cache, consumerProducers, null);
352+
}
353+
354+
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
355+
Map<String, CloseSafeProducer<K, V>> consumerProducers, String txId) {
356+
346357
this.delegate = delegate;
347358
this.cache = cache;
348359
this.consumerProducers = consumerProducers;
360+
this.txId = txId;
349361
}
350362

351363
@Override
@@ -380,10 +392,16 @@ public void initTransactions() {
380392

381393
@Override
382394
public void beginTransaction() throws ProducerFencedException {
395+
if (logger.isDebugEnabled()) {
396+
logger.debug("beginTransaction: " + this);
397+
}
383398
try {
384399
this.delegate.beginTransaction();
385400
}
386401
catch (RuntimeException e) {
402+
if (logger.isErrorEnabled()) {
403+
logger.error("beginTransaction failed: " + this, e);
404+
}
387405
this.txFailed = true;
388406
throw e;
389407
}
@@ -392,58 +410,85 @@ public void beginTransaction() throws ProducerFencedException {
392410
@Override
393411
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
394412
throws ProducerFencedException {
413+
395414
this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
396415
}
397416

398417
@Override
399418
public void commitTransaction() throws ProducerFencedException {
419+
if (logger.isDebugEnabled()) {
420+
logger.debug("commitTransaction: " + this);
421+
}
400422
try {
401423
this.delegate.commitTransaction();
402424
}
403425
catch (RuntimeException e) {
426+
if (logger.isErrorEnabled()) {
427+
logger.error("commitTransaction failed: " + this, e);
428+
}
404429
this.txFailed = true;
405430
throw e;
406431
}
407432
}
408433

409434
@Override
410435
public void abortTransaction() throws ProducerFencedException {
436+
if (logger.isDebugEnabled()) {
437+
logger.debug("abortTransaction: " + this);
438+
}
411439
try {
412440
this.delegate.abortTransaction();
413441
}
414442
catch (RuntimeException e) {
443+
if (logger.isErrorEnabled()) {
444+
logger.error("Abort failed: " + this, e);
445+
}
415446
this.txFailed = true;
416447
throw e;
417448
}
418449
}
419450

420451
@Override
421452
public void close() {
453+
close(0, null);
454+
}
455+
456+
@Override
457+
public void close(long timeout, TimeUnit unit) {
422458
if (this.cache != null) {
423459
if (this.txFailed) {
424-
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
425-
+ "broker restarted during transaction");
426-
427-
this.delegate.close();
460+
if (logger.isWarnEnabled()) {
461+
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
462+
+ "broker restarted during transaction: " + this);
463+
}
464+
if (unit == null) {
465+
this.delegate.close();
466+
}
467+
else {
468+
this.delegate.close(timeout, unit);
469+
}
428470
if (this.consumerProducers != null) {
429471
removeConsumerProducer();
430472
}
431473
}
432474
else {
433-
synchronized (this) {
434-
if (!this.cache.contains(this)) {
435-
this.cache.offer(this);
475+
if (this.consumerProducers == null) { // dedicated consumer producers are not cached
476+
synchronized (this) {
477+
if (!this.cache.contains(this)
478+
&& !this.cache.offer(this)) {
479+
if (unit == null) {
480+
this.delegate.close();
481+
}
482+
else {
483+
this.delegate.close(timeout, unit);
484+
}
485+
}
436486
}
437487
}
438488
}
439489
}
440490
}
441491

442-
@Override
443-
public void close(long timeout, TimeUnit unit) {
444-
close();
445-
}
446-
447492
private void removeConsumerProducer() {
448493
synchronized (this.consumerProducers) {
449494
Iterator<Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet()
@@ -459,7 +504,9 @@ private void removeConsumerProducer() {
459504

460505
@Override
461506
public String toString() {
462-
return "CloseSafeProducer [delegate=" + this.delegate + "]";
507+
return "CloseSafeProducer [delegate=" + this.delegate + ""
508+
+ (this.txId != null ? ", txId=" + this.txId : "")
509+
+ "]";
463510
}
464511

465512
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,22 +20,52 @@
2020

2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
2326

2427
import org.apache.kafka.clients.consumer.ConsumerConfig;
2528
import org.apache.kafka.clients.consumer.KafkaConsumer;
29+
import org.apache.kafka.clients.producer.ProducerConfig;
2630
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
36+
import org.springframework.kafka.listener.MessageListener;
37+
import org.springframework.kafka.listener.config.ContainerProperties;
38+
import org.springframework.kafka.support.SendResult;
39+
import org.springframework.kafka.test.context.EmbeddedKafka;
40+
import org.springframework.kafka.test.rule.KafkaEmbedded;
41+
import org.springframework.kafka.test.utils.KafkaTestUtils;
42+
import org.springframework.kafka.transaction.KafkaTransactionManager;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit4.SpringRunner;
45+
import org.springframework.util.concurrent.ListenableFuture;
2746

2847
/**
2948
* @author Gary Russell
3049
* @since 1.0.6
3150
*
3251
*/
52+
@EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" },
53+
brokerProperties = {
54+
"transaction.state.log.replication.factor=1",
55+
"transaction.state.log.min.isr=1" }
56+
)
57+
@RunWith(SpringRunner.class)
58+
@DirtiesContext
3359
public class DefaultKafkaConsumerFactoryTests {
3460

61+
@Autowired
62+
private KafkaEmbedded embeddedKafka;
63+
3564
@Test
3665
public void testClientId() {
3766
Map<String, Object> configs = Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, "foo");
38-
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<String, String>(configs) {
67+
DefaultKafkaConsumerFactory<String, String> factory =
68+
new DefaultKafkaConsumerFactory<String, String>(configs) {
3969

4070
@Override
4171
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configs) {
@@ -47,4 +77,87 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
4777
factory.createConsumer("-1");
4878
}
4979

80+
@SuppressWarnings("unchecked")
81+
@Test
82+
public void testNestedTxProducerIsCached() throws Exception {
83+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
84+
producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);
85+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
86+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
87+
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
88+
pfTx.setTransactionIdPrefix("fooTx.");
89+
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
90+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
91+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
92+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
93+
ContainerProperties containerProps = new ContainerProperties("txCache1");
94+
CountDownLatch latch = new CountDownLatch(1);
95+
containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
96+
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "bar"));
97+
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "baz"));
98+
latch.countDown();
99+
});
100+
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
101+
containerProps.setTransactionManager(tm);
102+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
103+
containerProps);
104+
container.start();
105+
try {
106+
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache1", "foo");
107+
future.get();
108+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
109+
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
110+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(1);
111+
}
112+
finally {
113+
container.stop();
114+
pf.destroy();
115+
pfTx.destroy();
116+
}
117+
}
118+
119+
@SuppressWarnings("unchecked")
120+
@Test
121+
public void testContainerTxProducerIsNotCached() throws Exception {
122+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
123+
producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);
124+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
125+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
126+
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
127+
pfTx.setTransactionIdPrefix("fooTx.");
128+
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
129+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache2Group", "false", this.embeddedKafka);
130+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
131+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
132+
ContainerProperties containerProps = new ContainerProperties("txCache2");
133+
CountDownLatch latch = new CountDownLatch(1);
134+
containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
135+
templateTx.send("txCacheSendFromListener", "bar");
136+
templateTx.send("txCacheSendFromListener", "baz");
137+
latch.countDown();
138+
});
139+
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
140+
containerProps.setTransactionManager(tm);
141+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
142+
containerProps);
143+
container.start();
144+
try {
145+
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
146+
future.get();
147+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
148+
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
149+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(0);
150+
}
151+
finally {
152+
container.stop();
153+
pf.destroy();
154+
pfTx.destroy();
155+
}
156+
}
157+
158+
@Configuration
159+
public static class Config {
160+
161+
}
162+
50163
}

0 commit comments

Comments
 (0)