4141import java .util .Objects ;
4242import java .util .concurrent .ConcurrentNavigableMap ;
4343import java .util .concurrent .ConcurrentSkipListMap ;
44+ import java .util .concurrent .ExecutorService ;
45+ import java .util .concurrent .Executors ;
4446import java .util .concurrent .atomic .AtomicBoolean ;
4547import java .util .concurrent .atomic .AtomicReference ;
4648import java .util .function .BiConsumer ;
@@ -77,6 +79,8 @@ public class Sender implements AutoCloseable {
7779
7880 private final boolean privateConnectionSubscriptionScheduler ;
7981
82+ private final ExecutorService channelCloseThreadPool = Executors .newCachedThreadPool ();
83+
8084 public Sender () {
8185 this (new SenderOptions ());
8286 }
@@ -111,9 +115,6 @@ public Mono<Void> send(Publisher<OutboundMessage> messages) {
111115
112116 public Mono <Void > send (Publisher <OutboundMessage > messages , SendOptions options ) {
113117 options = options == null ? new SendOptions () : options ;
114- // TODO using a pool of channels?
115- // would be much more efficient if send is called very often
116- // less useful if seldom called, only for long or infinite message flux
117118 final Mono <? extends Channel > currentChannelMono = getChannelMono (options );
118119 final BiConsumer <SendContext , Exception > exceptionHandler = options .getExceptionHandler ();
119120 final BiConsumer <SignalType , Channel > channelCloseHandler = getChannelCloseHandler (options );
@@ -142,14 +143,10 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
142143 }
143144
144145 public Flux <OutboundMessageResult > sendWithPublishConfirms (Publisher <OutboundMessage > messages , SendOptions options ) {
145- options = options == null ? new SendOptions () : options ;
146- // TODO using a pool of channels?
147- // would be much more efficient if send is called very often
148- // less useful if seldom called, only for long or infinite message flux
146+ SendOptions sendOptions = options == null ? new SendOptions () : options ;
149147 final Mono <? extends Channel > currentChannelMono = getChannelMono (options );
150148 final BiConsumer <SignalType , Channel > channelCloseHandler = getChannelCloseHandler (options );
151149
152- SendOptions sendOptions = options ;
153150 return currentChannelMono .map (channel -> {
154151 try {
155152 channel .confirmSelect ();
@@ -158,7 +155,17 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
158155 }
159156 return channel ;
160157 })
161- .flatMapMany (channel -> new PublishConfirmOperator (messages , channel , channelCloseHandler , sendOptions ));
158+ .flatMapMany (channel -> Flux .from (new PublishConfirmOperator (messages , channel , sendOptions )).doFinally (signalType -> {
159+ // channel close is no longer a responsibility of PublishConfirmOperator, not sure whether it is a correct approach
160+ // added to avoid creating threads inside PublishConfirmOperator, which make ChannelPool useless
161+ if (signalType == SignalType .ON_ERROR ) {
162+ channelCloseHandler .accept (signalType , channel );
163+ } else {
164+ // confirmation listeners are executed in the IO reading thread
165+ // so we need to complete in another thread
166+ channelCloseThreadPool .execute (() -> channelCloseHandler .accept (signalType , channel ));
167+ }
168+ }));
162169 }
163170
164171 // package-protected for testing
@@ -403,6 +410,7 @@ public void close() {
403410 if (this .privateResourceManagementScheduler ) {
404411 this .resourceManagementScheduler .dispose ();
405412 }
413+ channelCloseThreadPool .shutdown ();
406414 }
407415
408416 public static class SendContext {
@@ -471,18 +479,15 @@ private static class PublishConfirmOperator
471479
472480 private final SendOptions options ;
473481
474- private final BiConsumer <SignalType , Channel > channelCloseHandler ;
475-
476- public PublishConfirmOperator (Publisher <OutboundMessage > source , Channel channel , BiConsumer <SignalType , Channel > channelCloseHandler , SendOptions options ) {
482+ public PublishConfirmOperator (Publisher <OutboundMessage > source , Channel channel , SendOptions options ) {
477483 super (Flux .from (source ));
478484 this .channel = channel ;
479- this .channelCloseHandler = channelCloseHandler ;
480485 this .options = options ;
481486 }
482487
483488 @ Override
484489 public void subscribe (CoreSubscriber <? super OutboundMessageResult > actual ) {
485- source .subscribe (new PublishConfirmSubscriber (channel , channelCloseHandler , actual , options ));
490+ source .subscribe (new PublishConfirmSubscriber (channel , actual , options ));
486491 }
487492 }
488493
@@ -501,11 +506,8 @@ private static class PublishConfirmSubscriber implements
501506
502507 private final BiConsumer <SendContext , Exception > exceptionHandler ;
503508
504- private final BiConsumer <SignalType , Channel > channelCloseHandler ;
505-
506- private PublishConfirmSubscriber (Channel channel , BiConsumer <SignalType , Channel > channelCloseHandler , Subscriber <? super OutboundMessageResult > subscriber , SendOptions options ) {
509+ private PublishConfirmSubscriber (Channel channel , Subscriber <? super OutboundMessageResult > subscriber , SendOptions options ) {
507510 this .channel = channel ;
508- this .channelCloseHandler = channelCloseHandler ;
509511 this .subscriber = subscriber ;
510512 this .exceptionHandler = options .getExceptionHandler ();
511513 }
@@ -546,12 +548,7 @@ private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
546548 }
547549 }
548550 if (unconfirmed .size () == 0 ) {
549- // FIXME create new thread only if needed, likely in maybeComplete()
550- new Thread (() -> {
551- // confirmation listeners are executed in the IO reading thread
552- // so we need to complete in another thread
553- maybeComplete ();
554- }).start ();
551+ maybeComplete ();
555552 }
556553 }
557554 });
@@ -588,7 +585,6 @@ public void onNext(OutboundMessage message) {
588585 public void onError (Throwable throwable ) {
589586 if (state .compareAndSet (SubscriberState .ACTIVE , SubscriberState .COMPLETE ) ||
590587 state .compareAndSet (SubscriberState .OUTBOUND_DONE , SubscriberState .COMPLETE )) {
591- channelCloseHandler .accept (SignalType .ON_ERROR , channel );
592588 // complete the flux state
593589 subscriber .onError (throwable );
594590 } else if (firstException .compareAndSet (null , throwable ) && state .get () == SubscriberState .COMPLETE ) {
@@ -619,7 +615,6 @@ private void handleError(Exception e, OutboundMessageResult result) {
619615 private void maybeComplete () {
620616 boolean done = state .compareAndSet (SubscriberState .OUTBOUND_DONE , SubscriberState .COMPLETE );
621617 if (done ) {
622- channelCloseHandler .accept (SignalType .ON_COMPLETE , channel );
623618 subscriber .onComplete ();
624619 }
625620 }
0 commit comments