5959 */
6060public class DefaultKafkaProducerFactory <K , V > implements ProducerFactory <K , V >, Lifecycle , DisposableBean {
6161
62+ private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30 ;
63+
6264 private static final Log logger = LogFactory .getLog (DefaultKafkaProducerFactory .class );
6365
6466 private final Map <String , Object > configs ;
@@ -69,6 +71,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
6971
7072 private Serializer <V > valueSerializer ;
7173
74+ private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT ;
75+
7276 private volatile boolean running ;
7377
7478 public DefaultKafkaProducerFactory (Map <String , Object > configs ) {
@@ -90,12 +94,22 @@ public void setValueSerializer(Serializer<V> valueSerializer) {
9094 this .valueSerializer = valueSerializer ;
9195 }
9296
97+ /**
98+ * The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked.
99+ * Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
100+ * @param physicalCloseTimeout the timeout in seconds.
101+ * @since 1.0.7
102+ */
103+ public void setPhysicalCloseTimeout (int physicalCloseTimeout ) {
104+ this .physicalCloseTimeout = physicalCloseTimeout ;
105+ }
106+
93107 @ Override
94108 public void destroy () throws Exception { //NOSONAR
95109 CloseSafeProducer <K , V > producer = this .producer ;
96110 this .producer = null ;
97111 if (producer != null ) {
98- producer .delegate .close ();
112+ producer .delegate .close (this . physicalCloseTimeout , TimeUnit . SECONDS );
99113 }
100114 }
101115
0 commit comments