Skip to content

Commit 5f57f4b

Browse files
garyrussellartembilan
authored andcommitted
GH-1196: Use close(Duration) instead of close()
Resolves #1196 Add `closeTimeout` to `KafkaTemplate` and `KafkaTransactionManager` (default 5s). Use a zero timeout if a transaction operation failed with a timeout. Deprecate 1.3.x public APIs
1 parent d1c5af0 commit 5f57f4b

File tree

8 files changed

+274
-47
lines changed

8 files changed

+274
-47
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.lang.reflect.InvocationTargetException;
20+
import java.lang.reflect.Method;
21+
import java.time.Duration;
1922
import java.util.Collections;
2023
import java.util.HashMap;
2124
import java.util.Iterator;
@@ -43,7 +46,9 @@
4346
import org.apache.kafka.common.PartitionInfo;
4447
import org.apache.kafka.common.TopicPartition;
4548
import org.apache.kafka.common.errors.ProducerFencedException;
49+
import org.apache.kafka.common.errors.TimeoutException;
4650
import org.apache.kafka.common.serialization.Serializer;
51+
import org.apache.kafka.common.utils.AppInfoParser;
4752

4853
import org.springframework.beans.factory.DisposableBean;
4954
import org.springframework.context.Lifecycle;
@@ -126,8 +131,10 @@ public void setValueSerializer(Serializer<V> valueSerializer) {
126131
}
127132

