Skip to content

Commit fa8cbeb

Browse files
committed
Don't cache tx producers after reset()
Producers were incorrectly returned to the cache after a `reset()`. **I will back-port; conflicts expected**
1 parent a306c58 commit fa8cbeb

File tree

3 files changed

+68
-30
lines changed

3 files changed

+68
-30
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
124124

125125
private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
126126

127-
private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();
128-
129127
private final AtomicInteger epoch = new AtomicInteger();
130128

131129
private final AtomicInteger clientIdCounter = new AtomicInteger();
@@ -534,39 +532,38 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
534532
}
535533
}
536534
if (this.producerPerThread) {
537-
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
538-
if (this.threadBoundProducerEpochs.get() == null) {
539-
this.threadBoundProducerEpochs.set(this.epoch.get());
540-
}
541-
if (tlProducer != null
542-
&& (this.epoch.get() != this.threadBoundProducerEpochs.get() || expire(tlProducer))) {
543-
closeThreadBoundProducer();
544-
tlProducer = null;
545-
}
546-
if (tlProducer == null) {
547-
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
548-
this.physicalCloseTimeout, this.beanName);
549-
for (Listener<K, V> listener : this.listeners) {
550-
listener.producerAdded(tlProducer.clientId, tlProducer);
551-
}
552-
this.threadBoundProducers.set(tlProducer);
553-
this.threadBoundProducerEpochs.set(this.epoch.get());
554-
}
555-
return tlProducer;
535+
return getOrCreateThreadBoundProducer();
556536
}
557537
synchronized (this) {
558538
if (this.producer != null && expire(this.producer)) {
559539
this.producer = null;
560540
}
561541
if (this.producer == null) {
562542
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
563-
this.physicalCloseTimeout, this.beanName);
543+
this.physicalCloseTimeout, this.beanName, this.epoch.get());
564544
this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
565545
}
566546
return this.producer;
567547
}
568548
}
569549

550+
private Producer<K, V> getOrCreateThreadBoundProducer() {
551+
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
552+
if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
553+
closeThreadBoundProducer();
554+
tlProducer = null;
555+
}
556+
if (tlProducer == null) {
557+
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
558+
this.physicalCloseTimeout, this.beanName, this.epoch.get());
559+
for (Listener<K, V> listener : this.listeners) {
560+
listener.producerAdded(tlProducer.clientId, tlProducer);
561+
}
562+
this.threadBoundProducers.set(tlProducer);
563+
}
564+
return tlProducer;
565+
}
566+
570567
/**
571568
* Subclasses must return a raw producer which will be wrapped in a {@link CloseSafeProducer}.
572569
* @return the producer.
@@ -694,7 +691,9 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
694691
else {
695692
synchronized (this.cache) {
696693
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
697-
if (txIdCache != null && !txIdCache.contains(producerToRemove) && !txIdCache.offer(producerToRemove)) {
694+
if (producerToRemove.epoch != this.epoch.get()
695+
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
696+
&& !txIdCache.offer(producerToRemove))) {
698697
producerToRemove.closeDelegate(timeout, this.listeners);
699698
return true;
700699
}
@@ -718,7 +717,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
718717
newProducer = createRawProducer(newProducerConfigs);
719718
newProducer.initTransactions();
720719
CloseSafeProducer<K, V> closeSafeProducer =
721-
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName);
720+
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
721+
this.epoch.get());
722722
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
723723
return closeSafeProducer;
724724
}
@@ -794,20 +794,22 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
794794

795795
final String clientId; // NOSONAR
796796

797+
final int epoch; // NOSONAR
798+
797799
private volatile Exception producerFailed;
798800

799801
volatile boolean closed; // NOSONAR
800802

801803
CloseSafeProducer(Producer<K, V> delegate,
802804
BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout,
803-
String factoryName) {
805+
String factoryName, int epoch) {
804806

805-
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName);
807+
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
806808
}
807809

808810
CloseSafeProducer(Producer<K, V> delegate,
809811
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
810-
Duration closeTimeout, String factoryName) {
812+
Duration closeTimeout, String factoryName, int epoch) {
811813

812814
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
813815
this.delegate = delegate;
@@ -825,6 +827,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
825827
}
826828
this.clientId = factoryName + "." + id;
827829
this.created = System.currentTimeMillis();
830+
this.epoch = epoch;
828831
LOGGER.debug(() -> "Created new Producer: " + this);
829832
}
830833

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,41 @@ protected Producer createRawProducer(Map configs) {
186186
verify(producer).close(any(Duration.class));
187187
}
188188

189+
@Test
190+
@SuppressWarnings({ "rawtypes", "unchecked" })
191+
void dontReturnToCacheAfterReset() {
192+
final Producer producer = mock(Producer.class);
193+
ApplicationContext ctx = mock(ApplicationContext.class);
194+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
195+
196+
@Override
197+
protected Producer createRawProducer(Map configs) {
198+
return producer;
199+
}
200+
201+
};
202+
pf.setApplicationContext(ctx);
203+
pf.setTransactionIdPrefix("foo");
204+
Producer aProducer = pf.createProducer();
205+
assertThat(aProducer).isNotNull();
206+
aProducer.close();
207+
Producer bProducer = pf.createProducer();
208+
assertThat(bProducer).isSameAs(aProducer);
209+
bProducer.close();
210+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
211+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
212+
assertThat(cache.size()).isEqualTo(1);
213+
Queue queue = (Queue) cache.get("foo");
214+
assertThat(queue.size()).isEqualTo(1);
215+
bProducer = pf.createProducer();
216+
assertThat(bProducer).isSameAs(aProducer);
217+
assertThat(queue.size()).isEqualTo(0);
218+
pf.reset();
219+
bProducer.close();
220+
assertThat(queue.size()).isEqualTo(0);
221+
pf.destroy();
222+
}
223+
189224
@Test
190225
@SuppressWarnings({ "rawtypes", "unchecked" })
191226
void testThreadLocal() throws InterruptedException {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ public Producer<String, String> createProducer(String txIdPrefixArg) {
388388
prod.closeDelegate(timeout, Collections.emptyList());
389389
return true;
390390
},
391-
Duration.ofSeconds(1), "factory");
391+
Duration.ofSeconds(1), "factory", 0);
392392
return closeSafeProducer;
393393
}
394394

@@ -422,14 +422,14 @@ public Producer<String, String> createProducer(String txIdPrefixArg) {
422422
BlockingQueue<CloseSafeProducer<String, String>> cache = new LinkedBlockingDeque<>(1);
423423
try {
424424
cache.put(new CloseSafeProducer<>(mock(Producer.class), this::removeProducer,
425-
Duration.ofSeconds(1), "factory"));
425+
Duration.ofSeconds(1), "factory", 0));
426426
}
427427
catch (@SuppressWarnings("unused") InterruptedException e) {
428428
Thread.currentThread().interrupt();
429429
}
430430
KafkaTestUtils.getPropertyValue(this, "cache", Map.class).put("foo", cache);
431431
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer,
432-
this::cacheReturner, "foo", Duration.ofSeconds(1), "factory");
432+
this::cacheReturner, "foo", Duration.ofSeconds(1), "factory", 0);
433433
return closeSafeProducer;
434434
}
435435

0 commit comments

Comments
 (0)