4646import com .rabbitmq .client .amqp .Resource ;
4747import com .rabbitmq .client .amqp .impl .TestUtils .DisabledIfNotCluster ;
4848import com .rabbitmq .client .amqp .impl .TestUtils .Sync ;
49+ import java .time .Duration ;
4950import java .util .List ;
5051import java .util .Set ;
5152import java .util .concurrent .ConcurrentHashMap ;
5960import org .junit .jupiter .api .TestInfo ;
6061import org .junit .jupiter .params .ParameterizedTest ;
6162import org .junit .jupiter .params .provider .EnumSource ;
63+ import org .slf4j .Logger ;
64+ import org .slf4j .LoggerFactory ;
6265
6366@ DisabledIfNotCluster
6467public class ClusterTest {
6568
69+ private static final Logger LOGGER = LoggerFactory .getLogger (ClusterTest .class );
70+
6671 static final String [] URIS =
6772 new String [] {"amqp://localhost:5672" , "amqp://localhost:5673" , "amqp://localhost:5674" };
6873 static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy .fixed (ofMillis (100 ));
@@ -339,9 +344,13 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
339344
340345 assertThat (initialFollowers ).isNotEmpty ();
341346
347+ LOGGER .info ("Pausing node {}" , initialLeader );
348+
342349 Cli .pauseNode (initialLeader );
343350 nodePaused = true ;
344351
352+ LOGGER .info ("Node {} paused" , initialLeader );
353+
345354 publisher .publish (publisher .message ().messageId (2L ), ctx -> publishSync .down ());
346355
347356 assertThat (publishSync ).completes (ofSeconds (20 ));
@@ -351,6 +360,9 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
351360 assertThat (messageIds ).containsExactlyInAnyOrder (1L , 2L );
352361 consumeSync .reset ();
353362
363+ LOGGER .info ("Waiting for topology update" );
364+ long start = System .nanoTime ();
365+
354366 waitAtMost (
355367 ofSeconds (60 ),
356368 () -> initialFollowers .contains (mgmt .queueInfo (q ).leader ()),
@@ -361,8 +373,14 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
361373 + "queue info "
362374 + mgmt .queueInfo (q ));
363375
376+ LOGGER .info (
377+ "Topology updated after {} second(s)" ,
378+ Duration .ofNanos (System .nanoTime () - start ).toSeconds ());
379+
380+ LOGGER .info ("Unpausing node {}" , initialLeader );
364381 Cli .unpauseNode (initialLeader );
365382 nodePaused = false ;
383+ LOGGER .info ("Node {} unpaused" , initialLeader );
366384
367385 publisher .publish (publisher .message ().messageId (3L ), ctx -> publishSync .down ());
368386 assertThat (publishSync ).completes ();
0 commit comments