@@ -641,12 +641,7 @@ grow_then_shrink_coordinator_cluster(Config) ->
641641 ? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
642642 declare (Config , Server0 , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
643643
644- ok = rabbit_control_helper :command (stop_app , Server1 ),
645- ok = rabbit_control_helper :command (join_cluster , Server1 , [atom_to_list (Server0 )], []),
646- ok = rabbit_control_helper :command (start_app , Server1 ),
647- ok = rabbit_control_helper :command (stop_app , Server2 ),
648- ok = rabbit_control_helper :command (join_cluster , Server2 , [atom_to_list (Server0 )], []),
649- ok = rabbit_control_helper :command (start_app , Server2 ),
644+ _Config1 = rabbit_ct_broker_helpers :cluster_nodes (Config ),
650645
651646 rabbit_ct_helpers :await_condition (
652647 fun () ->
@@ -683,37 +678,35 @@ grow_coordinator_cluster(Config) ->
683678 Q = ? config (queue_name , Config ),
684679
685680 ? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
686- declare (Config , Server0 , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
681+ declare (Config , Server1 , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
687682
688- ok = rabbit_control_helper :command (stop_app , Server1 ),
689- ok = rabbit_control_helper :command (join_cluster , Server1 , [atom_to_list (Server0 )], []),
690- rabbit_control_helper :command (start_app , Server1 ),
683+ Config1 = rabbit_ct_broker_helpers :cluster_nodes (Config , Server1 , [Server0 ]),
691684 % % at this point there _probably_ won't be a stream coordinator member on
692685 % % Server1
693686
694687 % % check we can add a new stream replica for the previously declare stream
695688 ? assertEqual (ok ,
696- rpc :call (Server1 , rabbit_stream_queue , add_replica ,
697- [<<" /" >>, Q , Server1 ])),
689+ rpc :call (Server0 , rabbit_stream_queue , add_replica ,
690+ [<<" /" >>, Q , Server0 ])),
698691 % % also check we can declare a new stream when calling Server1
699692 Q2 = unicode :characters_to_binary ([Q , <<" _2" >>]),
700693 ? assertEqual ({'queue.declare_ok' , Q2 , 0 , 0 },
701- declare (Config , Server1 , Q2 , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
694+ declare (Config1 , Server0 , Q2 , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
702695
703696 % % wait until the stream coordinator detects there is a new rabbit node
704697 % % and adds a new member on the new node
705698 rabbit_ct_helpers :await_condition (
706699 fun () ->
707- case rpc :call (Server0 , ra , members ,
708- [{rabbit_stream_coordinator , Server0 }]) of
700+ case rpc :call (Server1 , ra , members ,
701+ [{rabbit_stream_coordinator , Server1 }]) of
709702 {_ , Members , _ } ->
710703 Nodes = lists :sort ([N || {_ , N } <- Members ]),
711704 lists :sort ([Server0 , Server1 ]) == Nodes ;
712705 _ ->
713706 false
714707 end
715708 end , 60000 ),
716- rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_testcase_queue , [Q ]).
709+ rabbit_ct_broker_helpers :rpc (Config1 , 1 , ? MODULE , delete_testcase_queue , [Q ]).
717710
718711shrink_coordinator_cluster (Config ) ->
719712 [Server0 , Server1 , Server2 ] =
@@ -979,19 +972,17 @@ consume_without_local_replica(Config) ->
979972 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
980973 Q = ? config (queue_name , Config ),
981974 ? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
982- declare (Config , Server0 , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
975+ declare (Config , Server1 , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
983976 % % Add another node to the cluster, but it won't have a replica
984- ok = rabbit_control_helper :command (stop_app , Server1 ),
985- ok = rabbit_control_helper :command (join_cluster , Server1 , [atom_to_list (Server0 )], []),
986- rabbit_control_helper :command (start_app , Server1 ),
977+ Config1 = rabbit_ct_broker_helpers :cluster_nodes (Config , Server1 , [Server0 ]),
987978 timer :sleep (1000 ),
988979
989- Ch1 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
980+ Ch1 = rabbit_ct_client_helpers :open_channel (Config1 , Server0 ),
990981 qos (Ch1 , 10 , false ),
991982 ? assertExit ({{shutdown , {server_initiated_close , 406 , _ }}, _ },
992983 amqp_channel :subscribe (Ch1 , # 'basic.consume' {queue = Q , consumer_tag = <<" ctag" >>},
993984 self ())),
994- rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_testcase_queue , [Q ]).
985+ rabbit_ct_broker_helpers :rpc (Config1 , 1 , ? MODULE , delete_testcase_queue , [Q ]).
995986
996987consume (Config ) ->
997988 [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
0 commit comments