@@ -507,34 +507,23 @@ void unavailable() {
507507
508508 void running () {
509509 synchronized (this ) {
510- if (!this .retryOnRecovery ) {
511- LOGGER .debug (
512- "Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)" ,
513- this .unconfirmedMessages .size (),
514- this .accumulator .size ());
515-
516- this .unconfirmedMessages .clear ();
517- int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
518- if (toRelease > 0 ) {
519- unconfirmedMessagesSemaphore .release (toRelease );
520- }
521-
522- publishBatch (false );
523- } else {
524- LOGGER .debug (
525- "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)" ,
526- this .unconfirmedMessages .size (),
527- this .accumulator .size ());
510+ LOGGER .debug (
511+ "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)" ,
512+ this .unconfirmedMessages .size (),
513+ this .accumulator .size ());
514+ if (this .retryOnRecovery ) {
515+ LOGGER .debug ("Re-publishing {} unconfirmed message(s)" , this .unconfirmedMessages .size ());
528516 if (!this .unconfirmedMessages .isEmpty ()) {
529517 Map <Long , AccumulatedEntity > messagesToResend = new TreeMap <>(this .unconfirmedMessages );
530518 this .unconfirmedMessages .clear ();
531519 Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
532- messagesToResend .entrySet ().iterator ();
520+ messagesToResend .entrySet ().iterator ();
533521 while (resendIterator .hasNext ()) {
534522 List <Object > messages = new ArrayList <>(this .batchSize );
535523 int batchCount = 0 ;
536524 while (batchCount != this .batchSize ) {
537- Object accMessage = resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
525+ Object accMessage =
526+ resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
538527 if (accMessage == null ) {
539528 break ;
540529 }
@@ -549,18 +538,34 @@ void running() {
549538 this .publishSequenceFunction );
550539 }
551540 }
552- publishBatch (false );
553-
554- int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
555- if (toRelease > 0 ) {
556- unconfirmedMessagesSemaphore .release (toRelease );
557- if (!unconfirmedMessagesSemaphore .tryAcquire (this .unconfirmedMessages .size ())) {
558- LOGGER .debug (
559- "Could not acquire {} permit(s) for message republishing" ,
560- this .unconfirmedMessages .size ());
541+ } else {
542+ LOGGER .debug (
543+ "Skipping republishing of {} unconfirmed messages" , this .unconfirmedMessages .size ());
544+ Map <Long , AccumulatedEntity > messagesToFail = new TreeMap <>(this .unconfirmedMessages );
545+ this .unconfirmedMessages .clear ();
546+ for (AccumulatedEntity accumulatedEntity : messagesToFail .values ()) {
547+ try {
548+ int permits =
549+ accumulatedEntity
550+ .confirmationCallback ()
551+ .handle (false , CODE_PUBLISH_CONFIRM_TIMEOUT );
552+ this .unconfirmedMessagesSemaphore .release (permits );
553+ } catch (Exception e ) {
554+ LOGGER .debug ("Error while nack-ing outbound message: {}" , e .getMessage ());
555+ this .unconfirmedMessagesSemaphore .release (1 );
561556 }
562557 }
563558 }
559+ publishBatch (false );
560+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
561+ if (toRelease > 0 ) {
562+ unconfirmedMessagesSemaphore .release (toRelease );
563+ if (!unconfirmedMessagesSemaphore .tryAcquire (this .unconfirmedMessages .size ())) {
564+ LOGGER .debug (
565+ "Could not acquire {} permit(s) for message republishing" ,
566+ this .unconfirmedMessages .size ());
567+ }
568+ }
564569 }
565570 this .status = Status .RUNNING ;
566571 }
0 commit comments