Skip to content

Commit 960287e

Browse files
committed
GH-1727: Close Producer if initTransactions Fails
Resolves #1727
1 parent 2d5dfc8 commit 960287e

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,7 @@
5858
import org.springframework.context.ApplicationListener;
5959
import org.springframework.context.event.ContextStoppedEvent;
6060
import org.springframework.core.log.LogAccessor;
61+
import org.springframework.kafka.KafkaException;
6162
import org.springframework.kafka.support.TransactionSupport;
6263
import org.springframework.lang.Nullable;
6364
import org.springframework.util.Assert;
@@ -715,7 +716,20 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
715716
}
716717
checkBootstrap(newProducerConfigs);
717718
newProducer = createRawProducer(newProducerConfigs);
718-
newProducer.initTransactions();
719+
try {
720+
newProducer.initTransactions();
721+
}
722+
catch (RuntimeException ex) {
723+
try {
724+
newProducer.close(this.physicalCloseTimeout);
725+
}
726+
catch (RuntimeException ex2) {
727+
KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex);
728+
newEx.addSuppressed(ex2);
729+
throw newEx; // NOSONAR - lost stack trace
730+
}
731+
throw new KafkaException("initTransactions() failed", ex);
732+
}
719733
CloseSafeProducer<K, V> closeSafeProducer =
720734
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
721735
this.epoch.get());

0 commit comments

Comments
 (0)