@@ -78,6 +78,7 @@ class StreamProducer implements Producer {
7878 entity -> ((AccumulatedEntity ) entity ).publishingId ();
7979 private final long enqueueTimeoutMs ;
8080 private final boolean blockOnMaxUnconfirmed ;
81+ private final boolean retryOnRecovery ;
8182 private volatile Client client ;
8283 private volatile byte publisherId ;
8384 private volatile Status status ;
@@ -95,6 +96,7 @@ class StreamProducer implements Producer {
9596 int maxUnconfirmedMessages ,
9697 Duration confirmTimeout ,
9798 Duration enqueueTimeout ,
99+ boolean retryOnRecovery ,
98100 Function <Message , String > filterValueExtractor ,
99101 StreamEnvironment environment ) {
100102 if (filterValueExtractor != null && !environment .filteringSupported ()) {
@@ -107,6 +109,7 @@ class StreamProducer implements Producer {
107109 this .name = name ;
108110 this .stream = stream ;
109111 this .enqueueTimeoutMs = enqueueTimeout .toMillis ();
112+ this .retryOnRecovery = retryOnRecovery ;
110113 this .blockOnMaxUnconfirmed = enqueueTimeout .isZero ();
111114 this .closingCallback = environment .registerProducer (this , name , this .stream );
112115 final Client .OutboundEntityWriteCallback delegateWriteCallback ;
@@ -504,43 +507,58 @@ void unavailable() {
504507
505508 void running () {
506509 synchronized (this ) {
507- LOGGER . debug (
508- "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)" ,
509- this . unconfirmedMessages . size () ,
510- this .accumulator .size ());
511- if (! this .unconfirmedMessages . isEmpty ()) {
512- Map < Long , AccumulatedEntity > messagesToResend = new TreeMap <>( this . unconfirmedMessages );
510+ if (! this . retryOnRecovery ) {
511+ LOGGER . debug (
512+ "Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)" ,
513+ this .unconfirmedMessages .size (),
514+ this .accumulator . size ());
515+
513516 this .unconfirmedMessages .clear ();
514- Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
517+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
518+ if (toRelease > 0 ) {
519+ unconfirmedMessagesSemaphore .release (toRelease );
520+ }
521+
522+ publishBatch (false );
523+ } else {
524+ LOGGER .debug (
525+ "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)" ,
526+ this .unconfirmedMessages .size (),
527+ this .accumulator .size ());
528+ if (!this .unconfirmedMessages .isEmpty ()) {
529+ Map <Long , AccumulatedEntity > messagesToResend = new TreeMap <>(this .unconfirmedMessages );
530+ this .unconfirmedMessages .clear ();
531+ Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
515532 messagesToResend .entrySet ().iterator ();
516- while (resendIterator .hasNext ()) {
517- List <Object > messages = new ArrayList <>(this .batchSize );
518- int batchCount = 0 ;
519- while (batchCount != this .batchSize ) {
520- Object accMessage = resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
521- if (accMessage == null ) {
522- break ;
533+ while (resendIterator .hasNext ()) {
534+ List <Object > messages = new ArrayList <>(this .batchSize );
535+ int batchCount = 0 ;
536+ while (batchCount != this .batchSize ) {
537+ Object accMessage = resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
538+ if (accMessage == null ) {
539+ break ;
540+ }
541+ messages .add (accMessage );
542+ batchCount ++;
523543 }
524- messages .add (accMessage );
525- batchCount ++;
544+ client .publishInternal (
545+ this .publishVersion ,
546+ this .publisherId ,
547+ messages ,
548+ this .writeCallback ,
549+ this .publishSequenceFunction );
526550 }
527- client .publishInternal (
528- this .publishVersion ,
529- this .publisherId ,
530- messages ,
531- this .writeCallback ,
532- this .publishSequenceFunction );
533551 }
534- }
535- publishBatch ( false );
536-
537- int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore . availablePermits ();
538- if ( toRelease > 0 ) {
539- unconfirmedMessagesSemaphore .release ( toRelease );
540- if (! unconfirmedMessagesSemaphore . tryAcquire ( this . unconfirmedMessages . size ())) {
541- LOGGER . debug (
542- "Could not acquire {} permit(s) for message republishing" ,
543- this . unconfirmedMessages . size ());
552+ publishBatch ( false );
553+
554+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore . availablePermits ();
555+ if ( toRelease > 0 ) {
556+ unconfirmedMessagesSemaphore . release ( toRelease );
557+ if (! unconfirmedMessagesSemaphore .tryAcquire ( this . unconfirmedMessages . size ())) {
558+ LOGGER . debug (
559+ "Could not acquire {} permit(s) for message republishing" ,
560+ this . unconfirmedMessages . size ());
561+ }
544562 }
545563 }
546564 }
0 commit comments