3030import reactor .core .publisher .Flux ;
3131import reactor .core .publisher .FluxOperator ;
3232import reactor .core .publisher .Mono ;
33+ import reactor .core .publisher .SignalType ;
3334import reactor .core .publisher .Operators ;
3435import reactor .core .scheduler .Scheduler ;
3536import reactor .core .scheduler .Schedulers ;
3637
3738import java .io .IOException ;
3839import java .util .Iterator ;
3940import java .util .Map ;
41+ import java .util .Objects ;
4042import java .util .concurrent .ConcurrentNavigableMap ;
4143import java .util .concurrent .ConcurrentSkipListMap ;
4244import java .util .concurrent .atomic .AtomicBoolean ;
4345import java .util .concurrent .atomic .AtomicReference ;
4446import java .util .function .BiConsumer ;
4547import java .util .function .Function ;
4648import java .util .function .Supplier ;
49+ import java .util .stream .Stream ;
4750
4851/**
4952 * Reactive abstraction to create resources and send messages.
@@ -58,6 +61,10 @@ public class Sender implements AutoCloseable {
5861
5962 private final Mono <? extends Connection > connectionMono ;
6063
64+ private final Mono <? extends Channel > channelMono ;
65+
66+ private final BiConsumer <SignalType , Channel > channelCloseHandler ;
67+
6168 private final AtomicBoolean hasConnection = new AtomicBoolean (false );
6269
6370 private final Mono <? extends Channel > resourceManagementChannelMono ;
@@ -83,6 +90,10 @@ public Sender(SenderOptions options) {
8390 .doOnSubscribe (c -> hasConnection .set (true ))
8491 .subscribeOn (this .connectionSubscriptionScheduler )
8592 .cache ();
93+ this .channelMono = options .getChannelMono ();
94+ this .channelCloseHandler = options .getChannelCloseHandler () == null ?
95+ new ChannelCloseHandlers .SenderChannelCloseHandler () :
96+ options .getChannelCloseHandler ();
8697 this .privateResourceManagementScheduler = options .getResourceManagementScheduler () == null ;
8798 this .resourceManagementScheduler = options .getResourceManagementScheduler () == null ?
8899 createScheduler ("rabbitmq-sender-resource-creation" ) : options .getResourceManagementScheduler ();
@@ -102,8 +113,9 @@ public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options)
102113 // TODO using a pool of channels?
103114 // would be much more efficient if send is called very often
104115 // less useful if seldom called, only for long or infinite message flux
105- final Mono <Channel > currentChannelMono = connectionMono . map ( CHANNEL_CREATION_FUNCTION ). cache ();
116+ final Mono <? extends Channel > currentChannelMono = getChannelMono ();
106117 final BiConsumer <SendContext , Exception > exceptionHandler = options .getExceptionHandler ();
118+
107119 return currentChannelMono .flatMapMany (channel ->
108120 Flux .from (messages )
109121 .doOnNext (message -> {
@@ -119,17 +131,7 @@ public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options)
119131 }
120132 })
121133 .doOnError (e -> LOGGER .warn ("Send failed with exception {}" , e ))
122- .doFinally (st -> {
123- int channelNumber = channel .getChannelNumber ();
124- LOGGER .info ("closing channel {} by signal {}" , channelNumber , st );
125- try {
126- if (channel .isOpen () && channel .getConnection ().isOpen ()) {
127- channel .close ();
128- }
129- } catch (Exception e ) {
130- LOGGER .warn ("Channel {} didn't close normally: {}" , channelNumber , e .getMessage ());
131- }
132- })
134+ .doFinally (st -> channelCloseHandler .accept (st , channel ))
133135 ).then ();
134136 }
135137
@@ -141,17 +143,21 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
141143 // TODO using a pool of channels?
142144 // would be much more efficient if send is called very often
143145 // less useful if seldom called, only for long or infinite message flux
144- final Mono <Channel > channelMono = connectionMono .map (CHANNEL_CREATION_FUNCTION )
145- .map (channel -> {
146+ final Mono <? extends Channel > currentChannelMono = getChannelMono ();
147+
148+ return currentChannelMono .map (channel -> {
146149 try {
147150 channel .confirmSelect ();
148151 } catch (IOException e ) {
149152 throw new RabbitFluxException ("Error while setting publisher confirms on channel" , e );
150153 }
151154 return channel ;
152- });
155+ })
156+ .flatMapMany (channel -> new PublishConfirmOperator (messages , channel , channelCloseHandler , options ));
157+ }
153158
154- return channelMono .flatMapMany (channel -> new PublishConfirmOperator (messages , channel , options ));
159+ private Mono <? extends Channel > getChannelMono () {
160+ return channelMono != null ? channelMono : connectionMono .map (CHANNEL_CREATION_FUNCTION );
155161 }
156162
157163 public RpcClient rpcClient (String exchange , String routingKey ) {
@@ -452,15 +458,18 @@ private static class PublishConfirmOperator
452458
453459 private final SendOptions options ;
454460
455- public PublishConfirmOperator (Publisher <OutboundMessage > source , Channel channel , SendOptions options ) {
461+ private final BiConsumer <SignalType , Channel > channelCloseHandler ;
462+
463+ public PublishConfirmOperator (Publisher <OutboundMessage > source , Channel channel , BiConsumer <SignalType , Channel > channelCloseHandler , SendOptions options ) {
456464 super (Flux .from (source ));
457465 this .channel = channel ;
466+ this .channelCloseHandler = channelCloseHandler ;
458467 this .options = options ;
459468 }
460469
461470 @ Override
462471 public void subscribe (CoreSubscriber <? super OutboundMessageResult > actual ) {
463- source .subscribe (new PublishConfirmSubscriber (channel , actual , options ));
472+ source .subscribe (new PublishConfirmSubscriber (channel , channelCloseHandler , actual , options ));
464473 }
465474 }
466475
@@ -479,8 +488,11 @@ private static class PublishConfirmSubscriber implements
479488
480489 private final BiConsumer <SendContext , Exception > exceptionHandler ;
481490
482- private PublishConfirmSubscriber (Channel channel , Subscriber <? super OutboundMessageResult > subscriber , SendOptions options ) {
491+ private final BiConsumer <SignalType , Channel > channelCloseHandler ;
492+
493+ private PublishConfirmSubscriber (Channel channel , BiConsumer <SignalType , Channel > channelCloseHandler , Subscriber <? super OutboundMessageResult > subscriber , SendOptions options ) {
483494 this .channel = channel ;
495+ this .channelCloseHandler = channelCloseHandler ;
484496 this .subscriber = subscriber ;
485497 this .exceptionHandler = options .getExceptionHandler ();
486498 }
@@ -563,7 +575,7 @@ public void onNext(OutboundMessage message) {
563575 public void onError (Throwable throwable ) {
564576 if (state .compareAndSet (SubscriberState .ACTIVE , SubscriberState .COMPLETE ) ||
565577 state .compareAndSet (SubscriberState .OUTBOUND_DONE , SubscriberState .COMPLETE )) {
566- closeResources ( );
578+ channelCloseHandler . accept ( SignalType . ON_ERROR , channel );
567579 // complete the flux state
568580 subscriber .onError (throwable );
569581 } else if (firstException .compareAndSet (null , throwable ) && state .get () == SubscriberState .COMPLETE ) {
@@ -594,21 +606,11 @@ private void handleError(Exception e, OutboundMessageResult result) {
594606 private void maybeComplete () {
595607 boolean done = state .compareAndSet (SubscriberState .OUTBOUND_DONE , SubscriberState .COMPLETE );
596608 if (done ) {
597- closeResources ( );
609+ channelCloseHandler . accept ( SignalType . ON_COMPLETE , channel );
598610 subscriber .onComplete ();
599611 }
600612 }
601613
602- private void closeResources () {
603- try {
604- if (channel .isOpen ()) {
605- channel .close ();
606- }
607- } catch (Exception e ) {
608- // not much we can do here
609- }
610- }
611-
612614 public <T > boolean checkComplete (T t ) {
613615 boolean complete = state .get () == SubscriberState .COMPLETE ;
614616 if (complete && firstException .get () == null ) {
0 commit comments