Skip to content

Commit f195b62

Browse files
garyrussellartembilan
authored andcommitted
GH-1615: Add maxAge to Producers
Resolves #1615 **cherry-pick to 2.5.x** # Conflicts: # src/reference/asciidoc/kafka.adoc # src/reference/asciidoc/whats-new.adoc
1 parent e815ace commit f195b62

File tree

3 files changed

+72
-13
lines changed

3 files changed

+72
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
152152

153153
private String clientIdPrefix;
154154

155+
private long maxAge;
156+
155157
private volatile CloseSafeProducer<K, V> producer;
156158

157159
/**
@@ -349,6 +351,16 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
349351
return Collections.unmodifiableList(this.postProcessors);
350352
}
351353

354+
/**
355+
* Set the maximum age for a producer; useful when using transactions and the broker
356+
* might expire a {@code transactional.id} due to inactivity.
357+
* @param maxAge the maxAge to set
358+
* @since 2.5.8
359+
*/
360+
public void setMaxAge(Duration maxAge) {
361+
this.maxAge = maxAge.toMillis();
362+
}
363+
352364
/**
353365
* Add a listener.
354366
* @param listener the listener.
@@ -498,7 +510,8 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
498510
if (this.threadBoundProducerEpochs.get() == null) {
499511
this.threadBoundProducerEpochs.set(this.epoch.get());
500512
}
501-
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
513+
if (tlProducer != null
514+
&& (this.epoch.get() != this.threadBoundProducerEpochs.get() || expire(tlProducer))) {
502515
closeThreadBoundProducer();
503516
tlProducer = null;
504517
}
@@ -514,6 +527,9 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
514527
return tlProducer;
515528
}
516529
synchronized (this) {
530+
if (this.producer != null && expire(this.producer)) {
531+
this.producer = null;
532+
}
517533
if (this.producer == null) {
518534
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
519535
this.physicalCloseTimeout, this.beanName);
@@ -552,14 +568,15 @@ protected Producer<K, V> createTransactionalProducerForPartition(String txIdPref
552568
}
553569
else {
554570
synchronized (this.consumerProducers) {
555-
if (!this.consumerProducers.containsKey(suffix)) {
571+
CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);
572+
if (consumerProducer == null || expire(consumerProducer)) {
556573
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,
557574
this::removeConsumerProducer);
558575
this.consumerProducers.put(suffix, newProducer);
559576
return newProducer;
560577
}
561578
else {
562-
return this.consumerProducers.get(suffix);
579+
return consumerProducer;
563580
}
564581
}
565582
}
@@ -616,7 +633,15 @@ protected Producer<K, V> createTransactionalProducer() {
616633
protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
617634
BlockingQueue<CloseSafeProducer<K, V>> queue = getCache(txIdPrefix);
618635
Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
619-
Producer<K, V> cachedProducer = queue.poll();
636+
CloseSafeProducer<K, V> cachedProducer = queue.poll();
637+
while (cachedProducer != null) {
638+
if (expire(cachedProducer)) {
639+
cachedProducer = queue.poll();
640+
}
641+
else {
642+
break;
643+
}
644+
}
620645
if (cachedProducer == null) {
621646
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
622647
}
@@ -625,6 +650,14 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
625650
}
626651
}
627652

653+
private boolean expire(CloseSafeProducer<K, V> producer) {
654+
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
655+
if (expired) {
656+
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
657+
}
658+
return expired;
659+
}
660+
628661
boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
629662
if (producerToRemove.closed) {
630663
producerToRemove.closeDelegate(timeout, this.listeners);
@@ -727,6 +760,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
727760

728761
final String txIdPrefix; // NOSONAR
729762

763+
final long created;
764+
730765
private final Duration closeTimeout;
731766

732767
final String clientId; // NOSONAR
@@ -761,6 +796,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
761796
id = "unknown";
762797
}
763798
this.clientId = factoryName + "." + id;
799+
this.created = System.currentTimeMillis();
764800
LOGGER.debug(() -> "Created new Producer: " + this);
765801
}
766802

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.BDDMockito.willThrow;
2626
import static org.mockito.Mockito.inOrder;
2727
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.times;
2829
import static org.mockito.Mockito.verify;
2930

3031
import java.time.Duration;
@@ -122,9 +123,9 @@ protected Producer createRawProducer(Map configs) {
122123

123124
@Test
124125
@SuppressWarnings({ "rawtypes", "unchecked" })
125-
void testResetSingle() {
126+
void testResetSingle() throws InterruptedException {
126127
final Producer producer = mock(Producer.class);
127-
ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
128+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
128129

129130
@Override
130131
protected Producer createRawProducer(Map configs) {
@@ -134,13 +135,19 @@ protected Producer createRawProducer(Map configs) {
134135
};
135136
Producer aProducer = pf.createProducer();
136137
assertThat(aProducer).isNotNull();
138+
Producer bProducer = pf.createProducer();
139+
assertThat(bProducer).isSameAs(aProducer);
137140
aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
138141
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNotNull();
142+
pf.setMaxAge(Duration.ofMillis(10));
143+
Thread.sleep(50);
144+
aProducer = pf.createProducer();
145+
assertThat(aProducer).isNotSameAs(bProducer);
139146
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
140147
assertThat(cache.size()).isEqualTo(0);
141148
pf.reset();
142149
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
143-
verify(producer).close(any(Duration.class));
150+
verify(producer, times(2)).close(any(Duration.class));
144151
}
145152

146153
@Test
@@ -161,28 +168,34 @@ protected Producer createRawProducer(Map configs) {
161168
Producer aProducer = pf.createProducer();
162169
assertThat(aProducer).isNotNull();
163170
aProducer.close();
171+
Producer bProducer = pf.createProducer();
172+
assertThat(bProducer).isSameAs(aProducer);
173+
bProducer.close();
164174
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
165175
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
166176
assertThat(cache.size()).isEqualTo(1);
167177
Queue queue = (Queue) cache.get("foo");
168178
assertThat(queue.size()).isEqualTo(1);
179+
pf.setMaxAge(Duration.ofMillis(10));
180+
Thread.sleep(50);
181+
aProducer = pf.createProducer();
182+
assertThat(aProducer).isNotSameAs(bProducer);
169183
pf.onApplicationEvent(new ContextStoppedEvent(ctx));
170184
assertThat(queue.size()).isEqualTo(0);
171185
verify(producer).close(any(Duration.class));
172186
}
173187

174188
@Test
175189
@SuppressWarnings({ "rawtypes", "unchecked" })
176-
void testThreadLocal() {
190+
void testThreadLocal() throws InterruptedException {
177191
final Producer producer = mock(Producer.class);
192+
AtomicBoolean created = new AtomicBoolean();
178193
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
179194

180-
boolean created;
181-
182195
@Override
183196
protected Producer createKafkaProducer() {
184-
assertThat(this.created).isFalse();
185-
this.created = true;
197+
assertThat(created.get()).isFalse();
198+
created.set(true);
186199
return producer;
187200
}
188201

@@ -196,9 +209,14 @@ protected Producer createKafkaProducer() {
196209
bProducer.close();
197210
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
198211
assertThat(KafkaTestUtils.getPropertyValue(pf, "threadBoundProducers", ThreadLocal.class).get()).isNotNull();
212+
pf.setMaxAge(Duration.ofMillis(10));
213+
Thread.sleep(50);
214+
created.set(false);
215+
aProducer = pf.createProducer();
216+
assertThat(aProducer).isNotSameAs(bProducer);
199217
pf.closeThreadBoundProducer();
200218
assertThat(KafkaTestUtils.getPropertyValue(pf, "threadBoundProducers", ThreadLocal.class).get()).isNull();
201-
verify(producer).close(any(Duration.class));
219+
verify(producer, times(3)).close(any(Duration.class));
202220
}
203221

204222
@Test

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3030,6 +3030,11 @@ This is `true` by default since version 2.5 when transactions are enabled with `
30303030

30313031
Also see <<transaction-id-prefix>>.
30323032

3033+
IMPORTANT: Starting with version 2.5.8, you can now configure the `maxAge` property on the producer factory.
3034+
This is useful when using transactional producers that might lay idle for the broker's `transactional.id.expiration.ms`.
3035+
With current `kafka-clients`, this can cause a `ProducerFencedException` without a rebalance.
3036+
By setting the `maxAge` to less than `transactional.id.expiration.ms`, the factory will refresh the producer if it is past it's max age.
3037+
30333038
===== Using `KafkaTransactionManager`
30343039

30353040
The `KafkaTransactionManager` is an implementation of Spring Framework's `PlatformTransactionManager`.

0 commit comments

Comments
 (0)