2424import org .junit .jupiter .params .provider .Arguments ;
2525import org .junit .jupiter .params .provider .MethodSource ;
2626import org .mockito .ArgumentCaptor ;
27+ import org .reactivestreams .Publisher ;
2728import org .reactivestreams .Subscription ;
2829import reactor .core .Disposable ;
2930import reactor .core .Exceptions ;
30- import reactor .core .publisher .BaseSubscriber ;
31- import reactor .core .publisher .Flux ;
32- import reactor .core .publisher .FluxSink ;
33- import reactor .core .publisher .Mono ;
34- import reactor .core .publisher .SignalType ;
31+ import reactor .core .publisher .*;
3532import reactor .core .scheduler .Schedulers ;
3633import reactor .rabbitmq .ChannelCloseHandlers .SenderChannelCloseHandler ;
3734import reactor .test .StepVerifier ;
3835import reactor .util .function .Tuple2 ;
36+ import reactor .util .function .Tuple3 ;
3937import reactor .util .function .Tuples ;
4038
4139import java .io .IOException ;
5048import java .util .concurrent .atomic .AtomicLong ;
5149import java .util .concurrent .atomic .AtomicReference ;
5250import java .util .function .BiFunction ;
51+ import java .util .function .Function ;
5352import java .util .stream .IntStream ;
5453import java .util .stream .Stream ;
5554
@@ -77,11 +76,18 @@ public class RabbitFluxTests {
7776
7877 static Stream <Arguments > noAckAndManualAckFluxArguments () {
7978 return Stream .of (
80- of ((BiFunction <Receiver , String , Flux <Delivery >>) (receiver , queue ) -> receiver .consumeNoAck (queue )),
81- of ((BiFunction <Receiver , String , Flux <? extends Delivery >>) (receiver , queue ) -> receiver .consumeManualAck (queue ))
79+ of ((BiFunction <Receiver , String , Flux <Delivery >>) (receiver , queue ) -> receiver .consumeNoAck (queue )),
80+ of ((BiFunction <Receiver , String , Flux <? extends Delivery >>) (receiver , queue ) -> receiver .consumeManualAck (queue ))
8281 );
8382 }
8483
84+ public static Object [][] senderWithCustomChannelCloseHandlerPriorityArguments () {
85+ return new Object [][]{
86+ new Object []{10 , (Function <Tuple3 <Sender , Publisher <OutboundMessage >, SendOptions >, Publisher >) objects -> objects .getT1 ().send (objects .getT2 (), objects .getT3 ()), 0 },
87+ new Object []{10 , (Function <Tuple3 <Sender , Publisher <OutboundMessage >, SendOptions >, Publisher >) objects -> objects .getT1 ().sendWithPublishConfirms (objects .getT2 (), objects .getT3 ()), 10 }
88+ };
89+ }
90+
8591 @ BeforeEach
8692 public void init () throws Exception {
8793 ConnectionFactory connectionFactory = new ConnectionFactory ();
@@ -407,7 +413,8 @@ public void receiverErrorHandling(BiFunction<Receiver, String, Flux<? extends De
407413 receiver = RabbitFlux .createReceiver (new ReceiverOptions ().connectionMono (connectionMono ));
408414 Flux <? extends Delivery > flux = fluxFactory .apply (receiver , queue );
409415 AtomicBoolean errorHandlerCalled = new AtomicBoolean (false );
410- Disposable disposable = flux .subscribe (delivery -> { }, error -> errorHandlerCalled .set (true ));
416+ Disposable disposable = flux .subscribe (delivery -> {
417+ }, error -> errorHandlerCalled .set (true ));
411418 assertTrue (errorHandlerCalled .get ());
412419 disposable .dispose ();
413420 }
@@ -435,7 +442,8 @@ public void receiverFluxDisposedOnConnectionClose(BiFunction<Receiver, String, F
435442 Disposable subscription = flux .subscribe (msg -> {
436443 counter .incrementAndGet ();
437444 messageReceivedLatch .countDown ();
438- }, error -> {}, () -> completedLatch .countDown ());
445+ }, error -> {
446+ }, () -> completedLatch .countDown ());
439447
440448 assertTrue (messageReceivedLatch .await (1 , TimeUnit .SECONDS ));
441449 assertEquals (nbMessages , counter .get ());
@@ -485,12 +493,12 @@ public void senderRetryCreateChannel() throws Exception {
485493 sender = createSender (new SenderOptions ().connectionFactory (mockConnectionFactory ));
486494
487495 StepVerifier .create (sender .send (msgFlux ).retry (2 ))
488- .verifyComplete ();
496+ .verifyComplete ();
489497 verify (mockConnection , times (3 )).createChannel ();
490498
491499 StepVerifier .create (consume (queue , nbMessages ))
492- .expectNextCount (nbMessages )
493- .verifyComplete ();
500+ .expectNextCount (nbMessages )
501+ .verifyComplete ();
494502 }
495503
496504 @ Test
@@ -500,9 +508,9 @@ public void senderRetryNotWorkingWhenCreateChannelIsCached() throws Exception {
500508 Connection mockConnection = mock (Connection .class );
501509 Channel mockChannel = mock (Channel .class );
502510 when (mockConnection .createChannel ())
503- .thenThrow (new RuntimeException ("already closed exception" ))
504- .thenThrow (new RuntimeException ("already closed exception" ))
505- .thenReturn (mockChannel );
511+ .thenThrow (new RuntimeException ("already closed exception" ))
512+ .thenThrow (new RuntimeException ("already closed exception" ))
513+ .thenReturn (mockChannel );
506514
507515 Flux <OutboundMessage > msgFlux = Flux .range (0 , nbMessages ).map (i -> new OutboundMessage ("" , queue , "" .getBytes ()));
508516
@@ -512,8 +520,8 @@ public void senderRetryNotWorkingWhenCreateChannelIsCached() throws Exception {
512520 sender = createSender (senderOptions );
513521
514522 StepVerifier .create (sender .send (msgFlux ).retry (2 ))
515- .expectError (RabbitFluxException .class )
516- .verify ();
523+ .expectError (RabbitFluxException .class )
524+ .verify ();
517525
518526 verify (mockChannel , never ()).basicPublish (anyString (), anyString (), any (AMQP .BasicProperties .class ), any (byte [].class ));
519527 verify (mockChannel , never ()).close ();
@@ -541,17 +549,19 @@ public void senderWithCustomChannelCloseHandler() throws Exception {
541549 });
542550
543551 StepVerifier .create (sendTwice )
544- .verifyComplete ();
552+ .verifyComplete ();
545553 verify (channelCloseHandler , times (2 )).accept (SignalType .ON_COMPLETE , monoChannel .block ());
546554
547555 StepVerifier .create (consume (queue , nbMessages * 2 ))
548556 .expectNextCount (nbMessages * 2 )
549557 .verifyComplete ();
550558 }
551559
552- @ Test
553- public void senderWithCustomChannelCloseHandlerPriority () throws InterruptedException {
554- int nbMessages = 10 ;
560+ @ ParameterizedTest
561+ @ MethodSource ("senderWithCustomChannelCloseHandlerPriorityArguments" )
562+ public void senderWithCustomChannelCloseHandlerPriority (int nbMessages ,
563+ Function <Tuple3 <Sender , Publisher <OutboundMessage >, SendOptions >, Publisher > sendingCallback ,
564+ int expectedCount ) throws InterruptedException {
555565 Flux <OutboundMessage > msgFlux = Flux .range (0 , nbMessages ).map (i -> new OutboundMessage ("" , queue , "" .getBytes ()));
556566
557567 SenderChannelCloseHandler channelCloseHandlerInSenderOptions = mock (SenderChannelCloseHandler .class );
@@ -567,7 +577,10 @@ public void senderWithCustomChannelCloseHandlerPriority() throws InterruptedExce
567577 sender = createSender (senderOptions );
568578 SendOptions sendOptions = new SendOptions ().channelCloseHandler (channelCloseHandlerInSendOptions );
569579
570- StepVerifier .create (sender .send (msgFlux , sendOptions ))
580+ Publisher sendingResult = sendingCallback .apply (Tuples .of (sender , msgFlux , sendOptions ));
581+
582+ StepVerifier .create (sendingResult )
583+ .expectNextCount (expectedCount )
571584 .verifyComplete ();
572585
573586 assertTrue (latch .await (1 , TimeUnit .SECONDS ));
@@ -970,4 +983,5 @@ private Channel createChannel(Connection connection) {
970983 throw new RabbitFluxException (e );
971984 }
972985 }
986+
973987}
0 commit comments