@@ -802,27 +802,31 @@ public void publishConfirmsErrorWhilePublishing() throws Exception {
802802 when (mockChannel .getNextPublishSeqNo ()).thenAnswer (invocation -> publishSequence .incrementAndGet ());
803803 when (mockChannel .isOpen ()).thenReturn (true );
804804
805+ CountDownLatch serverPublishConfirmLatch = new CountDownLatch (1 );
805806 doNothing ()
806- .doThrow (new IOException ("simulated error while publishing" ))
807+ .doAnswer (answer -> {
808+ // see https://github.com/reactor/reactor-rabbitmq/pull/67#issuecomment-472789735
809+ serverPublishConfirmLatch .await (5 , TimeUnit .SECONDS );
810+ throw new IOException ("simulated error while publishing" );
811+ })
812+ .doThrow ()
807813 .when (mockChannel ).basicPublish (anyString (), anyString (), nullable (AMQP .BasicProperties .class ), any (byte [].class ));
808814
809815
810816 int nbMessages = 10 ;
811817 Flux <OutboundMessage > msgFlux = Flux .range (0 , nbMessages ).map (i -> new OutboundMessage ("" , queue , "" .getBytes ()));
812- int nbMessagesAckNack = 1 ; // why it was 2, shouldn't it be 1 ??
818+ int nbMessagesAckNack = 1 + 1 ; // first published message confirmed + "fake" confirmation because of sending failure
813819 CountDownLatch confirmLatch = new CountDownLatch (nbMessagesAckNack );
814820 sender = createSender (new SenderOptions ().connectionFactory (mockConnectionFactory ));
815821 sender .sendWithPublishConfirms (msgFlux , new SendOptions ().exceptionHandler ((ctx , e ) -> {
816822 throw new RabbitFluxException (e );
817- })) // Before change: (onNext -> onError -> onNext )
818- // After change (maxInFlight): (onNext -> onError)
819- .subscribe (outboundMessageResult -> {
820- if (outboundMessageResult .getOutboundMessage () != null ) {
821- confirmLatch .countDown ();
822- }
823- },
824- error -> {
825- });
823+ })).subscribe (outboundMessageResult -> {
824+ if (outboundMessageResult .getOutboundMessage () != null ) {
825+ confirmLatch .countDown ();
826+ }
827+ },
828+ error -> {
829+ });
826830
827831 // have to wait a bit the subscription propagates and add the confirm listener
828832 Thread .sleep (100L );
@@ -834,6 +838,7 @@ public void publishConfirmsErrorWhilePublishing() throws Exception {
834838 ExecutorService ioExecutor = Executors .newSingleThreadExecutor ();
835839 ioExecutor .submit (() -> {
836840 confirmListener .handleAck (1 , false );
841+ serverPublishConfirmLatch .countDown ();
837842 return null ;
838843 });
839844
0 commit comments