Skip to content

Commit 7ec2ee7

Browse files
garyrussellartembilan
authored andcommitted
Close thread bound producers after reset
**cherry-pick to 2.4.x, 2.3.x**
1 parent ecdd058 commit 7ec2ee7

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
123123

124124
private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
125125

126+
private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();
127+
128+
private final AtomicInteger epoch = new AtomicInteger();
129+
126130
private final AtomicInteger clientIdCounter = new AtomicInteger();
127131

128132
private final List<Listener<K, V>> listeners = new ArrayList<>();
@@ -418,6 +422,7 @@ public void destroy() {
418422
(k, v) -> v.closeDelegate(this.physicalCloseTimeout, this.listeners));
419423
this.consumerProducers.clear();
420424
}
425+
this.epoch.incrementAndGet();
421426
}
422427

423428
@Override
@@ -469,13 +474,21 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
469474
}
470475
if (this.producerPerThread) {
471476
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
477+
if (this.threadBoundProducerEpochs.get() == null) {
478+
this.threadBoundProducerEpochs.set(this.epoch.get());
479+
}
480+
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
481+
closeThreadBoundProducer();
482+
tlProducer = null;
483+
}
472484
if (tlProducer == null) {
473485
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
474486
this.physicalCloseTimeout, this.beanName);
475487
for (Listener<K, V> listener : this.listeners) {
476488
listener.producerAdded(tlProducer.clientId, tlProducer);
477489
}
478490
this.threadBoundProducers.set(tlProducer);
491+
this.threadBoundProducerEpochs.set(this.epoch.get());
479492
}
480493
return tlProducer;
481494
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,35 @@ protected Producer createKafkaProducer() {
200200
verify(producer).close(any(Duration.class));
201201
}
202202

203+
@Test
204+
@SuppressWarnings({ "rawtypes", "unchecked" })
205+
void testThreadLocalReset() {
206+
Producer producer1 = mock(Producer.class);
207+
Producer producer2 = mock(Producer.class);
208+
ProducerFactory mockPf = mock(ProducerFactory.class);
209+
given(mockPf.createProducer()).willReturn(producer1, producer2);
210+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
211+
212+
@Override
213+
protected Producer createKafkaProducer() {
214+
return mockPf.createProducer();
215+
}
216+
217+
};
218+
pf.setProducerPerThread(true);
219+
Producer aProducer = pf.createProducer();
220+
assertThat(aProducer).isNotNull();
221+
aProducer.close();
222+
Producer bProducer = pf.createProducer();
223+
assertThat(bProducer).isSameAs(aProducer);
224+
bProducer.close();
225+
pf.reset();
226+
bProducer = pf.createProducer();
227+
assertThat(bProducer).isNotSameAs(aProducer);
228+
bProducer.close();
229+
verify(producer1).close(any(Duration.class));
230+
}
231+
203232
@Test
204233
@SuppressWarnings({ "rawtypes", "unchecked" })
205234
void testCleanUpAfterTxFence() {

0 commit comments

Comments
 (0)