Skip to content

Commit 3a86d8d

Browse files
garyrussellartembilan
authored andcommitted
Don't cache tx producers after reset()
Producers were incorrectly returned to the cache after a `reset()`. **I will back-port; conflicts expected**
1 parent 2f2010d commit 3a86d8d

File tree

3 files changed

+52
-18
lines changed

3 files changed

+52
-18
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
124124

125125
private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
126126

127-
private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();
128-
129127
private final AtomicInteger epoch = new AtomicInteger();
130128

131129
private final AtomicInteger clientIdCounter = new AtomicInteger();
@@ -543,7 +541,7 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
543541
}
544542
if (this.producer == null) {
545543
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
546-
this.physicalCloseTimeout, this.beanName);
544+
this.physicalCloseTimeout, this.beanName, this.epoch.get());
547545
this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
548546
}
549547
return this.producer;
@@ -552,22 +550,17 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
552550

553551
private Producer<K, V> getOrCreateThreadBoundProducer() {
554552
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
555-
if (this.threadBoundProducerEpochs.get() == null) {
556-
this.threadBoundProducerEpochs.set(this.epoch.get());
557-
}
558-
if (tlProducer != null
559-
&& (this.epoch.get() != this.threadBoundProducerEpochs.get() || expire(tlProducer))) {
553+
if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
560554
closeThreadBoundProducer();
561555
tlProducer = null;
562556
}
563557
if (tlProducer == null) {
564558
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
565-
this.physicalCloseTimeout, this.beanName);
559+
this.physicalCloseTimeout, this.beanName, this.epoch.get());
566560
for (Listener<K, V> listener : this.listeners) {
567561
listener.producerAdded(tlProducer.clientId, tlProducer);
568562
}
569563
this.threadBoundProducers.set(tlProducer);
570-
this.threadBoundProducerEpochs.set(this.epoch.get());
571564
}
572565
return tlProducer;
573566
}
@@ -699,7 +692,9 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
699692
else {
700693
synchronized (this.cache) {
701694
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
702-
if (txIdCache != null && !txIdCache.contains(producerToRemove) && !txIdCache.offer(producerToRemove)) {
695+
if (producerToRemove.epoch != this.epoch.get()
696+
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
697+
&& !txIdCache.offer(producerToRemove))) {
703698
producerToRemove.closeDelegate(timeout, this.listeners);
704699
return true;
705700
}
@@ -723,7 +718,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
723718
newProducer = createRawProducer(newProducerConfigs);
724719
newProducer.initTransactions();
725720
CloseSafeProducer<K, V> closeSafeProducer =
726-
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName);
721+
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
722+
this.epoch.get());
727723
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
728724
return closeSafeProducer;
729725
}
@@ -799,20 +795,22 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
799795

800796
final String clientId; // NOSONAR
801797

798+
final int epoch; // NOSONAR
799+
802800
private volatile Exception producerFailed;
803801

804802
volatile boolean closed; // NOSONAR
805803

806804
CloseSafeProducer(Producer<K, V> delegate,
807805
BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout,
808-
String factoryName) {
806+
String factoryName, int epoch) {
809807

810-
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName);
808+
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
811809
}
812810

813811
CloseSafeProducer(Producer<K, V> delegate,
814812
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
815-
Duration closeTimeout, String factoryName) {
813+
Duration closeTimeout, String factoryName, int epoch) {
816814

817815
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
818816
this.delegate = delegate;
@@ -830,6 +828,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
830828
}
831829
this.clientId = factoryName + "." + id;
832830
this.created = System.currentTimeMillis();
831+
this.epoch = epoch;
833832
LOGGER.debug(() -> "Created new Producer: " + this);
834833
}
835834

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,41 @@ protected Producer createRawProducer(Map configs) {
186186
verify(producer).close(any(Duration.class));
187187
}
188188

189+
@Test
190+
@SuppressWarnings({ "rawtypes", "unchecked" })
191+
void dontReturnToCacheAfterReset() {
192+
final Producer producer = mock(Producer.class);
193+
ApplicationContext ctx = mock(ApplicationContext.class);
194+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
195+
196+
@Override
197+
protected Producer createRawProducer(Map configs) {
198+
return producer;
199+
}
200+
201+
};
202+
pf.setApplicationContext(ctx);
203+
pf.setTransactionIdPrefix("foo");
204+
Producer aProducer = pf.createProducer();
205+
assertThat(aProducer).isNotNull();
206+
aProducer.close();
207+
Producer bProducer = pf.createProducer();
208+
assertThat(bProducer).isSameAs(aProducer);
209+
bProducer.close();
210+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
211+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
212+
assertThat(cache.size()).isEqualTo(1);
213+
Queue queue = (Queue) cache.get("foo");
214+
assertThat(queue.size()).isEqualTo(1);
215+
bProducer = pf.createProducer();
216+
assertThat(bProducer).isSameAs(aProducer);
217+
assertThat(queue.size()).isEqualTo(0);
218+
pf.reset();
219+
bProducer.close();
220+
assertThat(queue.size()).isEqualTo(0);
221+
pf.destroy();
222+
}
223+
189224
@Test
190225
@SuppressWarnings({ "rawtypes", "unchecked" })
191226
void testThreadLocal() throws InterruptedException {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ public Producer<String, String> createProducer(String txIdPrefixArg) {
388388
prod.closeDelegate(timeout, Collections.emptyList());
389389
return true;
390390
},
391-
Duration.ofSeconds(1), "factory");
391+
Duration.ofSeconds(1), "factory", 0);
392392
return closeSafeProducer;
393393
}
394394

@@ -422,14 +422,14 @@ public Producer<String, String> createProducer(String txIdPrefixArg) {
422422
BlockingQueue<CloseSafeProducer<String, String>> cache = new LinkedBlockingDeque<>(1);
423423
try {
424424
cache.put(new CloseSafeProducer<>(mock(Producer.class), this::removeProducer,
425-
Duration.ofSeconds(1), "factory"));
425+
Duration.ofSeconds(1), "factory", 0));
426426
}
427427
catch (@SuppressWarnings("unused") InterruptedException e) {
428428
Thread.currentThread().interrupt();
429429
}
430430
KafkaTestUtils.getPropertyValue(this, "cache", Map.class).put("foo", cache);
431431
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer,
432-
this::cacheReturner, "foo", Duration.ofSeconds(1), "factory");
432+
this::cacheReturner, "foo", Duration.ofSeconds(1), "factory", 0);
433433
return closeSafeProducer;
434434
}
435435

0 commit comments

Comments
 (0)