Skip to content

Commit c33b0b3

Browse files
committed
GH-1727: Close Producer if initTransactions Fails
Resolves #1727
1 parent e48b639 commit c33b0b3

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,7 @@
5858
import org.springframework.context.ApplicationListener;
5959
import org.springframework.context.event.ContextStoppedEvent;
6060
import org.springframework.core.log.LogAccessor;
61+
import org.springframework.kafka.KafkaException;
6162
import org.springframework.kafka.support.TransactionSupport;
6263
import org.springframework.lang.Nullable;
6364
import org.springframework.util.Assert;
@@ -716,7 +717,20 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
716717
}
717718
checkBootstrap(newProducerConfigs);
718719
newProducer = createRawProducer(newProducerConfigs);
719-
newProducer.initTransactions();
720+
try {
721+
newProducer.initTransactions();
722+
}
723+
catch (RuntimeException ex) {
724+
try {
725+
newProducer.close(this.physicalCloseTimeout);
726+
}
727+
catch (RuntimeException ex2) {
728+
KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex);
729+
newEx.addSuppressed(ex2);
730+
throw newEx; // NOSONAR - lost stack trace
731+
}
732+
throw new KafkaException("initTransactions() failed", ex);
733+
}
720734
CloseSafeProducer<K, V> closeSafeProducer =
721735
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
722736
this.epoch.get());

0 commit comments

Comments
 (0)