11/*
2- * Copyright 2016-2022 the original author or authors.
2+ * Copyright 2016-2023 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
@@ -740,6 +740,10 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
740740 return getOrCreateThreadBoundProducer ();
741741 }
742742 synchronized (this ) {
743+ if (this .producer != null && this .producer .closed ) {
744+ this .producer .closeDelegate (this .physicalCloseTimeout , this .listeners );
745+ this .producer = null ;
746+ }
743747 if (this .producer != null && expire (this .producer )) {
744748 this .producer = null ;
745749 }
@@ -754,7 +758,7 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
754758
755759 private Producer <K , V > getOrCreateThreadBoundProducer () {
756760 CloseSafeProducer <K , V > tlProducer = this .threadBoundProducers .get ();
757- if (tlProducer != null && (this .epoch .get () != tlProducer .epoch || expire (tlProducer ))) {
761+ if (tlProducer != null && (tlProducer . closed || this .epoch .get () != tlProducer .epoch || expire (tlProducer ))) {
758762 closeThreadBoundProducer ();
759763 tlProducer = null ;
760764 }
@@ -781,21 +785,11 @@ protected Producer<K, V> createKafkaProducer() {
781785 * Remove the single shared producer and a thread-bound instance if present.
782786 * @param producerToRemove the producer.
783787 * @param timeout the close timeout.
784- * @return always true.
788+ * @return true if the producer was closed .
785789 * @since 2.2.13
786790 */
787- protected final synchronized boolean removeProducer (CloseSafeProducer <K , V > producerToRemove , Duration timeout ) {
788- if (producerToRemove .closed ) {
789- if (producerToRemove .equals (this .producer )) {
790- this .producer = null ;
791- producerToRemove .closeDelegate (timeout , this .listeners );
792- }
793- this .threadBoundProducers .remove ();
794- return true ;
795- }
796- else {
797- return false ;
798- }
791+ protected final boolean removeProducer (CloseSafeProducer <K , V > producerToRemove , Duration timeout ) {
792+ return producerToRemove .closed ;
799793 }
800794
801795 /**
@@ -1139,7 +1133,12 @@ public void close(@Nullable Duration timeout) {
11391133 }
11401134
11411135 void closeDelegate (Duration timeout , List <Listener <K , V >> listeners ) {
1142- this .delegate .close (timeout == null ? this .closeTimeout : timeout );
1136+ try {
1137+ this .delegate .close (timeout == null ? this .closeTimeout : timeout );
1138+ }
1139+ catch (Exception ex ) {
1140+ LOGGER .warn (ex , () -> "Failed to close " + this .delegate );
1141+ }
11431142 listeners .forEach (listener -> listener .producerRemoved (this .clientId , this ));
11441143 this .closed = true ;
11451144 }
0 commit comments