|
55 | 55 |
|
56 | 56 | import static org.junit.jupiter.api.Assertions.*; |
57 | 57 | import static org.junit.jupiter.params.provider.Arguments.of; |
58 | | -import static org.mockito.Matchers.anyBoolean; |
59 | | -import static org.mockito.Matchers.anyLong; |
| 58 | +import static org.mockito.ArgumentMatchers.anyBoolean; |
| 59 | +import static org.mockito.ArgumentMatchers.anyLong; |
60 | 60 | import static org.mockito.Mockito.*; |
61 | 61 | import static reactor.rabbitmq.RabbitFlux.createReceiver; |
62 | 62 | import static reactor.rabbitmq.RabbitFlux.createSender; |
@@ -550,20 +550,28 @@ public void senderWithCustomChannelCloseHandler() throws Exception { |
550 | 550 | } |
551 | 551 |
|
552 | 552 | @Test |
553 | | - public void senderWithCustomChannelCloseHandlerPriority() { |
| 553 | + public void senderWithCustomChannelCloseHandlerPriority() throws InterruptedException { |
554 | 554 | int nbMessages = 10; |
555 | 555 | Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes())); |
556 | 556 |
|
557 | 557 | SenderChannelCloseHandler channelCloseHandlerInSenderOptions = mock(SenderChannelCloseHandler.class); |
558 | 558 | SenderChannelCloseHandler channelCloseHandlerInSendOptions = mock(SenderChannelCloseHandler.class); |
559 | 559 |
|
| 560 | + CountDownLatch latch = new CountDownLatch(1); |
| 561 | + doAnswer(answer -> { |
| 562 | + latch.countDown(); |
| 563 | + return null; |
| 564 | + }).when(channelCloseHandlerInSendOptions).accept(any(SignalType.class), any(Channel.class)); |
| 565 | + |
560 | 566 | SenderOptions senderOptions = new SenderOptions().channelCloseHandler(channelCloseHandlerInSenderOptions); |
561 | 567 | sender = createSender(senderOptions); |
562 | 568 | SendOptions sendOptions = new SendOptions().channelCloseHandler(channelCloseHandlerInSendOptions); |
563 | 569 |
|
564 | 570 | StepVerifier.create(sender.send(msgFlux, sendOptions)) |
565 | 571 | .verifyComplete(); |
566 | 572 |
|
| 573 | + assertTrue(latch.await(1, TimeUnit.SECONDS)); |
| 574 | + |
567 | 575 | verify(channelCloseHandlerInSenderOptions, never()).accept(any(SignalType.class), any(Channel.class)); |
568 | 576 | verify(channelCloseHandlerInSendOptions, times(1)).accept(any(SignalType.class), any(Channel.class)); |
569 | 577 | } |
@@ -611,7 +619,8 @@ public void publishConfirmsErrorWhilePublishing() throws Exception { |
611 | 619 |
|
612 | 620 | doNothing() |
613 | 621 | .doThrow(new IOException("simulated error while publishing")) |
614 | | - .when(mockChannel).basicPublish(anyString(), anyString(), any(AMQP.BasicProperties.class), any(byte[].class)); |
| 622 | + .when(mockChannel).basicPublish(anyString(), anyString(), nullable(AMQP.BasicProperties.class), any(byte[].class)); |
| 623 | + |
615 | 624 |
|
616 | 625 | int nbMessages = 10; |
617 | 626 | Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes())); |
|
0 commit comments