128133
/**
129-
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked).
130-
* Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
134+
* The time to wait when physically closing the producer via the factory rather than
135+
* closing the producer itself (when {@link #reset()}, {@link #destroy() or
136+
* #closeProducerFor(String)} are invoked). Specified in seconds; default
137+
* {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
131138
* @param physicalCloseTimeout the timeout in seconds.
132139
* @since 1.0.7
133140
*/
@@ -195,7 +202,7 @@ public boolean transactionCapable() {
195202

196203
@SuppressWarnings("resource")
197204
@Override
198-
public void destroy() throws Exception { //NOSONAR
205+
public void destroy() {
199206
CloseSafeProducer<K, V> producer = this.producer;
200207
this.producer = null;
201208
if (producer != null) {
@@ -352,6 +359,25 @@ public void closeProducerFor(String transactionIdSuffix) {
352359
*/
353360
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
354361

362+
private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0);
363+
364+
private static final Method CLOSE_WITH_DURATION;
365+
366+
static {
367+
Method method = null;
368+
String clientVersion = AppInfoParser.getVersion();
369+
try {
370+
if (!clientVersion.startsWith("1.") && !clientVersion.startsWith("2.0.")
371+
&& !clientVersion.startsWith("2.1.")) {
372+
method = KafkaProducer.class.getDeclaredMethod("close", Duration.class);
373+
}
374+
}
375+
catch (NoSuchMethodException e) {
376+
logger.error("Failed to get close(Duration) method for version: " + clientVersion, e);
377+
}
378+
CLOSE_WITH_DURATION = method;
379+
}
380+
355381
private final Producer<K, V> delegate;
356382

357383
private final BlockingQueue<CloseSafeProducer<K, V>> cache;
@@ -360,7 +386,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
360386

361387
private final String txId;
362388

363-
private volatile boolean txFailed;
389+
private volatile Exception txFailed;
364390

365391
CloseSafeProducer(Producer<K, V> delegate) {
366392
this(delegate, null, null);
@@ -428,7 +454,7 @@ public void beginTransaction() throws ProducerFencedException {
428454
if (logger.isErrorEnabled()) {
429455
logger.error("beginTransaction failed: " + this, e);
430456
}
431-
this.txFailed = true;
457+
this.txFailed = e;
432458
throw e;
433459
}
434460
}
@@ -452,7 +478,7 @@ public void commitTransaction() throws ProducerFencedException {
452478
if (logger.isErrorEnabled()) {
453479
logger.error("commitTransaction failed: " + this, e);
454480
}
455-
this.txFailed = true;
481+
this.txFailed = e;
456482
throw e;
457483
}
458484
}
@@ -469,7 +495,7 @@ public void abortTransaction() throws ProducerFencedException {
469495
if (logger.isErrorEnabled()) {
470496
logger.error("Abort failed: " + this, e);
471497
}
472-
this.txFailed = true;
498+
this.txFailed = e;
473499
throw e;
474500
}
475501
}
@@ -482,17 +508,16 @@ public void close() {
482508
@Override
483509
public void close(long timeout, @Nullable TimeUnit unit) {
484510
if (this.cache != null) {
485-
if (this.txFailed) {
511+
Duration closeTimeout = this.txFailed instanceof TimeoutException || unit == null
512+
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
513+
: Duration.ofMillis(unit.toMillis(timeout));
514+
if (this.txFailed != null) {
486515
if (logger.isWarnEnabled()) {
487-
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
488-
+ "broker restarted during transaction: " + this);
489-
}
490-
if (unit == null) {
491-
this.delegate.close();
492-
}
493-
else {
494-
this.delegate.close(timeout, unit);
516+
logger.warn("Error during transactional operation; producer removed from cache; "
517+
+ "possible cause: "
518+
+ "broker restarted during transaction: " + this);
495519
}
520+
closeDelegate(closeTimeout);
496521
if (this.removeConsumerProducer != null) {
497522
this.removeConsumerProducer.accept(this);
498523
}
@@ -502,19 +527,28 @@ public void close(long timeout, @Nullable TimeUnit unit) {
502527
synchronized (this) {
503528
if (!this.cache.contains(this)
504529
&& !this.cache.offer(this)) {
505-
if (unit == null) {
506-
this.delegate.close();
507-
}
508-
else {
509-
this.delegate.close(timeout, unit);
510-
}
530+
closeDelegate(closeTimeout);
511531
}
512532
}
513533
}
514534
}
515535
}
516536
}
517537

538+
private void closeDelegate(Duration closeTimeout) {
539+
if (CLOSE_WITH_DURATION != null) {
540+
try {
541+
CLOSE_WITH_DURATION.invoke(this.delegate, closeTimeout);
542+
}
543+
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
544+
logger.error("Failed to invoke close(Duration) with reflection", e);
545+
}
546+
}
547+
else {
548+
this.delegate.close(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
549+
}
550+
}
551+
518552
@Override
519553
public String toString() {
520554
return "CloseSafeProducer [delegate=" + this.delegate + ""

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
1922
import org.apache.kafka.clients.producer.Producer;
2023

2124
import org.springframework.transaction.support.ResourceHolderSupport;
25+
import org.springframework.util.Assert;
2226

2327
/**
2428
* Kafka resource holder, wrapping a Kafka producer. KafkaTransactionManager binds instances of this
@@ -33,12 +37,41 @@ public class KafkaResourceHolder<K, V> extends ResourceHolderSupport {
3337

3438
private final Producer<K, V> producer;
3539

40+
private final Duration closeTimeout;
41+
3642
/**
3743
* Construct an instance for the producer.
3844
* @param producer the producer.
3945
*/
4046
public KafkaResourceHolder(Producer<K, V> producer) {
47+
this(producer, ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
48+
}
49+
50+
/**
51+
* Construct an instance for the producer.
52+
* @param producer the producer.
53+
* @param closeTimeout the close timeout.
54+
* @deprecated in favor of {@link #KafkaResourceHolder(Producer, Duration)}
55+
* @since 1.3.11
56+
*/
57+
@Deprecated
58+
public KafkaResourceHolder(Producer<K, V> producer, long closeTimeout) {
59+
Assert.notNull(producer, "'producer' cannot be null");
60+
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
61+
this.producer = producer;
62+
this.closeTimeout = Duration.ofMillis(closeTimeout);
63+
}
64+
65+
/**
66+
* Construct an instance for the producer.
67+
* @param producer the producer.
68+
* @param closeTimeout the close timeout.
69+
*/
70+
public KafkaResourceHolder(Producer<K, V> producer, Duration closeTimeout) {
71+
Assert.notNull(producer, "'producer' cannot be null");
72+
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
4173
this.producer = producer;
74+
this.closeTimeout = closeTimeout;
4275
}
4376

4477
public Producer<K, V> getProducer() {
@@ -50,7 +83,7 @@ public void commit() {
5083
}
5184

5285
public void close() {
53-
this.producer.close();
86+
this.producer.close(this.closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
5487
}
5588

5689
public void rollback() {

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.time.Duration;
1920
import java.util.List;
2021
import java.util.Map;
22+
import java.util.concurrent.TimeUnit;
2123

2224
import org.apache.commons.logging.Log;
2325
import org.apache.commons.logging.LogFactory;
@@ -77,6 +79,7 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
7779

7880
private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<K, V>();
7981

82+
private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
8083

8184
/**
8285
* Create an instance using the supplied producer factory and autoFlush false.
@@ -157,6 +160,27 @@ public boolean isTransactional() {
157160
return this.transactional;
158161
}
159162

163+
/**
164+
* Set the maximum time to wait when closing a producer; default 5 seconds.
165+
* @param closeTimeout the close timeout.
166+
* @deprecated in favor of {@link #setCloseTimeout(Duration)}.
167+
* @since 1.3.11
168+
*/
169+
@Deprecated
170+
public void setCloseTimeout(long closeTimeout) {
171+
setCloseTimeout(Duration.ofMillis(closeTimeout));
172+
}
173+
174+
/**
175+
* Set the maximum time to wait when closing a producer; default 5 seconds.
176+
* @param closeTimeout the close timeout.
177+
* @since 2.1.14
178+
*/
179+
public void setCloseTimeout(Duration closeTimeout) {
180+
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
181+
this.closeTimeout = closeTimeout;
182+
}
183+
160184
@Override
161185
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
162186
return send(this.defaultTopic, data);
@@ -342,9 +366,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
342366
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
343367
}
344368

345-
protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
346-
if (!inLocalTx) {
347-
producer.close();
369+
protected void closeProducer(Producer<K, V> producer, boolean inTx) {
370+
if (!inTx) {
371+
producer.close(this.closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
348372
}
349373
}
350374

@@ -422,7 +446,7 @@ private Producer<K, V> getTheProducer() {
422446
return producer;
423447
}
424448
KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
425-
.getTransactionalResourceHolder(this.producerFactory);
449+
.getTransactionalResourceHolder(this.producerFactory, this.closeTimeout);
426450
return holder.getProducer();
427451
}
428452
else {

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
1922
import org.apache.kafka.clients.producer.Producer;
2023

2124
import org.springframework.transaction.support.ResourceHolderSynchronization;
@@ -34,6 +37,11 @@
3437
*/
3538
public final class ProducerFactoryUtils {
3639

40+
/**
41+
* The default close timeout (5 seconds).
42+
*/
43+
public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(5);
44+
3745
private static ThreadLocal<String> groupIds = new ThreadLocal<>();
3846

3947
private ProducerFactoryUtils() {
@@ -50,6 +58,38 @@ private ProducerFactoryUtils() {
5058
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
5159
final ProducerFactory<K, V> producerFactory) {
5260

61+
return getTransactionalResourceHolder(producerFactory, DEFAULT_CLOSE_TIMEOUT);
62+
}
63+
64+
/**
65+
* Obtain a Producer that is synchronized with the current transaction, if any.
66+
* @param producerFactory the ProducerFactory to obtain a Channel for
67+
* @param closeTimeout the producer close timeout.
68+
* @param <K> the key type.
69+
* @param <V> the value type.
70+
* @return the resource holder.
71+
* @deprecated in favor of {@link #getTransactionalResourceHolder(ProducerFactory, Duration)}
72+
* @since 1.3.11
73+
*/
74+
@Deprecated
75+
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
76+
final ProducerFactory<K, V> producerFactory, long closeTimeout) {
77+
78+
return getTransactionalResourceHolder(producerFactory, Duration.ofMillis(closeTimeout));
79+
}
80+
81+
/**
82+
* Obtain a Producer that is synchronized with the current transaction, if any.
83+
* @param producerFactory the ProducerFactory to obtain a Channel for
84+
* @param closeTimeout the producer close timeout.
85+
* @param <K> the key type.
86+
* @param <V> the value type.
87+
* @return the resource holder.
88+
* @since 2.1.14
89+
*/
90+
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
91+
final ProducerFactory<K, V> producerFactory, Duration closeTimeout) {
92+
5393
Assert.notNull(producerFactory, "ProducerFactory must not be null");
5494

5595
@SuppressWarnings("unchecked")
@@ -62,19 +102,19 @@ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
62102
producer.beginTransaction();
63103
}
64104
catch (RuntimeException e) {
65-
producer.close();
105+
producer.close(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
66106
throw e;
67107
}
68108

69-
resourceHolder = new KafkaResourceHolder<K, V>(producer);
109+
resourceHolder = new KafkaResourceHolder<K, V>(producer, closeTimeout);
70110
bindResourceToTransaction(resourceHolder, producerFactory);
71111
}
72112
return resourceHolder;
73113
}
74114

75115
public static <K, V> void releaseResources(KafkaResourceHolder<K, V> resourceHolder) {
76116
if (resourceHolder != null) {
77-
resourceHolder.getProducer().close();
117+
resourceHolder.close();
78118
}
79119
}
80120

0 commit comments

Comments
 (0)