1- // Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+ // Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33//
44// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4343import java .util .concurrent .atomic .AtomicBoolean ;
4444import java .util .concurrent .atomic .AtomicLong ;
4545import java .util .concurrent .atomic .AtomicReference ;
46+ import java .util .concurrent .locks .Lock ;
47+ import java .util .concurrent .locks .ReentrantLock ;
4648import java .util .function .Function ;
4749import java .util .function .ToLongFunction ;
4850import org .slf4j .Logger ;
@@ -56,7 +58,6 @@ class StreamProducer implements Producer {
5658 private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {};
5759 private final long id ;
5860 private final MessageAccumulator accumulator ;
59- private final ToLongFunction <Message > accumulatorPublishSequenceFunction ;
6061 // FIXME investigate a more optimized data structure to handle pending messages
6162 private final ConcurrentMap <Long , AccumulatedEntity > unconfirmedMessages ;
6263 private final int batchSize ;
@@ -79,6 +80,7 @@ class StreamProducer implements Producer {
7980 private volatile Status status ;
8081 private volatile ScheduledFuture <?> confirmTimeoutFuture ;
8182 private final short publishVersion ;
83+ private final Lock lock = new ReentrantLock ();
8284
8385 @ SuppressFBWarnings ("CT_CONSTRUCTOR_THROW" )
8486 StreamProducer (
@@ -110,7 +112,7 @@ class StreamProducer implements Producer {
110112 this .closingCallback = environment .registerProducer (this , name , this .stream );
111113 final Client .OutboundEntityWriteCallback delegateWriteCallback ;
112114 AtomicLong publishingSequence = new AtomicLong (computeFirstValueOfPublishingSequence ());
113- this . accumulatorPublishSequenceFunction =
115+ ToLongFunction < Message > accumulatorPublishSequenceFunction =
114116 msg -> {
115117 if (msg .hasPublishingId ()) {
116118 return msg .getPublishingId ();
@@ -491,76 +493,80 @@ void unavailable() {
491493 }
492494
493495 void running () {
494- synchronized (this ) {
495- LOGGER .debug (
496- "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)" ,
497- this .unconfirmedMessages .size (),
498- this .accumulator .size ());
499- if (this .retryOnRecovery ) {
500- LOGGER .debug ("Re-publishing {} unconfirmed message(s)" , this .unconfirmedMessages .size ());
501- if (!this .unconfirmedMessages .isEmpty ()) {
502- Map <Long , AccumulatedEntity > messagesToResend = new TreeMap <>(this .unconfirmedMessages );
503- this .unconfirmedMessages .clear ();
504- Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
505- messagesToResend .entrySet ().iterator ();
506- while (resendIterator .hasNext ()) {
507- List <Object > messages = new ArrayList <>(this .batchSize );
508- int batchCount = 0 ;
509- while (batchCount != this .batchSize ) {
510- Object accMessage =
511- resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
512- if (accMessage == null ) {
513- break ;
496+ this .executeInLock (
497+ () -> {
498+ LOGGER .debug (
499+ "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)" ,
500+ this .unconfirmedMessages .size (),
501+ this .accumulator .size ());
502+ if (this .retryOnRecovery ) {
503+ LOGGER .debug (
504+ "Re-publishing {} unconfirmed message(s)" , this .unconfirmedMessages .size ());
505+ if (!this .unconfirmedMessages .isEmpty ()) {
506+ Map <Long , AccumulatedEntity > messagesToResend =
507+ new TreeMap <>(this .unconfirmedMessages );
508+ this .unconfirmedMessages .clear ();
509+ Iterator <Entry <Long , AccumulatedEntity >> resendIterator =
510+ messagesToResend .entrySet ().iterator ();
511+ while (resendIterator .hasNext ()) {
512+ List <Object > messages = new ArrayList <>(this .batchSize );
513+ int batchCount = 0 ;
514+ while (batchCount != this .batchSize ) {
515+ Object accMessage =
516+ resendIterator .hasNext () ? resendIterator .next ().getValue () : null ;
517+ if (accMessage == null ) {
518+ break ;
519+ }
520+ messages .add (accMessage );
521+ batchCount ++;
522+ }
523+ client .publishInternal (
524+ this .publishVersion ,
525+ this .publisherId ,
526+ messages ,
527+ this .writeCallback ,
528+ this .publishSequenceFunction );
529+ }
530+ }
531+ } else {
532+ LOGGER .debug (
533+ "Skipping republishing of {} unconfirmed messages" ,
534+ this .unconfirmedMessages .size ());
535+ Map <Long , AccumulatedEntity > messagesToFail = new TreeMap <>(this .unconfirmedMessages );
536+ this .unconfirmedMessages .clear ();
537+ for (AccumulatedEntity accumulatedEntity : messagesToFail .values ()) {
538+ try {
539+ int permits =
540+ accumulatedEntity
541+ .confirmationCallback ()
542+ .handle (false , CODE_PUBLISH_CONFIRM_TIMEOUT );
543+ this .unconfirmedMessagesSemaphore .release (permits );
544+ } catch (Exception e ) {
545+ LOGGER .debug ("Error while nack-ing outbound message: {}" , e .getMessage ());
546+ this .unconfirmedMessagesSemaphore .release (1 );
514547 }
515- messages .add (accMessage );
516- batchCount ++;
517548 }
518- client .publishInternal (
519- this .publishVersion ,
520- this .publisherId ,
521- messages ,
522- this .writeCallback ,
523- this .publishSequenceFunction );
524549 }
525- }
526- } else {
527- LOGGER .debug (
528- "Skipping republishing of {} unconfirmed messages" , this .unconfirmedMessages .size ());
529- Map <Long , AccumulatedEntity > messagesToFail = new TreeMap <>(this .unconfirmedMessages );
530- this .unconfirmedMessages .clear ();
531- for (AccumulatedEntity accumulatedEntity : messagesToFail .values ()) {
532- try {
533- int permits =
534- accumulatedEntity
535- .confirmationCallback ()
536- .handle (false , CODE_PUBLISH_CONFIRM_TIMEOUT );
537- this .unconfirmedMessagesSemaphore .release (permits );
538- } catch (Exception e ) {
539- LOGGER .debug ("Error while nack-ing outbound message: {}" , e .getMessage ());
540- this .unconfirmedMessagesSemaphore .release (1 );
550+ this .accumulator .flush (true );
551+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
552+ if (toRelease > 0 ) {
553+ unconfirmedMessagesSemaphore .release (toRelease );
554+ if (!unconfirmedMessagesSemaphore .tryAcquire (this .unconfirmedMessages .size ())) {
555+ LOGGER .debug (
556+ "Could not acquire {} permit(s) for message republishing" ,
557+ this .unconfirmedMessages .size ());
558+ }
541559 }
542- }
543- }
544- this .accumulator .flush (true );
545- int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore .availablePermits ();
546- if (toRelease > 0 ) {
547- unconfirmedMessagesSemaphore .release (toRelease );
548- if (!unconfirmedMessagesSemaphore .tryAcquire (this .unconfirmedMessages .size ())) {
549- LOGGER .debug (
550- "Could not acquire {} permit(s) for message republishing" ,
551- this .unconfirmedMessages .size ());
552- }
553- }
554- }
560+ });
555561 this .status = Status .RUNNING ;
556562 }
557563
558- synchronized void setClient (Client client ) {
559- this .client = client ;
564+ void setClient (Client client ) {
565+ this .executeInLock (() -> this . client = client ) ;
560566 }
561567
562- synchronized void setPublisherId (byte publisherId ) {
563- this .publisherId = publisherId ;
568+ void setPublisherId (byte publisherId ) {
569+ this .executeInLock (() -> this . publisherId = publisherId ) ;
564570 }
565571
566572 Status status () {
@@ -646,4 +652,21 @@ public int fragmentLength(Object entity) {
646652 }
647653 }
648654 }
655+
656+ void lock () {
657+ this .lock .lock ();
658+ }
659+
660+ void unlock () {
661+ this .lock .unlock ();
662+ }
663+
664+ private void executeInLock (Runnable action ) {
665+ this .lock ();
666+ try {
667+ action .run ();
668+ } finally {
669+ this .unlock ();
670+ }
671+ }
649672}
0 commit comments