diff --git a/.github/workflows/test-make-target.yaml b/.github/workflows/test-make-target.yaml index 9724962ae366..499f03338f8f 100644 --- a/.github/workflows/test-make-target.yaml +++ b/.github/workflows/test-make-target.yaml @@ -119,5 +119,6 @@ jobs: name: CT logs (${{ inputs.plugin }} ${{ inputs.make_target }} OTP-${{ inputs.erlang_version }} ${{ inputs.metadata_store }}${{ inputs.mixed_clusters && ' mixed' || '' }}) path: | logs/ + deps/rabbitmq_cli/logs/ # !logs/**/log_private if-no-files-found: ignore diff --git a/deps/rabbit/src/rabbit_vhosts.erl b/deps/rabbit/src/rabbit_vhosts.erl index dc1734c9a96d..26aff7571c16 100644 --- a/deps/rabbit/src/rabbit_vhosts.erl +++ b/deps/rabbit/src/rabbit_vhosts.erl @@ -124,7 +124,7 @@ start_processes_for_all(Nodes) -> -spec start_processes_for_all() -> 'ok'. start_processes_for_all() -> - start_processes_for_all(rabbit_nodes:list_reachable()). + start_processes_for_all(rabbit_nodes:list_running()). %% Same as rabbit_vhost_sup_sup:start_on_all_nodes/0. -spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 1699517f470d..e2d98ea9b88f 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1773,6 +1773,11 @@ link_target_queue_deleted(QType, Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), ok = wait_for_accepted(DTag1), + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock delivery to the target queue to do nothing. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), Mod = rabbit_queue_type, @@ -1833,6 +1838,11 @@ target_queues_deleted_accepted(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), ok = wait_for_accepted(DTag1), + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock to deliver only to q1. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), Mod = rabbit_queue_type, @@ -3979,7 +3989,7 @@ list_connections(Config) -> end, {ok, StdOut0} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "protocol"]), - Protocols0 = re:split(StdOut0, <<"\n">>, [trim]), + Protocols0 = re:split(string:trim(StdOut0), <<"\n">>, [trim]), %% Remove any whitespaces. Protocols1 = [binary:replace(Subject, <<" ">>, <<>>, [global]) || Subject <- Protocols0], Protocols = lists:sort(Protocols1), @@ -3993,7 +4003,7 @@ list_connections(Config) -> %% CLI should list AMQP 1.0 container-id {ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]), - ContainerIds0 = re:split(StdOut1, <<"\n">>, [trim]), + ContainerIds0 = re:split(string:trim(StdOut1), <<"\n">>, [trim]), ContainerIds = lists:sort(ContainerIds0), ?assertEqual([<<>>, ContainerId0, ContainerId2], ContainerIds), @@ -4630,6 +4640,11 @@ idle_time_out_on_server(Config) -> after 30000 -> ct:fail({missing_event, ?LINE}) end, + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock the server socket to not have received any bytes. rabbit_ct_broker_helpers:setup_meck(Config), ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), @@ -4653,7 +4668,7 @@ idle_time_out_on_server(Config) -> ct:fail({missing_event, ?LINE}) end after - ?assert(rpc(Config, meck, validate, [Mod])), + _ = rpc(Config, meck, validate, [Mod]), ok = rpc(Config, meck, unload, [Mod]), ok = rpc(Config, application, set_env, [App, Par, DefaultVal]) end. diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index 8a00be3d11dd..a42c25a7aa9a 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -52,7 +52,7 @@ groups() -> suite() -> [ - {timetrap, {minutes, 2}} + {timetrap, {minutes, 5}} ]. init_per_suite(Config) -> diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 1871307bffd4..d7a7c526b4f9 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -1445,18 +1445,18 @@ variable_queue_restart_large_seq_id2(VQ0, QName) -> Terms = variable_queue_read_terms(QName), Count = proplists:get_value(next_seq_id, Terms), - %% set a very high next_seq_id as if 100M messages have been + %% set a very high next_seq_id as if 100 billion messages have been %% published and consumed - Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}), + Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000_000}), {TInit, VQ3} = timer:tc( fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end, millisecond), %% even with a very high next_seq_id start of an empty queue - %% should be quick (few milliseconds, but let's give it 100ms, to + %% should be quick (few milliseconds, but let's give it 500ms, to %% avoid flaking on slow servers) - {true, _} = {TInit < 100, TInit}, + {true, _} = {TInit < 500, TInit}, %% should be empty now true = rabbit_variable_queue:is_empty(VQ3), diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index 83a2582a5395..e0d6b4e29a0f 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -9,14 +9,14 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([export_all, nowarn_export_all]). all() -> [ {group, client_operations}, - {group, cluster_operation_add}, - {group, cluster_operation_remove} + {group, cluster_operation} ]. groups() -> @@ -42,8 +42,10 @@ groups() -> delete_policy, export_definitions ]}, - {cluster_operation_add, [], [add_node]}, - {cluster_operation_remove, [], [remove_node]}, + {cluster_operation, [], [add_node_when_seed_node_is_leader, + add_node_when_seed_node_is_follower, + remove_node_when_seed_node_is_leader, + remove_node_when_seed_node_is_follower]}, {feature_flags, [], [enable_feature_flag]} ]. @@ -127,26 +129,49 @@ init_per_group(Group, Config0) when Group == client_operations; partition_5_node_cluster(Config1), Config1 end; -init_per_group(Group, Config0) -> +init_per_group(_Group, Config0) -> Config = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 5}, - {rmq_nodename_suffix, Group}, {rmq_nodes_clustered, false}, {tcp_ports_base}, {net_ticktime, 5}]), Config1 = rabbit_ct_helpers:merge_app_env( - Config, {rabbit, [{forced_feature_flags_on_init, []}]}), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + Config, {rabbit, [{forced_feature_flags_on_init, []}, + {khepri_leader_wait_retry_timeout, 30000}]}), + Config1. -end_per_group(_, Config) -> +end_per_group(Group, Config) when Group == client_operations; + Group == feature_flags -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - + rabbit_ct_broker_helpers:teardown_steps()); +end_per_group(_Group, Config) -> + Config. + +init_per_testcase(Testcase, Config) + when Testcase =:= add_node_when_seed_node_is_leader orelse + Testcase =:= add_node_when_seed_node_is_follower orelse + Testcase =:= remove_node_when_seed_node_is_leader orelse + Testcase =:= remove_node_when_seed_node_is_follower -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, Testcase}]), + rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). +end_per_testcase(Testcase, Config) + when Testcase =:= add_node_when_seed_node_is_leader orelse + Testcase =:= add_node_when_seed_node_is_follower orelse + Testcase =:= remove_node_when_seed_node_is_leader orelse + Testcase =:= remove_node_when_seed_node_is_follower -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -271,53 +296,151 @@ set_policy(Config) -> delete_policy(Config) -> ?assertError(_, rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy-to-delete">>)). -add_node(Config) -> - [A, B, C, D, _E] = rabbit_ct_broker_helpers:get_node_configs( +add_node_when_seed_node_is_leader(Config) -> + [A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), %% Three node cluster: A, B, C - ok = rabbit_control_helper:command(stop_app, B), - ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, B), + Cluster = [A, B, C], + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), - ok = rabbit_control_helper:command(stop_app, C), - ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, C), + AMember = {rabbit_khepri:get_store_id(), A}, + _ = ra:transfer_leadership(AMember, AMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), %% Minority partition: A + partition_3_node_cluster(Config1), + + Pong = ra:ping(AMember, 10000), + ct:pal("Member A state: ~0p", [Pong]), + case Pong of + {pong, State} when State =/= follower andalso State =/= candidate -> + Ret = rabbit_control_helper:command( + join_cluster, E, [atom_to_list(A)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "(Khepri cluster could be in minority|\\{:rabbit, \\{\\{:error, :timeout\\})", + [{capture, none}])); + Ret -> + ct:pal("A is not the expected leader: ~p", [Ret]), + {skip, "Node A was not elected leader"} + end. + +add_node_when_seed_node_is_follower(Config) -> + [A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% Three node cluster: A, B, C Cluster = [A, B, C], - partition_3_node_cluster(Config), - - ok = rabbit_control_helper:command(stop_app, D), - %% The command is appended to the log, but it will be dropped once the connectivity - %% is restored - ?assertMatch(ok, - rabbit_control_helper:command(join_cluster, D, [atom_to_list(A)], [])), - timer:sleep(10000), - join_3_node_cluster(Config), - clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster). - -remove_node(Config) -> + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), + + CMember = {rabbit_khepri:get_store_id(), C}, + ra:transfer_leadership(CMember, CMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + + %% Minority partition: A + partition_3_node_cluster(Config1), + + AMember = {rabbit_khepri:get_store_id(), A}, + Pong = ra:ping(AMember, 10000), + ct:pal("Member A state: ~0p", [Pong]), + case Pong of + {pong, State} + when State =:= follower orelse State =:= pre_vote -> + Ret = rabbit_control_helper:command( + join_cluster, E, [atom_to_list(A)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", + [{capture, none}])); + {pong, await_condition} -> + Ret = rabbit_control_helper:command( + join_cluster, E, [atom_to_list(A)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "\\{:rabbit, \\{\\{:error, :timeout\\}", + [{capture, none}])), + clustering_utils:assert_cluster_status( + {Cluster, Cluster}, Cluster); + Ret -> + ct:pal("A is not the expected follower: ~p", [Ret]), + {skip, "Node A was not a follower"} + end. + +remove_node_when_seed_node_is_leader(Config) -> [A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), %% Three node cluster: A, B, C - ok = rabbit_control_helper:command(stop_app, B), - ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, B), + Cluster = [A, B, C], + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), - ok = rabbit_control_helper:command(stop_app, C), - ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, C), + AMember = {rabbit_khepri:get_store_id(), A}, + ra:transfer_leadership(AMember, AMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + ct:pal("Waiting for cluster change permitted on node A"), + ?awaitMatch( + {ok, #{cluster_change_permitted := true, + leader_id := AMember}, AMember}, + rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + 60000), + {ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]), %% Minority partition: A - partition_3_node_cluster(Config), + partition_3_node_cluster(Config1), + + ?assertEqual({pong, leader}, ra:ping(AMember, 10000)), + ?awaitMatch( + ok, + rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + 60000). + +remove_node_when_seed_node_is_follower(Config) -> + [A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% Three node cluster: A, B, C Cluster = [A, B, C], + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), - ok = rabbit_control_helper:command(forget_cluster_node, A, [atom_to_list(B)], []), - timer:sleep(10000), - join_3_node_cluster(Config), - clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster). + AMember = {rabbit_khepri:get_store_id(), A}, + CMember = {rabbit_khepri:get_store_id(), C}, + ra:transfer_leadership(CMember, CMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + ?awaitMatch( + {ok, #{cluster_change_permitted := true, + leader_id := CMember}, AMember}, + rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + 60000), + {ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]), + + %% Minority partition: A + partition_3_node_cluster(Config1), + + Ret = rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", [{capture, none}])). enable_feature_flag(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl index 426f5e35e950..33ff6693e8e0 100644 --- a/deps/rabbit/test/clustering_management_SUITE.erl +++ b/deps/rabbit/test/clustering_management_SUITE.erl @@ -745,13 +745,13 @@ is_in_minority(Ret) -> ?assertMatch(match, re:run(Msg, ".*timed out.*minority.*", [{capture, none}])). reset_last_disc_node(Config) -> - Servers = [Rabbit, Hare | _] = cluster_members(Config), + [Rabbit, Hare | _] = cluster_members(Config), stop_app(Config, Hare), ?assertEqual(ok, change_cluster_node_type(Config, Hare, ram)), start_app(Config, Hare), - case rabbit_ct_broker_helpers:enable_feature_flag(Config, Servers, khepri_db) of + case rabbit_ct_broker_helpers:enable_feature_flag(Config, [Rabbit], khepri_db) of ok -> %% The reset works after the switch to Khepri because the RAM node was %% implicitly converted to a disc one as Khepri always writes data on disc. diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index 9cc09ceaac98..60d78e5d46df 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -203,7 +203,9 @@ stop_slave_node(Node) -> persistent_term:erase({?MODULE, Node}), ct:pal("- Stopping slave node `~ts`...", [Node]), - _ = peer:stop(NodePid) + Ret = catch peer:stop(NodePid), + ct:pal(" Ret = ~0p", [Ret]), + ok end. connect_nodes([FirstNode | OtherNodes] = Nodes) -> diff --git a/deps/rabbit/test/metrics_SUITE.erl b/deps/rabbit/test/metrics_SUITE.erl index 202a808bc831..c56d188c9d6b 100644 --- a/deps/rabbit/test/metrics_SUITE.erl +++ b/deps/rabbit/test/metrics_SUITE.erl @@ -301,9 +301,9 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) -> connection(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - [_] = read_table_rpc(Config, connection_created), - [_] = read_table_rpc(Config, connection_metrics), - [_] = read_table_rpc(Config, connection_coarse_metrics), + ?awaitMatch([_], read_table_rpc(Config, connection_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, connection_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, connection_coarse_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), force_metric_gc(Config), ?awaitMatch([], read_table_rpc(Config, connection_created), @@ -317,25 +317,25 @@ connection(Config) -> channel(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Chan} = amqp_connection:open_channel(Conn), - [_] = read_table_rpc(Config, channel_created), - [_] = read_table_rpc(Config, channel_metrics), - [_] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000), ok = amqp_channel:close(Chan), - [] = read_table_rpc(Config, channel_created), - [] = read_table_rpc(Config, channel_metrics), - [] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn). channel_connection_close(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, _} = amqp_connection:open_channel(Conn), - [_] = read_table_rpc(Config, channel_created), - [_] = read_table_rpc(Config, channel_metrics), - [_] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), - [] = read_table_rpc(Config, channel_created), - [] = read_table_rpc(Config, channel_metrics), - [] = read_table_rpc(Config, channel_process_metrics). + ?awaitMatch([], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000). channel_queue_delete_queue(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -344,14 +344,14 @@ channel_queue_delete_queue(Config) -> ensure_exchange_metrics_populated(Chan, Queue), ensure_channel_queue_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_queue_metrics), - [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), delete_queue(Chan, Queue), force_metric_gc(Config), % ensure removal of queue cleans up channel_queue metrics - [] = read_table_rpc(Config, channel_queue_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_metrics), + ?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), ok. @@ -362,26 +362,26 @@ channel_queue_exchange_consumer_close_connection(Config) -> ensure_exchange_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_exchange_metrics), - [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_exchange_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), ensure_channel_queue_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_queue_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000), Sub = #'basic.consume'{queue = Queue}, #'basic.consume_ok'{consumer_tag = _} = amqp_channel:call(Chan, Sub), - [_] = read_table_rpc(Config, consumer_created), + ?awaitMatch([_], read_table_rpc(Config, consumer_created), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), % ensure cleanup happened force_metric_gc(Config), - [] = read_table_rpc(Config, channel_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_metrics), - [] = read_table_rpc(Config, consumer_created), + ?awaitMatch([], read_table_rpc(Config, channel_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, consumer_created), 30000), ok. diff --git a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl index ac01be7bb59d..5bb348c7dab3 100644 --- a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl +++ b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl @@ -21,9 +21,7 @@ all() -> [ {group, non_parallel}, - {group, cluster_size_3}, - {group, cluster_size_5}, - {group, cluster_size_7} + {group, discovery} ]. groups() -> @@ -31,18 +29,24 @@ groups() -> {non_parallel, [], [ no_nodes_configured ]}, - {cluster_size_3, [], [ - successful_discovery, - successful_discovery_with_a_subset_of_nodes_coming_online - ]}, - {cluster_size_5, [], [ - successful_discovery, - successful_discovery_with_a_subset_of_nodes_coming_online - ]}, - {cluster_size_7, [], [ - successful_discovery, - successful_discovery_with_a_subset_of_nodes_coming_online - ]} + {discovery, [], + [ + {cluster_size_3, [], + [ + successful_discovery, + successful_discovery_with_a_subset_of_nodes_coming_online + ]}, + {cluster_size_5, [], + [ + successful_discovery, + successful_discovery_with_a_subset_of_nodes_coming_online + ]}, + {cluster_size_7, [], + [ + successful_discovery, + successful_discovery_with_a_subset_of_nodes_coming_online + ]} + ]} ]. suite() -> @@ -63,6 +67,24 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(discovery, Config) -> + case rabbit_ct_helpers:is_mixed_versions(Config) of + false -> + Config; + true -> + %% We can't support the creation of a cluster because peer + %% discovery might select a newer node as the seed node and ask an + %% older node to join it. The creation of the cluster may fail of + %% the cluster might be degraded. Examples: + %% - a feature flag is enabled by the newer node but the older + %% node doesn't know it + %% - the newer node uses a newer Khepri machine version and the + %% older node can join but won't be able to apply Khepri + %% commands and progress. + {skip, + "Peer discovery is unsupported with a mix of old and new " + "RabbitMQ versions"} + end; init_per_group(cluster_size_3 = Group, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}, {group, Group}]); init_per_group(cluster_size_5 = Group, Config) -> diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 33b4b466562a..8c23e7c6b813 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -120,27 +121,28 @@ node_channel_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), - 0 = count_channels_per_node(Config), + ?awaitMatch(0, count_channels_per_node(Config), 30000), lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1); (_) -> {ok,_ } = open_channel(Conn2) end, lists:seq(1, 5)), - 5 = count_channels_per_node(Config), + ?awaitMatch(5, count_channels_per_node(Config), 30000), %% In total 5 channels are open on this node, so a new one, regardless of %% connection, will not be allowed. It will terminate the connection with %% its channels too. So {error, not_allowed_crash} = open_channel(Conn2), - 3 = count_channels_per_node(Config), + ?awaitMatch(3, count_channels_per_node(Config), 30000), %% As the connection is dead, so are the 2 channels, so we should be able to %% create 2 more on Conn1 {ok , _} = open_channel(Conn1), {ok , _} = open_channel(Conn1), + ?awaitMatch(5, count_channels_per_node(Config), 30000), %% But not a third {error, not_allowed_crash} = open_channel(Conn1), %% Now all connections are closed, so there should be 0 open connections - 0 = count_channels_per_node(Config), + ?awaitMatch(0, count_channels_per_node(Config), 30000), close_all_connections([Conn1, Conn2]), rabbit_ct_broker_helpers:delete_vhost(Config, VHost), diff --git a/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl index 2cd000410702..db4f5bc3f63c 100644 --- a/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl @@ -374,7 +374,13 @@ single_node_list_in_user(Config) -> [Conn4] = open_connections(Config, [{0, Username1}]), [_Chan4] = open_channels(Conn4, 1), close_connections([Conn4]), - [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + rabbit_ct_helpers:await_condition( + fun () -> + case connections_in(Config, Username1) of + [#tracked_connection{username = Username1}] -> true; + _ -> false + end + end), [#tracked_channel{username = Username1}] = channels_in(Config, Username1), [Conn5, Conn6] = open_connections(Config, [{0, Username2}, {0, Username2}]), diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl index 854250543846..15ae436c656c 100644 --- a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl @@ -130,7 +130,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn1] = open_connections(Config, [0]), [Chan1] = open_channels(Conn1, 1), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(1, count_channels_in(Config, Username), ?A_TOUT), [#tracked_channel{username = Username}] = channels_in(Config, Username), ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), @@ -147,7 +147,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), - [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?awaitMatch([#tracked_connection{username = Username2}], connections_in(Config, Username2), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), @@ -157,7 +157,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn3] = open_connections(Config, [0]), Chans3 = [_|_] = open_channels(Conn3, 5), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), @@ -172,7 +172,7 @@ single_node_user_connection_channel_tracking(Config) -> [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4], kill_connections([Conn4]), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), @@ -182,9 +182,11 @@ single_node_user_connection_channel_tracking(Config) -> [Conn5] = open_connections(Config, [0]), Chans5 = [_|_] = open_channels(Conn5, 7), ?awaitMatch(2, count_connections_in(Config, Username), ?A_TOUT), - [Username, Username] = - lists:map(fun (#tracked_connection{username = U}) -> U end, - connections_in(Config, Username)), + ?awaitMatch( + [Username, Username], + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username)), + ?A_TOUT), ?awaitMatch(12, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(12, tracked_user_channel_count(Config, Username), ?A_TOUT), ?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT), diff --git a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl index 601be3e45227..30a0d5e45116 100644 --- a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl @@ -100,60 +100,78 @@ end_per_testcase(Testcase, Config) -> %% Test cases. %% ------------------------------------------------------------------- single_node_list_of_user(Config) -> - Username = proplists:get_value(rmq_username, Config), - Username2 = <<"guest2">>, + Username1 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-1"), + Username2 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-2"), Vhost = proplists:get_value(rmq_vhost, Config), - rabbit_ct_broker_helpers:add_user(Config, Username2), - rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + [ begin + rabbit_ct_broker_helpers:add_user(Config, U), + rabbit_ct_broker_helpers:set_full_permissions(Config, U, Vhost) + end || U <- [Username1, Username2]], - ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username1)), ?assertEqual(0, count_connections_in(Config, Username2)), - [Conn1] = open_connections(Config, [0]), - ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + [Conn1] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), [Conn2] = open_connections(Config, [{0, Username2}]), ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username2}] = connections_in(Config, Username2), - - [Conn3] = open_connections(Config, [0]), - ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), - - [Conn4] = open_connections(Config, [0]), + ?awaitMatch( + [#tracked_connection{username = Username2}], + connections_in(Config, Username2), + ?AWAIT_TIMEOUT), + + [Conn3] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), + + [Conn4] = open_connections(Config, [{0, Username1}]), kill_connections([Conn4]), - ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), - - [Conn5] = open_connections(Config, [0]), - ?awaitMatch(2, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), - [Username, Username] = - lists:map(fun (#tracked_connection{username = U}) -> U end, - connections_in(Config, Username)), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), + + [Conn5] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(2, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [Username1, Username1], + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username1)), + ?AWAIT_TIMEOUT), close_connections([Conn2, Conn3, Conn5]), rabbit_ct_broker_helpers:delete_user(Config, Username2), ?awaitMatch(0, length(all_connections(Config)), ?AWAIT_TIMEOUT). single_node_user_deletion_forces_connection_closure(Config) -> - Username = proplists:get_value(rmq_username, Config), - Username2 = <<"guest2">>, + Username1 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-1"), + Username2 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-2"), Vhost = proplists:get_value(rmq_vhost, Config), - rabbit_ct_broker_helpers:add_user(Config, Username2), - rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + [ begin + rabbit_ct_broker_helpers:add_user(Config, U), + rabbit_ct_broker_helpers:set_full_permissions(Config, U, Vhost) + end || U <- [Username1, Username2]], - ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username1)), ?assertEqual(0, count_connections_in(Config, Username2)), - [Conn1] = open_connections(Config, [0]), - ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), + [Conn1] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{0, Username2}]), ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), @@ -162,22 +180,24 @@ single_node_user_deletion_forces_connection_closure(Config) -> ?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT). + ?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT). cluster_user_deletion_forces_connection_closure(Config) -> - Username = proplists:get_value(rmq_username, Config), - Username2 = <<"guest2">>, + Username1 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-1"), + Username2 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-2"), Vhost = proplists:get_value(rmq_vhost, Config), - rabbit_ct_broker_helpers:add_user(Config, Username2), - rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + [ begin + rabbit_ct_broker_helpers:add_user(Config, U), + rabbit_ct_broker_helpers:set_full_permissions(Config, U, Vhost) + end || U <- [Username1, Username2]], - ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username1)), ?assertEqual(0, count_connections_in(Config, Username2)), - [Conn1] = open_connections(Config, [{0, Username}]), - ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), + [Conn1] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{1, Username2}]), ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), @@ -186,7 +206,7 @@ cluster_user_deletion_forces_connection_closure(Config) -> ?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT). + ?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT). %% ------------------------------------------------------------------- %% Helpers diff --git a/deps/rabbit/test/proxy_protocol_SUITE.erl b/deps/rabbit/test/proxy_protocol_SUITE.erl index 72e9e37c4b98..a3abc23602e3 100644 --- a/deps/rabbit/test/proxy_protocol_SUITE.erl +++ b/deps/rabbit/test/proxy_protocol_SUITE.erl @@ -10,6 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile(export_all). -define(TIMEOUT, 5000). @@ -65,8 +67,10 @@ proxy_protocol_v1(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. proxy_protocol_v1_tls(Config) -> @@ -80,8 +84,10 @@ proxy_protocol_v1_tls(Config) -> {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. proxy_protocol_v2_local(Config) -> @@ -97,13 +103,22 @@ proxy_protocol_v2_local(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. connection_name() -> - Pids = pg_local:get_members(rabbit_connections), - Pid = lists:nth(1, Pids), + ?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000), + [Pid] = pg_local:get_members(rabbit_connections), {dictionary, Dict} = process_info(Pid, dictionary), {process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict), ConnectionName. + +wait_for_connection_close(Config) -> + ?awaitMatch( + [], + rabbit_ct_broker_helpers:rpc( + Config, 0, pg_local, get_members, [rabbit_connnections]), + 30000). diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 6de4a29d2fc4..b777ad3222df 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -111,7 +111,7 @@ end_per_testcase(Testcase, Config) -> smoke(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -191,7 +191,7 @@ smoke(Config) -> }, ProtocolQueueTypeCounters), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ?assertMatch( #{consumers := 0, @@ -202,7 +202,7 @@ smoke(Config) -> ack_after_queue_delete(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -223,12 +223,13 @@ ack_after_queue_delete(Config) -> after 1000 -> ok end, + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), flush(), ok. stream(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -238,7 +239,7 @@ stream(Config) -> publish_and_confirm(Ch, QName, <<"msg1">>), Args = [{<<"x-stream-offset">>, longstr, <<"last">>}], - SubCh = rabbit_ct_client_helpers:open_channel(Config, 2), + {SubConn, SubCh} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2), qos(SubCh, 10, false), ok = queue_utils:wait_for_local_stream_member(2, <<"/">>, QName, Config), @@ -262,6 +263,8 @@ stream(Config) -> exit(Err) end, + ok = rabbit_ct_client_helpers:close_connection_and_channel(SubConn, SubCh), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index b31fc3d59322..ce74e7168eb5 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -322,6 +322,9 @@ init_per_testcase(T, Config) init_per_testcase(Testcase, Config) -> ClusterSize = ?config(rmq_nodes_count, Config), IsMixed = rabbit_ct_helpers:is_mixed_versions(), + SameKhepriMacVers = ( + rabbit_ct_broker_helpers:do_nodes_run_same_ra_machine_version( + Config, khepri_machine)), case Testcase of node_removal_is_not_quorum_critical when IsMixed -> {skip, "node_removal_is_not_quorum_critical isn't mixed versions compatible"}; @@ -349,6 +352,9 @@ init_per_testcase(Testcase, Config) -> leader_locator_balanced_random_maintenance when IsMixed -> {skip, "leader_locator_balanced_random_maintenance isn't mixed versions compatible because " "delete_declare isn't mixed versions reliable"}; + leadership_takeover when not SameKhepriMacVers -> + {skip, "leadership_takeover will fail with a mix of Khepri state " + "machine versions"}; reclaim_memory_with_wrong_queue_type when IsMixed -> {skip, "reclaim_memory_with_wrong_queue_type isn't mixed versions compatible"}; peek_with_wrong_queue_type when IsMixed -> @@ -1434,7 +1440,7 @@ force_checkpoint_on_queue(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - N = 20_000, + N = 10_000, rabbit_ct_client_helpers:publish(Ch, QQ, N), wait_for_messages_ready([Server0], RaName, N), @@ -2291,7 +2297,7 @@ recover_from_single_failure(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). recover_from_multiple_failures(Config) -> - [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1, Server, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -2588,7 +2594,7 @@ channel_handles_ra_event(Config) -> ?assertEqual(2, basic_get_tag(Ch1, Q2, false)). declare_during_node_down(Config) -> - [Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs( + [DownServer, Server, _] = Servers = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), stop_node(Config, DownServer), @@ -2920,7 +2926,7 @@ delete_member_member_already_deleted(Config) -> ok. delete_member_during_node_down(Config) -> - [Server, DownServer, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs( + [DownServer, Server, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), stop_node(Config, DownServer), @@ -2975,7 +2981,7 @@ cleanup_data_dir(Config) -> %% trying to delete a queue in minority. A case clause there had gone %% previously unnoticed. - [Server1, Server2, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server2, Server1, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, @@ -3822,7 +3828,12 @@ format(Config) -> %% tests rabbit_quorum_queue:format/2 Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Server = hd(Nodes), + Server = case Nodes of + [N] -> + N; + [_, N | _] -> + N + end, Ch = rabbit_ct_client_helpers:open_channel(Config, Server), Q = ?config(queue_name, Config), @@ -3841,7 +3852,9 @@ format(Config) -> ?FUNCTION_NAME, [QRecord, #{}]), %% test all up case - ?assertEqual(<<"quorum">>, proplists:get_value(type, Fmt)), + ?assertMatch( + T when T =:= <<"quorum">> orelse T =:= quorum, + proplists:get_value(type, Fmt)), ?assertEqual(running, proplists:get_value(state, Fmt)), ?assertEqual(Server, proplists:get_value(leader, Fmt)), ?assertEqual(Server, proplists:get_value(node, Fmt)), @@ -3850,15 +3863,17 @@ format(Config) -> case length(Nodes) of 3 -> - [_, Server2, Server3] = Nodes, - ok = rabbit_control_helper:command(stop_app, Server2), + [Server1, _Server2, Server3] = Nodes, + ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(stop_app, Server3), Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue, ?FUNCTION_NAME, [QRecord, #{}]), - ok = rabbit_control_helper:command(start_app, Server2), + ok = rabbit_control_helper:command(start_app, Server1), ok = rabbit_control_helper:command(start_app, Server3), - ?assertEqual(<<"quorum">>, proplists:get_value(type, Fmt2)), + ?assertMatch( + T when T =:= <<"quorum">> orelse T =:= quorum, + proplists:get_value(type, Fmt2)), ?assertEqual(minority, proplists:get_value(state, Fmt2)), ?assertEqual(Server, proplists:get_value(leader, Fmt2)), ?assertEqual(Server, proplists:get_value(node, Fmt2)), diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index a6949065253f..2e0d0aa1075e 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -230,7 +230,7 @@ delivery_limit(Config) -> {_, #amqp_msg{props = #'P_basic'{headers = Headers}}} = ?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, amqp_channel:call(Ch, #'basic.get'{queue = TargetQ}), - 1000), + 30000), assert_dlx_headers(Headers, <<"delivery_limit">>, SourceQ), ?assertEqual(1, counted(messages_dead_lettered_delivery_limit_total, Config)), eventually(?_assertEqual(1, counted(messages_dead_lettered_confirmed_total, Config))). diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 96b7ce84b9f4..9fd9794bbfd1 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -540,50 +540,48 @@ add_replica(Config) -> QQuorum = <>, ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), ?assertEqual({'queue.declare_ok', QClassic, 0, 0}, - declare(Config, Server0, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + declare(Config, Server1, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])), ?assertEqual({'queue.declare_ok', QQuorum, 0, 0}, - declare(Config, Server0, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + declare(Config, Server1, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), %% Not a member of the cluster, what would happen? ?assertEqual({error, node_not_running}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), ?assertEqual({error, classic_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QClassic, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server0])), ?assertEqual({error, quorum_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QQuorum, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server0])), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + Config1 = rabbit_ct_broker_helpers:cluster_nodes( + Config, Server1, [Server0]), timer:sleep(1000), ?assertEqual({error, classic_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QClassic, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server0])), ?assertEqual({error, quorum_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QQuorum, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server0])), ?assertEqual(ok, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), %% replicas must be recorded on the state, and if we publish messages then they must %% be stored on disk - check_leader_and_replicas(Config, [Server0, Server1]), + check_leader_and_replicas(Config1, [Server1, Server0]), %% And if we try again? Idempotent - ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + ?assertEqual(ok, rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), %% Add another node - ok = rabbit_control_helper:command(stop_app, Server2), - ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server2), - ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, + Config2 = rabbit_ct_broker_helpers:cluster_nodes( + Config1, Server1, [Server2]), + ?assertEqual(ok, rpc:call(Server1, rabbit_stream_queue, add_replica, [<<"/">>, Q, Server2])), - check_leader_and_replicas(Config, [Server0, Server1, Server2]), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + check_leader_and_replicas(Config2, [Server0, Server1, Server2]), + rabbit_ct_broker_helpers:rpc(Config2, Server1, ?MODULE, delete_testcase_queue, [Q]). delete_replica(Config) -> [Server0, Server1, Server2] = @@ -641,14 +639,9 @@ grow_then_shrink_coordinator_cluster(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - ok = rabbit_control_helper:command(start_app, Server1), - ok = rabbit_control_helper:command(stop_app, Server2), - ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), - ok = rabbit_control_helper:command(start_app, Server2), + _Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0, Server2]), rabbit_ct_helpers:await_condition( fun() -> @@ -662,17 +655,17 @@ grow_then_shrink_coordinator_cluster(Config) -> end end, 60000), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server1)], []), + ok = rabbit_control_helper:command(stop_app, Server0), + ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server0)], []), ok = rabbit_control_helper:command(stop_app, Server2), - ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), + ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)], []), rabbit_ct_helpers:await_condition( fun() -> - case rpc:call(Server0, ra, members, - [{rabbit_stream_coordinator, Server0}]) of + case rpc:call(Server1, ra, members, + [{rabbit_stream_coordinator, Server1}]) of {_, Members, _} -> Nodes = lists:sort([N || {_, N} <- Members]), - lists:sort([Server0]) == Nodes; + lists:sort([Server1]) == Nodes; _ -> false end @@ -685,29 +678,27 @@ grow_coordinator_cluster(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0]), %% at this point there _probably_ won't be a stream coordinator member on %% Server1 %% check we can add a new stream replica for the previously declare stream ?assertEqual(ok, - rpc:call(Server1, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), %% also check we can declare a new stream when calling Server1 Q2 = unicode:characters_to_binary([Q, <<"_2">>]), ?assertEqual({'queue.declare_ok', Q2, 0, 0}, - declare(Config, Server1, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config1, Server0, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}])), %% wait until the stream coordinator detects there is a new rabbit node %% and adds a new member on the new node rabbit_ct_helpers:await_condition( fun() -> - case rpc:call(Server0, ra, members, - [{rabbit_stream_coordinator, Server0}]) of + case rpc:call(Server1, ra, members, + [{rabbit_stream_coordinator, Server1}]) of {_, Members, _} -> Nodes = lists:sort([N || {_, N} <- Members]), lists:sort([Server0, Server1]) == Nodes; @@ -715,17 +706,19 @@ grow_coordinator_cluster(Config) -> false end end, 60000), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + rabbit_ct_broker_helpers:rpc(Config1, 1, ?MODULE, delete_testcase_queue, [Q]). shrink_coordinator_cluster(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + %% Wait for the replicas to be ready before stopping a node. + check_leader_and_replicas(Config, [Server0, Server1, Server2]), + ok = rabbit_control_helper:command(stop_app, Server2), ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), @@ -981,19 +974,17 @@ consume_without_local_replica(Config) -> rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), %% Add another node to the cluster, but it won't have a replica - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0]), timer:sleep(1000), - Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Ch1 = rabbit_ct_client_helpers:open_channel(Config1, Server0), qos(Ch1, 10, false), ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, self())), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + rabbit_ct_broker_helpers:rpc(Config1, 1, ?MODULE, delete_testcase_queue, [Q]). consume(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl index 87f51a4a62b3..1e6657649911 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl @@ -896,6 +896,7 @@ test_failed_connection_with_a_token_with_insufficient_resource_permission(Config ?assertExit({{shutdown, {server_initiated_close, 403, _}}, _}, amqp_channel:call(Ch, #'queue.declare'{queue = <<"alt-prefix.eq.1">>, exclusive = true})), + close_connection(Conn). test_failed_token_refresh_case1(Config) -> @@ -941,7 +942,7 @@ test_failed_token_refresh_case2(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 530, _}}}, _}, amqp_connection:open_channel(Conn)), - close_connection(Conn). + wait_for_connection_exit(Conn). cannot_change_username_on_refreshed_token(Config) -> Jwk = @@ -983,4 +984,14 @@ rpc_get_env(Config, Par) -> [rabbitmq_auth_backend_oauth2, Par]). rpc_get_env(Config, Par, Default) -> rpc(Config, 0, application, get_env, - [rabbitmq_auth_backend_oauth2, Par, Default]). \ No newline at end of file + [rabbitmq_auth_backend_oauth2, Par, Default]). + +wait_for_connection_exit(Conn) -> + MRef = erlang:monitor(process, Conn), + receive + {'DOWN', MRef, _Type, _Conn, Reason} -> + ct:pal("Connection ~0p exited: ~p", [Conn, Reason]), + ok + after 30000 -> + ct:fail("Connection ~0p is still up after 30 seconds", [Conn]) + end. diff --git a/deps/rabbitmq_cli/.gitignore b/deps/rabbitmq_cli/.gitignore index 43c231de0dd8..9b987ada02eb 100644 --- a/deps/rabbitmq_cli/.gitignore +++ b/deps/rabbitmq_cli/.gitignore @@ -1 +1,2 @@ /deps/ +/logs diff --git a/deps/rabbitmq_cli/Makefile b/deps/rabbitmq_cli/Makefile index 616e44d640a4..024f71b12e2a 100644 --- a/deps/rabbitmq_cli/Makefile +++ b/deps/rabbitmq_cli/Makefile @@ -123,7 +123,11 @@ tests:: escript test-deps $(verbose) $(MAKE) -C ../../ install-cli $(verbose) $(MAKE) -C ../../ start-background-broker \ PLUGINS="rabbitmq_federation rabbitmq_stomp rabbitmq_stream_management amqp_client" \ - $(if $(filter khepri,$(RABBITMQ_METADATA_STORE)),,RABBITMQ_FEATURE_FLAGS="-khepri_db") + $(if $(filter khepri,$(RABBITMQ_METADATA_STORE)),,RABBITMQ_FEATURE_FLAGS="-khepri_db"); \ + rm -f logs; \ + log_dir=$$(../../sbin/rabbitmqctl eval 'io:format("~s~n", [maps:get(log_base_dir,rabbit_prelaunch:get_context())]).'); \ + log_dir=$$(echo "$$log_dir" | head -n 1); \ + ln -s "$$log_dir" logs $(gen_verbose) $(MIX_TEST) \ $(if $(RABBITMQ_METADATA_STORE),--exclude $(filter-out $(RABBITMQ_METADATA_STORE),khepri mnesia),) \ $(TEST_FILE); \ @@ -160,7 +164,7 @@ endif clean:: clean-mix clean-mix: - $(gen_verbose) rm -f $(ESCRIPT_FILE) $(LINKED_ESCRIPTS) + $(gen_verbose) rm -f $(ESCRIPT_FILE) $(LINKED_ESCRIPTS) logs $(verbose) echo y | mix clean format: diff --git a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl index 35b3a5a70e5b..641f7e8596a3 100644 --- a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl +++ b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl @@ -26,6 +26,9 @@ all() -> {group, khepri_migration} ]. +suite() -> + [{timetrap, {minutes, 5}}]. + groups() -> [ {routing_tests, [], routing_tests()}, @@ -156,7 +159,7 @@ custom_header_undefined(Config) -> Exchange = <<"my exchange">>, Queue = <<"my queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), #'exchange.declare_ok'{} = amqp_channel:call( Ch, #'exchange.declare' { @@ -179,7 +182,7 @@ custom_header_undefined(Config) -> ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, amqp_channel:call(Ch, #'basic.get'{queue = Queue})), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), clean_up_test_topology(Config, Exchange, [Queue]), ok. @@ -373,7 +376,7 @@ test_with_timestamp(Config, Qs) -> Qs). test_mutually_exclusive_arguments(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), process_flag(trap_exit, true), Cmd = #'exchange.declare'{ @@ -384,11 +387,11 @@ test_mutually_exclusive_arguments(Config) -> }, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_non_supported_property(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), process_flag(trap_exit, true), Cmd = #'exchange.declare'{ @@ -398,7 +401,7 @@ test_non_supported_property(Config) -> }, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. rnd() -> @@ -411,13 +414,13 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues) -> test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT). test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) -> - Chan = rabbit_ct_client_helpers:open_channel(Config), - #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), - CHX = <<"e">>, clean_up_test_topology(Config, CHX, Queues), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), + #'exchange.declare_ok'{} = amqp_channel:call(Chan, #'exchange.declare' { @@ -464,11 +467,11 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, Itera [Chi, Obs]), clean_up_test_topology(Config, CHX, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_binding_with_negative_routing_key(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"bind-fail">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -482,15 +485,15 @@ test_binding_with_negative_routing_key(Config) -> Cmd = #'queue.bind'{exchange = <<"bind-fail">>, routing_key = <<"-1">>}, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), - rabbit_ct_client_helpers:close_channel(Chan), - rabbit_ct_client_helpers:close_channel(Ch2), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2), ok. test_binding_with_non_numeric_routing_key(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"bind-fail">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -505,10 +508,11 @@ test_binding_with_non_numeric_routing_key(Config) -> routing_key = <<"not-a-number">>}, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2), ok. %% @@ -516,7 +520,7 @@ test_binding_with_non_numeric_routing_key(Config) -> %% test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_recovery_between_node_restarts">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -547,11 +551,11 @@ test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> assert_ring_consistency(Config, X), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_queue_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_queue_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -576,11 +580,11 @@ test_hash_ring_updates_when_queue_is_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, [Q]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_multiple_queues_are_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -611,7 +615,7 @@ test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure(Config) -> @@ -706,7 +710,7 @@ test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closu ok. test_hash_ring_updates_when_exchange_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_exchange_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -734,11 +738,11 @@ test_hash_ring_updates_when_exchange_is_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_queue_is_unbound(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_queue_is_unbound">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -769,11 +773,11 @@ test_hash_ring_updates_when_queue_is_unbound(Config) -> ?assertEqual(8, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -818,11 +822,11 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Co assert_ring_consistency(Config, X), clean_up_test_topology(Config, X, [Q1, Q2]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -872,14 +876,14 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted( ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, [Q1, Q2]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. %% Follows the setup described in %% https://github.com/rabbitmq/rabbitmq-server/issues/3386#issuecomment-1103929292 node_restart(Config) -> - Chan1 = rabbit_ct_client_helpers:open_channel(Config, 1), - Chan2 = rabbit_ct_client_helpers:open_channel(Config, 2), + {Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1), + {Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2), X = atom_to_binary(?FUNCTION_NAME), #'exchange.declare_ok'{} = amqp_channel:call(Chan1, @@ -903,8 +907,8 @@ node_restart(Config) -> F(Chan1, QsNode1), F(Chan2, QsNode2), - rabbit_ct_client_helpers:close_channel(Chan1), - rabbit_ct_client_helpers:close_channel(Chan2), + rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2), rabbit_ct_broker_helpers:restart_node(Config, 1), rabbit_ct_broker_helpers:restart_node(Config, 2), @@ -942,13 +946,14 @@ count_buckets_of_exchange(Config, X) -> from_mnesia_to_khepri(Config) -> Queues = [Q1, Q2, Q3, Q4] = ?RoutingTestQs, IterationCount = ?DEFAULT_SAMPLE_COUNT, - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), - #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), CHX = <<"e">>, clean_up_test_topology(Config, CHX, Queues), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), + #'exchange.declare_ok'{} = amqp_channel:call(Chan, #'exchange.declare' { @@ -973,36 +978,32 @@ from_mnesia_to_khepri(Config) -> case rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db) of ok -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_consistent_hash_exchange_raft_based_metadata_store) of - ok -> - [amqp_channel:call(Chan, - #'basic.publish'{exchange = CHX, routing_key = rnd()}, - #amqp_msg{props = #'P_basic'{}, payload = <<>>}) - || _ <- lists:duplicate(IterationCount, const)], - amqp_channel:wait_for_confirms(Chan, 300), - timer:sleep(500), - Counts = - [begin - #'queue.declare_ok'{message_count = M} = - amqp_channel:call(Chan, #'queue.declare' {queue = Q, - exclusive = true}), - M - end || Q <- Queues], - ?assertEqual(IterationCount, lists:sum(Counts)), %% All messages got routed - %% Chi-square test - %% H0: routing keys are not evenly distributed according to weight - Expected = [IterationCount div 6, IterationCount div 6, (IterationCount div 6) * 2, (IterationCount div 6) * 2], - Obs = lists:zip(Counts, Expected), - Chi = lists:sum([((O - E) * (O - E)) / E || {O, E} <- Obs]), - ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", - [Chi, Obs]), - clean_up_test_topology(Config, CHX, Queues), - rabbit_ct_client_helpers:close_channel(Chan), - ok; - Skip -> - Skip - end; + [amqp_channel:call(Chan, + #'basic.publish'{exchange = CHX, routing_key = rnd()}, + #amqp_msg{props = #'P_basic'{}, payload = <<>>}) + || _ <- lists:duplicate(IterationCount, const)], + amqp_channel:wait_for_confirms(Chan, 300), + timer:sleep(500), + Counts = + [begin + #'queue.declare_ok'{message_count = M} = + amqp_channel:call(Chan, #'queue.declare' {queue = Q, + exclusive = true}), + M + end || Q <- Queues], + ?assertEqual(IterationCount, lists:sum(Counts)), %% All messages got routed + %% Chi-square test + %% H0: routing keys are not evenly distributed according to weight + Expected = [IterationCount div 6, IterationCount div 6, (IterationCount div 6) * 2, (IterationCount div 6) * 2], + Obs = lists:zip(Counts, Expected), + Chi = lists:sum([((O - E) * (O - E)) / E || {O, E} <- Obs]), + ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", + [Chi, Obs]), + clean_up_test_topology(Config, CHX, Queues), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + ok; Skip -> + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), Skip end. @@ -1010,12 +1011,12 @@ clean_up_test_topology(Config) -> clean_up_test_topology(Config, none, ?AllQs). clean_up_test_topology(Config, none, Qs) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], - rabbit_ct_client_helpers:close_channel(Ch); + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch); clean_up_test_topology(Config, X, Qs) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch, #'exchange.delete' {exchange = X}), [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], - rabbit_ct_client_helpers:close_channel(Ch). + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 9d6297f94dc8..58d617b5def1 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -660,7 +660,7 @@ child_id_format(Config) -> %% %% After that, the supervisors run on the new code. Config2 = rabbit_ct_broker_helpers:cluster_nodes( - Config1, [OldNodeA, NewNodeB, NewNodeD]), + Config1, OldNodeA, [NewNodeB, NewNodeD]), ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeA), ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNodeA), ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeC), diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index f69d80a14c03..773e3cd0862f 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). %% not defined in v3 -define(SUBACK_FAILURE, 16#80). @@ -1241,6 +1242,7 @@ vhost_connection_limit(Config) -> {ok, _} = emqtt:connect(C1), {ok, C2} = connect_anonymous(Config, <<"client2">>), {ok, _} = emqtt:connect(C2), + ?awaitMatch(2, count_connections_per_vhost(Config), 30000), {ok, C3} = connect_anonymous(Config, <<"client3">>), ExpectedError = expected_connection_limit_error(Config), unlink(C3), @@ -1249,6 +1251,12 @@ vhost_connection_limit(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, <<"/">>). +count_connections_per_vhost(Config) -> + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_connection_tracking, count_local_tracked_items_in_vhost, + [<<"/">>]). + vhost_queue_limit(Config) -> ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>, max_queues, 1), {ok, C} = connect_anonymous(Config), @@ -1268,6 +1276,7 @@ user_connection_limit(Config) -> ok = rabbit_ct_broker_helpers:set_user_limits(Config, DefaultUser, #{max_connections => 1}), {ok, C1} = connect_anonymous(Config, <<"client1">>), {ok, _} = emqtt:connect(C1), + ?awaitMatch(1, count_connections_per_vhost(Config), 30000), {ok, C2} = connect_anonymous(Config, <<"client2">>), ExpectedError = expected_connection_limit_error(Config), unlink(C2), diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index d6964017dec1..c3376ff3d613 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -242,9 +242,14 @@ end_per_testcase(Testcase, Config) -> end_per_testcase0(Testcase, Config) -> rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), %% Assert that every testcase cleaned up their MQTT sessions. + _ = rpc(Config, ?MODULE, delete_queues, []), eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), rabbit_ct_helpers:testcase_finished(Config, Testcase). +delete_queues() -> + [catch rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. + %% ------------------------------------------------------------------- %% Testsuite cases %% ------------------------------------------------------------------- @@ -395,7 +400,7 @@ publish_to_all_queue_types_qos1(Config) -> publish_to_all_queue_types(Config, qos1). publish_to_all_queue_types(Config, QoS) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"classic-queue">>, QQ = <<"quorum-queue">>, @@ -447,7 +452,8 @@ publish_to_all_queue_types(Config, QoS) -> delete_queue(Ch, [CQ, QQ, SQ]), ok = emqtt:disconnect(C), ?awaitMatch([], - all_connection_pids(Config), 10_000, 1000). + all_connection_pids(Config), 10_000, 1000), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). publish_to_all_non_deprecated_queue_types_qos0(Config) -> publish_to_all_non_deprecated_queue_types(Config, qos0). @@ -456,7 +462,7 @@ publish_to_all_non_deprecated_queue_types_qos1(Config) -> publish_to_all_non_deprecated_queue_types(Config, qos1). publish_to_all_non_deprecated_queue_types(Config, QoS) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"classic-queue">>, QQ = <<"quorum-queue">>, @@ -506,7 +512,8 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) -> delete_queue(Ch, [CQ, QQ, SQ]), ok = emqtt:disconnect(C), ?awaitMatch([], - all_connection_pids(Config), 10_000, 1000). + all_connection_pids(Config), 10_000, 1000), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% This test case does not require multiple nodes %% but it is grouped together with flow test cases for other queue types @@ -538,7 +545,7 @@ flow(Config, {App, Par, Val}, QueueType) Result = rpc_all(Config, application, set_env, [App, Par, Val]), ?assert(lists:all(fun(R) -> R =:= ok end, Result)), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), QueueName = Topic = atom_to_binary(?FUNCTION_NAME), declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]), bind(Ch, QueueName, Topic), @@ -566,7 +573,8 @@ flow(Config, {App, Par, Val}, QueueType) ?awaitMatch([], all_connection_pids(Config), 10_000, 1000), ?assertEqual(Result, - rpc_all(Config, application, set_env, [App, Par, DefaultVal])). + rpc_all(Config, application, set_env, [App, Par, DefaultVal])), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). events(Config) -> ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder), @@ -810,9 +818,10 @@ queue_down_qos1(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, 1) end, - Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), delete_queue(Ch0, CQ), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch0). %% Consuming classic queue on a different node goes down. consuming_classic_queue_down(Config) -> @@ -851,7 +860,7 @@ consuming_classic_queue_down(Config) -> ok. delete_create_queue(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ1 = <<"classic-queue-1-delete-create">>, CQ2 = <<"classic-queue-2-delete-create">>, QQ = <<"quorum-queue-delete-create">>, @@ -911,7 +920,8 @@ delete_create_queue(Config) -> 1000, 10), delete_queue(Ch, [CQ1, CQ2, QQ]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). session_expiry(Config) -> App = rabbitmq_mqtt, @@ -1107,7 +1117,7 @@ large_message_amqp_to_mqtt(Config) -> C = connect(ClientId, Config), {ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload0 = binary:copy(<<"x">>, 8_000_000), Payload = <>, amqp_channel:call(Ch, @@ -1115,20 +1125,22 @@ large_message_amqp_to_mqtt(Config) -> routing_key = Topic}, #amqp_msg{payload = Payload}), ok = expect_publishes(C, Topic, [Payload]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). amqp_to_mqtt_qos0(Config) -> Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME), C = connect(ClientId, Config), {ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>, routing_key = Topic}, #amqp_msg{payload = Payload}), ok = expect_publishes(C, Topic, [Payload]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% Packet identifier is a non zero two byte integer. %% Test that the server wraps around the packet identifier. @@ -1609,7 +1621,7 @@ rabbit_status_connection_count(Config) -> trace(Config) -> Server = atom_to_binary(get_node_config(Config, 0, nodename)), Topic = Payload = TraceQ = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), declare_queue(Ch, TraceQ, []), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, @@ -1664,11 +1676,12 @@ trace(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})), delete_queue(Ch, TraceQ), - [ok = emqtt:disconnect(C) || C <- [Pub, Sub]]. + [ok = emqtt:disconnect(C) || C <- [Pub, Sub]], + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). trace_large_message(Config) -> TraceQ = <<"trace-queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), declare_queue(Ch, TraceQ, []), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, @@ -1693,7 +1706,8 @@ trace_large_message(Config) -> {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]), delete_queue(Ch, TraceQ), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). max_packet_size_unauthenticated(Config) -> ClientId = ?FUNCTION_NAME, @@ -1784,7 +1798,7 @@ default_queue_type(Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME), CQName = <<"my classic queue">>, Stream = <<"my stream">>, @@ -1832,7 +1846,8 @@ incoming_message_interceptors(Config) -> delete_queue(Ch, Stream), delete_queue(Ch, CQName), true = rpc(Config, persistent_term, erase, [Key]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% This test makes sure that a retained message that got written in 3.12 or earlier %% can be consumed in 3.13 or later. @@ -1872,7 +1887,7 @@ bind_exchange_to_exchange(Config) -> SourceX = <<"amq.topic">>, DestinationX = <<"destination">>, Q = <<"q">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX, durable = true, auto_delete = true}), @@ -1890,13 +1905,14 @@ bind_exchange_to_exchange(Config) -> eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, amqp_channel:call(Ch, #'basic.get'{queue = Q}))), #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). bind_exchange_to_exchange_single_message(Config) -> SourceX = <<"amq.topic">>, DestinationX = <<"destination">>, Q = <<"q">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX, durable = true, auto_delete = true}), @@ -1923,7 +1939,8 @@ bind_exchange_to_exchange_single_message(Config) -> timer:sleep(10), ?assertEqual(#'queue.delete_ok'{message_count = 0}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). notify_consumer_qos0_queue_deleted(Config) -> Topic = atom_to_binary(?FUNCTION_NAME), diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index d0cff4eda23b..74462aedaa3c 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -1073,12 +1073,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> {ok, _, [Qos]} = emqtt:subscribe(Subv3, Topic, Qos), Sender = spawn_link(?MODULE, send, [self(), Pub, Topic, 0]), receive {publish, #{payload := <<"1">>, - client_pid := Subv3, - packet_id := PacketId}} -> - case Qos of - 0 -> ok; - 1 -> emqtt:puback(Subv3, PacketId) - end + client_pid := Subv3}} -> ok after ?TIMEOUT -> ct:fail("did not receive 1") end, %% Upgrade session from v3 to v5 while another client is sending messages. @@ -1102,7 +1097,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> 0 -> assert_received_no_duplicates(); 1 -> - ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)], + ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(1, NumSent - 1)], ok = expect_publishes(Subv5, Topic, ExpectedPayloads) end, ok = emqtt:disconnect(Pub), diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index b14c0897f10e..535edd940e6a 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -63,6 +63,7 @@ groups() -> build_info_product_test ]}, {detailed_metrics, [], [ + stream_pub_sub_metrics, detailed_metrics_no_families_enabled_by_default, queue_consumer_count_single_vhost_per_object_test, queue_consumer_count_all_vhosts_per_object_test, @@ -74,8 +75,7 @@ groups() -> queue_consumer_count_and_queue_metrics_mutually_exclusive_test, vhost_status_metric, exchange_bindings_metric, - exchange_names_metric, - stream_pub_sub_metrics + exchange_names_metric ]}, {special_chars, [], [core_metrics_special_chars]}, {authentication, [], [basic_auth]} @@ -476,7 +476,7 @@ identity_info_test(Config) -> specific_erlang_metrics_present_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), - ?assertEqual(match, re:run(Body, "^erlang_vm_dist_node_queue_size_bytes{", [{capture, none}, multiline])). + ?assertEqual(match, re:run(Body, "^erlang_vm_dirty_io_schedulers ", [{capture, none}, multiline])). global_metrics_present_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), @@ -782,6 +782,10 @@ exchange_names_metric(Config) -> ok. stream_pub_sub_metrics(Config) -> + {_, Body0} = http_get_with_pal(Config, "/metrics", [], 200), + Metrics = parse_response(Body0), + ct:pal("Initial metrics: ~p", [Metrics]), + Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", MsgPerBatch1 = 2, {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), diff --git a/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl index 96e414adc387..580b6ec947d5 100644 --- a/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl @@ -9,6 +9,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). + +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile([export_all, nowarn_export_all]). -import(rabbit_ct_broker_helpers, [rpc/5]). @@ -72,22 +75,23 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). old_to_new_on_old(Config) -> - ok = shovel(?OLD, ?NEW, ?OLD, Config). + ok = shovel(?FUNCTION_NAME, ?OLD, ?NEW, ?OLD, Config). old_to_new_on_new(Config) -> - ok = shovel(?OLD, ?NEW, ?NEW, Config). + ok = shovel(?FUNCTION_NAME, ?OLD, ?NEW, ?NEW, Config). new_to_old_on_old(Config) -> - ok = shovel(?NEW, ?OLD, ?OLD, Config). + ok = shovel(?FUNCTION_NAME, ?NEW, ?OLD, ?OLD, Config). new_to_old_on_new(Config) -> - ok = shovel(?NEW, ?OLD, ?NEW, Config). + ok = shovel(?FUNCTION_NAME, ?NEW, ?OLD, ?NEW, Config). -shovel(SrcNode, DestNode, ShovelNode, Config) -> +shovel(Caller, SrcNode, DestNode, ShovelNode, Config) -> SrcUri = shovel_test_utils:make_uri(Config, SrcNode), DestUri = shovel_test_utils:make_uri(Config, DestNode), - SrcQ = <<"my source queue">>, - DestQ = <<"my destination queue">>, + ShovelName = atom_to_binary(Caller), + SrcQ = <>, + DestQ = <>, Definition = [ {<<"src-uri">>, SrcUri}, {<<"src-protocol">>, <<"amqp10">>}, @@ -96,7 +100,6 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, DestQ} ], - ShovelName = <<"my shovel">>, ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, set, [<<"/">>, <<"shovel">>, ShovelName, Definition, none]), ok = shovel_test_utils:await_shovel(Config, ShovelNode, ShovelName), @@ -125,6 +128,7 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never), Msgs = receive_messages(Receiver, NumMsgs), + ct:pal("~b messages:~n~p", [length(Msgs), Msgs]), lists:map( fun(N) -> Msg = lists:nth(N, Msgs), @@ -136,8 +140,28 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, ShovelName, none]), ExpectedQueueLen = 0, - ?assertEqual([ExpectedQueueLen], rpc(Config, ?OLD, ?MODULE, delete_queues, [])), - ?assertEqual([ExpectedQueueLen], rpc(Config, ?NEW, ?MODULE, delete_queues, [])). + ?awaitMatch( + [{_, ExpectedQueueLen}], + begin + Ret = rpc(Config, ?OLD, ?MODULE, queues_length, []), + ct:pal("Queues on old: ~p", [Ret]), + Ret + end, + 30000), + ?awaitMatch( + [{_, ExpectedQueueLen}], + begin + Ret = rpc(Config, ?NEW, ?MODULE, queues_length, []), + ct:pal("Queues on new: ~p", [Ret]), + Ret + end, + 30000), + ?assertEqual( + [ExpectedQueueLen], + rpc(Config, ?OLD, ?MODULE, delete_queues, [])), + ?assertEqual( + [ExpectedQueueLen], + rpc(Config, ?NEW, ?MODULE, delete_queues, [])). wait_for_credit(Sender) -> receive @@ -170,6 +194,13 @@ flush(Prefix) -> ok end. +queues_length() -> + [begin + #{<<"name">> := Name} = amqqueue:to_printable(Q), + [{messages, N}] = rabbit_amqqueue:info(Q, [messages]), + {Name, N} + end || Q <- rabbit_amqqueue:list()]. + delete_queues() -> [begin {ok, N} = rabbit_amqqueue:delete(Q, false, false, <<"tests">>), diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index aa1f34e38634..099fb1de9f38 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -871,16 +871,16 @@ dest_resource_alarm(AckMode, Config) -> %%---------------------------------------------------------------------------- with_ch(Config, Fun) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Fun(Ch), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), cleanup(Config), ok. with_newch(Config, Fun) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Fun(Ch), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok. publish(Ch, X, Key, Payload) when is_binary(Payload) -> diff --git a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl index 57afc089d160..5c3221febc0d 100644 --- a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl +++ b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl @@ -101,7 +101,7 @@ child_id_format(Config) -> %% Node 4: the secondary umbrella %% ... %% - %% Therefore, `Pouet' will use the primary copy, `OldNode' the secondary + %% Therefore, `NewNode' will use the primary copy, `OldNode' the secondary %% umbrella, `NewRefNode' the primary copy, and `NodeWithQueues' the %% secondary umbrella. @@ -221,7 +221,7 @@ child_id_format(Config) -> %% After that, the supervisors run on the new code. ct:pal("Clustering nodes ~s and ~s", [OldNode, NewNode]), Config1 = rabbit_ct_broker_helpers:cluster_nodes( - Config, [OldNode, NewNode]), + Config, OldNode, [NewNode]), ok = rabbit_ct_broker_helpers:stop_broker(Config1, OldNode), ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNode), diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py index af643737c23c..a08e5f03f51d 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py @@ -126,7 +126,10 @@ def tearDown(self): if self.conn.is_connected(): try: self.conn.disconnect() - except: + except Exception as inst: + print(type(inst)) + print(inst.args) + print(inst) pass elapsed = time.time() - self._started_at print('{} ({}s)'.format(self.id(), round(elapsed, 2))) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt b/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt index fd2cc9d6beb1..789ce525d372 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt @@ -1,3 +1,3 @@ -stomp.py==8.1.0 -pika==1.1.0 +stomp.py==8.2.0 +pika==1.3.2 rabbitman===0.1.0 diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py index 8b15a5b89b4d..32cdd61e9621 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py @@ -20,7 +20,7 @@ def run_unittests(modules): if name.startswith("Test") and issubclass(obj, unittest.TestCase): suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj)) - ts = unittest.TextTestRunner().run(unittest.TestSuite(suite)) + ts = unittest.TextTestRunner(verbosity=10).run(unittest.TestSuite(suite)) if ts.errors or ts.failures: sys.exit(1) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py index 2aed99ec31f9..a5e783d52d75 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py @@ -11,6 +11,7 @@ import base import time import os +import test_util class TestUserGeneratedQueueName(base.BaseTest): @@ -29,6 +30,12 @@ def test_exchange_dest(self): pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) channel = connection.channel() + test_util.rabbitmqctl(['list_queues']) + test_util.rabbitmqctl(['list_connections', 'peer_host', 'peer_port', + 'protocol']) + test_util.rabbitmqctl(['list_stomp_connections', 'peer_host', + 'peer_port', 'protocol']) + # publish a message to the named queue channel.basic_publish( exchange='', @@ -36,11 +43,13 @@ def test_exchange_dest(self): body='Hello World!') # check if we receive the message from the STOMP subscription - self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertTrue(self.listener.wait(30), "initial message not received") self.assertEqual(1, len(self.listener.messages)) - self.conn.disconnect() + # self.conn.disconnect() connection.close() + while not connection.is_closed: + time.sleep(1) def test_topic_dest(self): queueName='my-user-generated-queue-name-topic' @@ -57,6 +66,12 @@ def test_topic_dest(self): pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) channel = connection.channel() + test_util.rabbitmqctl(['list_queues']) + test_util.rabbitmqctl(['list_connections', 'peer_host', 'peer_port', + 'protocol']) + test_util.rabbitmqctl(['list_stomp_connections', 'peer_host', + 'peer_port', 'protocol']) + # publish a message to the named queue channel.basic_publish( exchange='', @@ -64,11 +79,13 @@ def test_topic_dest(self): body='Hello World!') # check if we receive the message from the STOMP subscription - self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertTrue(self.listener.wait(30), "initial message not received") self.assertEqual(1, len(self.listener.messages)) - self.conn.disconnect() + # self.conn.disconnect() connection.close() + while not connection.is_closed: + time.sleep(1) if __name__ == '__main__': diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl index e6c69bc17bd1..1b9e6b0b8237 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl @@ -211,7 +211,7 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), - log("Stream leader and coordinator leader are on ~p", [L#node.name]), + log("Coordinator leader on: ~0p~nStream leader on: ~0p", [CL, L#node.name]), {ok, So0, C0_00} = stream_test_utils:connect(Config, CL), {ok, So1, C1_00} = stream_test_utils:connect(Config, CF1), diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile b/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile index dae43a1ad68c..994584d771a7 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile @@ -2,7 +2,8 @@ export PATH :=$(CURDIR):$(PATH) HOSTNAME := $(shell hostname) MVN_FLAGS += -Dstream.port=$(STREAM_PORT) \ -Dstream.port.tls=$(STREAM_PORT_TLS) \ - -Dmanagement.port=$(MANAGEMENT_PORT) + -Dmanagement.port=$(MANAGEMENT_PORT) \ + -Dmaven.wagon.http.retryHandler.count=5 .PHONY: tests clean