diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 159973a804f8..88826c5ef00b 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -6299,27 +6299,10 @@ ra_name(Q) -> wait_for_local_member(<<"stream">>, QName, Config) -> %% If it is a stream we need to wait until there is a local member %% on the node we want to subscribe from before proceeding. - rabbit_ct_helpers:await_condition( - fun() -> rpc(Config, 0, ?MODULE, has_local_member, - [rabbit_misc:r(<<"/">>, queue, QName)]) - end, 30_000); + ok = queue_utils:wait_for_local_stream_member(0, <<"/">>, QName, Config); wait_for_local_member(_, _, _) -> ok. -has_local_member(QName) -> - case rabbit_amqqueue:lookup(QName) of - {ok, Q} -> - #{name := StreamId} = amqqueue:get_type_state(Q), - case rabbit_stream_coordinator:local_pid(StreamId) of - {ok, Pid} -> - is_process_alive(Pid); - {error, _} -> - false - end; - {error, _} -> - false - end. - -spec find_event(Type, Props, Events) -> Ret when Type :: atom(), Props :: proplists:proplist(), diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 80ba120db31d..6de4a29d2fc4 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -240,11 +240,7 @@ stream(Config) -> SubCh = rabbit_ct_client_helpers:open_channel(Config, 2), qos(SubCh, 10, false), - %% wait for local replica - rabbit_ct_helpers:await_condition( - fun() -> - queue_utils:has_local_stream_member(Config, 2, QName, <<"/">>) - end, 60000), + ok = queue_utils:wait_for_local_stream_member(2, <<"/">>, QName, Config), try amqp_channel:subscribe( diff --git a/deps/rabbit/test/queue_utils.erl b/deps/rabbit/test/queue_utils.erl index 3fbf143aeceb..cbd3d1555a93 100644 --- a/deps/rabbit/test/queue_utils.erl +++ b/deps/rabbit/test/queue_utils.erl @@ -14,7 +14,7 @@ ra_name/1, fifo_machines_use_same_version/1, fifo_machines_use_same_version/2, - has_local_stream_member/4, + wait_for_local_stream_member/4, has_local_stream_member_rpc/1 ]). @@ -170,11 +170,13 @@ fifo_machines_use_same_version(Config, Nodenames) || Nodename <- Nodenames], lists:all(fun(V) -> V =:= MachineAVersion end, OtherMachinesVersions). -has_local_stream_member(Config, Node, QName, VHost) -> - QRes = rabbit_misc:r(VHost, queue, QName), - rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE, - has_local_stream_member_rpc, - [QRes]). +wait_for_local_stream_member(Node, Vhost, QNameBin, Config) -> + QName = rabbit_misc:queue_resource(Vhost, QNameBin), + rabbit_ct_helpers:await_condition( + fun() -> + rabbit_ct_broker_helpers:rpc( + Config, Node, ?MODULE, has_local_stream_member_rpc, [QName]) + end, 60_000). has_local_stream_member_rpc(QName) -> case rabbit_amqqueue:lookup(QName) of @@ -183,9 +185,9 @@ has_local_stream_member_rpc(QName) -> case rabbit_stream_coordinator:local_pid(StreamId) of {ok, Pid} -> is_process_alive(Pid); - _ -> + {error, _} -> false end; - _Err -> + {error, _} -> false end. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 79d8ab617eb4..2111e8e51cbf 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -1732,6 +1732,7 @@ consume_from_replica(Config) -> Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3), qos(Ch2, 10, false), + ok = queue_utils:wait_for_local_stream_member(Server3, <<"/">>, Q, Config), subscribe(Ch2, Q, false, 0), receive_batch(Ch2, 0, 99), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).