4141import org .apache .kafka .common .MetricName ;
4242import org .apache .kafka .common .PartitionInfo ;
4343import org .apache .kafka .common .TopicPartition ;
44+ import org .apache .kafka .common .errors .OutOfOrderSequenceException ;
4445import org .apache .kafka .common .errors .ProducerFencedException ;
4546import org .apache .kafka .common .errors .TimeoutException ;
4647import org .apache .kafka .common .serialization .Serializer ;
@@ -190,15 +191,18 @@ public boolean transactionCapable() {
190191 @ SuppressWarnings ("resource" )
191192 @ Override
192193 public void destroy () {
193- CloseSafeProducer <K , V > producer = this .producer ;
194- this .producer = null ;
195- if (producer != null ) {
196- producer .delegate .close (this .physicalCloseTimeout , TimeUnit .SECONDS );
194+ CloseSafeProducer <K , V > producerToClose ;
195+ synchronized (this ) {
196+ producerToClose = this .producer ;
197+ this .producer = null ;
198+ }
199+ if (producerToClose != null ) {
200+ producerToClose .delegate .close (this .physicalCloseTimeout , TimeUnit .SECONDS );
197201 }
198202 producer = this .cache .poll ();
199203 while (producer != null ) {
200204 try {
201- producer .delegate .close (this .physicalCloseTimeout , TimeUnit .SECONDS );
205+ producerToClose .delegate .close (this .physicalCloseTimeout , TimeUnit .SECONDS );
202206 }
203207 catch (Exception e ) {
204208 logger .error ("Exception while closing producer" , e );
@@ -247,14 +251,13 @@ public Producer<K, V> createProducer() {
247251 return createTransactionalProducer ();
248252 }
249253 }
250- if (this .producer == null ) {
251- synchronized (this ) {
252- if (this .producer == null ) {
253- this .producer = new CloseSafeProducer <K , V >(createKafkaProducer ());
254- }
254+ synchronized (this ) {
255+ if (this .producer == null ) {
256+ this .producer = new CloseSafeProducer <K , V >(createKafkaProducer (), standardProducerRemover (),
257+ this .physicalCloseTimeout );
255258 }
259+ return this .producer ;
256260 }
257- return this .producer ;
258261 }
259262
260263 /**
@@ -293,6 +296,19 @@ Producer<K, V> createTransactionalProducerForPartition() {
293296 }
294297 }
295298
299+ /**
300+ * Remove the single shared producer if present.
301+ * @param producerToRemove the producer;
302+ * @since 1.3.11
303+ */
304+ protected final synchronized void removeProducer (
305+ @ SuppressWarnings ("unused" ) CloseSafeProducer <K , V > producerToRemove ) {
306+
307+ if (producerToRemove .equals (this .producer )) {
308+ this .producer = null ;
309+ }
310+ }
311+
296312 /**
297313 * Subclasses must return a producer from the {@link #getCache()} or a
298314 * new raw producer wrapped in a {@link CloseSafeProducer}.
@@ -319,9 +335,46 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, boolean isCons
319335 }
320336 producer = new KafkaProducer <K , V >(configs , this .keySerializer , this .valueSerializer );
321337 producer .initTransactions ();
322- return new CloseSafeProducer <K , V >(producer , this .cache ,
323- isConsumerProducer ? this .consumerProducers : null ,
324- (String ) configs .get (ProducerConfig .TRANSACTIONAL_ID_CONFIG ));
338+ Remover <K , V > remover = isConsumerProducer
339+ ? consumerProducerRemover ()
340+ : null ;
341+ return new CloseSafeProducer <K , V >(producer , this .cache , remover ,
342+ (String ) configs .get (ProducerConfig .TRANSACTIONAL_ID_CONFIG ), this .physicalCloseTimeout );
343+ }
344+
345+ private Remover <K , V > standardProducerRemover () {
346+ return new Remover <K , V >() {
347+
348+ @ Override
349+ public void remove (CloseSafeProducer <K , V > producer ) {
350+ removeProducer (producer );
351+ }
352+
353+ };
354+ }
355+
356+ private Remover <K , V > consumerProducerRemover () {
357+ return new Remover <K , V >() {
358+
359+ @ Override
360+ public void remove (CloseSafeProducer <K , V > producer ) {
361+ removeConsumerProducer (producer );
362+ }
363+
364+ };
365+ }
366+
367+ private void removeConsumerProducer (CloseSafeProducer <K , V > producer ) {
368+ synchronized (this .consumerProducers ) {
369+ Iterator <Entry <String , CloseSafeProducer <K , V >>> iterator = this .consumerProducers .entrySet ()
370+ .iterator ();
371+ while (iterator .hasNext ()) {
372+ if (iterator .next ().getValue ().equals (producer )) {
373+ iterator .remove ();
374+ break ;
375+ }
376+ }
377+ }
325378 }
326379
327380 protected BlockingQueue <CloseSafeProducer <K , V >> getCache () {
@@ -339,6 +392,19 @@ public void closeProducerFor(String transactionIdSuffix) {
339392 }
340393 }
341394
395+ /**
396+ * Internal interface to remove a failed producer.
397+ *
398+ * @param <K> the key type.
399+ * @param <V> the value type.
400+ *
401+ */
402+ interface Remover <K , V > {
403+
404+ void remove (CloseSafeProducer <K , V > producer );
405+
406+ }
407+
342408 /**
343409 * A wrapper class for the delegate.
344410 *
@@ -352,34 +418,46 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
352418
353419 private final BlockingQueue <CloseSafeProducer <K , V >> cache ;
354420
355- private final Map <String , CloseSafeProducer <K , V >> consumerProducers ;
356-
357421 private final String txId ;
358422
359- private volatile Exception txFailed ;
423+ private final Remover <K , V > remover ;
424+
425+ private final int closeTimeout ;
426+
427+ private volatile Exception producerFailed ;
428+
429+ private volatile boolean closed ;
430+
431+ CloseSafeProducer (Producer <K , V > delegate , Remover <K , V > remover , int closeTimeout ) {
360432
361- CloseSafeProducer (Producer <K , V > delegate ) {
362- this (delegate , null , null );
433+ this (delegate , null , remover , null , closeTimeout );
363434 Assert .isTrue (!(delegate instanceof CloseSafeProducer ), "Cannot double-wrap a producer" );
364435 }
365436
366- CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ) {
367- this (delegate , cache , null );
437+ CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
438+ int closeTimeout ) {
439+
440+ this (delegate , cache , null , closeTimeout );
368441 }
369442
370443 CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
371- Map < String , CloseSafeProducer < K , V >> consumerProducers ) {
444+ Remover < K , V > remover , int closeTimeout ) {
372445
373- this (delegate , cache , consumerProducers , null );
446+ this (delegate , cache , remover , null , closeTimeout );
374447 }
375448
376449 CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
377- Map < String , CloseSafeProducer < K , V >> consumerProducers , String txId ) {
450+ Remover < K , V > remover , String txId , int closeTimeout ) {
378451
379452 this .delegate = delegate ;
380453 this .cache = cache ;
381- this .consumerProducers = consumerProducers ;
454+ this .remover = remover ;
382455 this .txId = txId ;
456+ this .closeTimeout = closeTimeout ;
457+ }
458+
459+ Producer <K , V > getDelegate () {
460+ return this .delegate ;
383461 }
384462
385463 @ Override
@@ -388,8 +466,19 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
388466 }
389467
390468 @ Override
391- public Future <RecordMetadata > send (ProducerRecord <K , V > record , Callback callback ) {
392- return this .delegate .send (record , callback );
469+ public Future <RecordMetadata > send (ProducerRecord <K , V > record , final Callback callback ) {
470+ return this .delegate .send (record , new Callback () {
471+
472+ @ Override
473+ public void onCompletion (RecordMetadata metadata , Exception exception ) {
474+ if (exception instanceof OutOfOrderSequenceException ) {
475+ CloseSafeProducer .this .producerFailed = exception ;
476+ close (CloseSafeProducer .this .closeTimeout , TimeUnit .MILLISECONDS );
477+ }
478+ callback .onCompletion (metadata , exception );
479+ }
480+
481+ });
393482 }
394483
395484 @ Override
@@ -424,7 +513,7 @@ public void beginTransaction() throws ProducerFencedException {
424513 if (logger .isErrorEnabled ()) {
425514 logger .error ("beginTransaction failed: " + this , e );
426515 }
427- this .txFailed = e ;
516+ this .producerFailed = e ;
428517 throw e ;
429518 }
430519 }
@@ -448,7 +537,7 @@ public void commitTransaction() throws ProducerFencedException {
448537 if (logger .isErrorEnabled ()) {
449538 logger .error ("commitTransaction failed: " + this , e );
450539 }
451- this .txFailed = e ;
540+ this .producerFailed = e ;
452541 throw e ;
453542 }
454543 }
@@ -465,7 +554,7 @@ public void abortTransaction() throws ProducerFencedException {
465554 if (logger .isErrorEnabled ()) {
466555 logger .error ("Abort failed: " + this , e );
467556 }
468- this .txFailed = e ;
557+ this .producerFailed = e ;
469558 throw e ;
470559 }
471560 }
@@ -477,26 +566,27 @@ public void close() {
477566
478567 @ Override
479568 public void close (long timeout , TimeUnit unit ) {
480- if (this .cache != null ) {
481- long closeTimeout = this .txFailed instanceof TimeoutException || unit == null
482- ? 0L
483- : timeout ;
484- if (this .txFailed != null ) {
569+ if (!this .closed ) {
570+ if (this .producerFailed != null ) {
485571 if (logger .isWarnEnabled ()) {
486572 logger .warn ("Error during transactional operation; producer removed from cache; "
487573 + "possible cause: "
488574 + "broker restarted during transaction: " + this );
489575 }
490- this .delegate .close (closeTimeout , unit );
491- if (this .consumerProducers != null ) {
492- removeConsumerProducer ();
576+ this .closed = true ;
577+ this .delegate .close (this .producerFailed instanceof TimeoutException || unit == null
578+ ? 0L
579+ : timeout , unit );
580+ if (this .remover != null ) {
581+ this .remover .remove (this );
493582 }
494583 }
495584 else {
496- if (this .consumerProducers == null ) { // dedicated consumer producers are not cached
585+ if (this .cache != null && this . remover == null ) { // dedicated consumer producers are not cached
497586 synchronized (this ) {
498587 if (!this .cache .contains (this )
499588 && !this .cache .offer (this )) {
589+ this .closed = true ;
500590 this .delegate .close (closeTimeout , unit );
501591 }
502592 }
@@ -505,19 +595,6 @@ public void close(long timeout, TimeUnit unit) {
505595 }
506596 }
507597
508- private void removeConsumerProducer () {
509- synchronized (this .consumerProducers ) {
510- Iterator <Entry <String , CloseSafeProducer <K , V >>> iterator = this .consumerProducers .entrySet ()
511- .iterator ();
512- while (iterator .hasNext ()) {
513- if (iterator .next ().getValue ().equals (this )) {
514- iterator .remove ();
515- break ;
516- }
517- }
518- }
519- }
520-
521598 @ Override
522599 public String toString () {
523600 return "CloseSafeProducer [delegate=" + this .delegate + ""
0 commit comments