File tree Expand file tree Collapse file tree 2 files changed +18
-4
lines changed
src/test/java/com/rabbitmq/client/amqp/impl Expand file tree Collapse file tree 2 files changed +18
-4
lines changed Original file line number Diff line number Diff line change @@ -50,7 +50,7 @@ private Cli() {}
5050 public static String rabbitmqctlCommand () {
5151 String rabbitmqCtl = System .getProperty ("rabbitmqctl.bin" );
5252 if (rabbitmqCtl == null ) {
53- rabbitmqCtl = "DOCKER: rabbitmq" ;
53+ rabbitmqCtl = DOCKER_PREFIX + " rabbitmq" ;
5454 }
5555 if (rabbitmqCtl .startsWith (DOCKER_PREFIX )) {
5656 String containerId = rabbitmqCtl .split (":" )[1 ];
@@ -226,6 +226,19 @@ static void deleteQuorumQueueMember(String queue, String node) {
226226 rabbitmqQueues (" delete_member " + queue + " " + node );
227227 }
228228
229+ static String quorumStatus (String queue , String node ) {
230+ String containerId = DOCKER_NODES_TO_CONTAINERS .get (node );
231+ Assert .notNull (containerId , "Container ID for node " + node );
232+ String cmd = rabbitmqQueuesCommand ();
233+ for (String value : DOCKER_NODES_TO_CONTAINERS .values ()) {
234+ if (cmd .contains ("docker exec " + value )) {
235+ cmd = cmd .replace ("docker exec " + value , "docker exec " + containerId );
236+ }
237+ }
238+ System .out .println (cmd );
239+ return executeCommand (cmd + " quorum_status --formatter erlang " + queue ).output ();
240+ }
241+
229242 static void addStreamMember (String stream , String node ) {
230243 rabbitmqStreams (" add_replica " + stream + " " + node );
231244 }
Original file line number Diff line number Diff line change @@ -299,10 +299,12 @@ void publishConsumeQuorumQueueWhenLeaderChanges() {
299299 }
300300
301301 @ Test
302- void consumeFromQuorumQueueWhenLeaderIsPaused () {
302+ void consumeFromQuorumQueueWhenLeaderIsPaused () throws InterruptedException {
303303 management .queue (q ).type (QUORUM ).declare ();
304304 Management .QueueInfo queueInfo = queueInfo ();
305305 String initialLeader = queueInfo .leader ();
306+ List <String > initialFollowers =
307+ queueInfo .members ().stream ().filter (n -> !n .equals (initialLeader )).collect (toList ());
306308 boolean nodePaused = false ;
307309 try {
308310 AmqpConnection consumeConnection = connection (b -> b .affinity ().queue (q ).operation (CONSUME ));
@@ -335,8 +337,6 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
335337 assertThat (messageIds ).containsExactlyInAnyOrder (1L );
336338 consumeSync .reset ();
337339
338- List <String > initialFollowers =
339- queueInfo .members ().stream ().filter (n -> !n .equals (initialLeader )).collect (toList ());
340340 assertThat (initialFollowers ).isNotEmpty ();
341341
342342 Cli .pauseNode (initialLeader );
@@ -384,6 +384,7 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
384384 if (nodePaused ) {
385385 Cli .unpauseNode (initialLeader );
386386 }
387+ System .out .println (Cli .quorumStatus (q , initialFollowers .get (0 )));
387388 management .queueDelete (q );
388389 }
389390 }
You can’t perform that action at this time.
0 commit comments