diff --git a/deps/oauth2_client/test/system_SUITE.erl b/deps/oauth2_client/test/system_SUITE.erl index a0be9dd3976d..c58bbf371398 100644 --- a/deps/oauth2_client/test/system_SUITE.erl +++ b/deps/oauth2_client/test/system_SUITE.erl @@ -93,6 +93,7 @@ init_per_group(https, Config) -> CaCertFile = filename:join([CertsDir, "testca", "cacert.pem"]), WrongCaCertFile = filename:join([CertsDir, "server", "server.pem"]), [{group, https}, + {certsdir, CertsDir}, {oauth_provider_id, <<"uaa">>}, {oauth_provider, build_https_oauth_provider(<<"uaa">>, CaCertFile)}, {oauth_provider_with_issuer, keep_only_issuer_and_ssl_options( @@ -110,7 +111,7 @@ init_per_group(https_down, Config) -> [{issuer, build_issuer("https")}, {oauth_provider_id, <<"uaa">>}, - {oauth_provider, build_https_oauth_provider(<<"uaa">>, CaCertFile)} | Config]; + {oauth_provider, build_https_oauth_provider(<<"uaa">>, CaCertFile)} | Config0]; init_per_group(openid_configuration_with_path, Config) -> [{use_openid_configuration_with_path, true} | Config]; @@ -122,7 +123,7 @@ init_per_group(with_all_oauth_provider_settings, Config) -> [{with_all_oauth_provider_settings, true}, {oauth_provider_id, <<"uaa">>}, - {oauth_provider, build_https_oauth_provider(<<"uaa">>, CaCertFile)} | Config]; + {oauth_provider, build_https_oauth_provider(<<"uaa">>, CaCertFile)} | Config0]; init_per_group(without_all_oauth_providers_settings, Config) -> Config0 = rabbit_ct_helpers:run_setup_steps(Config), @@ -132,7 +133,7 @@ init_per_group(without_all_oauth_providers_settings, Config) -> [{with_all_oauth_provider_settings, false}, {oauth_provider_id, <<"uaa">>}, {oauth_provider, keep_only_issuer_and_ssl_options( - build_https_oauth_provider(<<"uaa">>, CaCertFile))} | Config]; + build_https_oauth_provider(<<"uaa">>, CaCertFile))} | Config0]; init_per_group(with_default_oauth_provider, Config) -> OAuthProvider = ?config(oauth_provider, Config), @@ -245,8 +246,7 @@ init_per_testcase(TestCase, Config) -> case ?config(group, Config) of https -> ct:log("Start https with expectations ~p", [ListOfExpectations]), - start_https_oauth_server(?AUTH_PORT, ?config(rmq_certsdir, Config), - ListOfExpectations); + start_https_oauth_server(?AUTH_PORT, ?config(certsdir, Config), ListOfExpectations); _ -> do_nothing end, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 26e4e78f3da7..851db5e2c9a2 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -341,6 +341,7 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> %% Assert that every testcase cleaned up. + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), %% Wait for sessions to terminate before starting the next test case. eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), @@ -350,6 +351,10 @@ end_per_testcase(Testcase, Config) -> get_global_counters(Config))), rabbit_ct_helpers:testcase_finished(Config, Testcase). +delete_queues() -> + [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. + reliable_send_receive_with_outcomes_classic_queue(Config) -> reliable_send_receive_with_outcomes(<<"classic">>, Config). diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl new file mode 100644 index 000000000000..cd088a7d6088 --- /dev/null +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -0,0 +1,665 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% Test suite for +%% AMQP Filter Expressions Version 1.0 Working Draft 09 +-module(amqp_filtex_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_filtex.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +-compile([nowarn_export_all, + export_all]). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/1]). +-import(amqp_utils, + [init/1, + connection_config/1, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_messages/3, + detach_link_sync/1, + end_session_sync/1, + wait_for_session_end/1, + close_connection_sync/1]). + +all() -> + [ + {group, cluster_size_1} + ]. + +groups() -> + [ + {cluster_size_1, [shuffle], + [ + properties_section, + application_properties_section, + multiple_sections, + filter_few_messages_from_many, + string_modifier + ]} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:merge_app_env( + Config, {rabbit, [{quorum_tick_interval, 1000}, + {stream_tick_interval, 1000} + ]}). + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + %% Assert that every testcase cleaned up. + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), + %% Wait for sessions to terminate before starting the next test case. + eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +properties_section(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Now = erlang:system_time(millisecond), + To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>), + ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{message_id => {ulong, 999}, + user_id => <<"guest">>, + to => To, + subject => <<"🐇"/utf8>>, + reply_to => ReplyTo, + correlation_id => <<"corr-123">>, + content_type => <<"text/plain">>, + content_encoding => <<"some encoding">>, + absolute_expiry_time => Now + 100_000, + creation_time => Now, + group_id => <<"my group ID">>, + group_sequence => 16#ff_ff_ff_ff, + reply_to_group_id => <<"other group ID">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:new(<<"t2">>, <<"m2">>)), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"message-id">>}, {ulong, 999}}, + {{symbol, <<"user-id">>}, {binary, <<"guest">>}}, + {{symbol, <<"subject">>}, {utf8, <<"🐇"/utf8>>}}, + {{symbol, <<"to">>}, {utf8, To}}, + {{symbol, <<"reply-to">>}, {utf8, ReplyTo}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"corr-123">>}}, + {{symbol, <<"content-type">>}, {symbol, <<"text/plain">>}}, + {{symbol, <<"content-encoding">>}, {symbol, <<"some encoding">>}}, + {{symbol, <<"absolute-expiry-time">>}, {timestamp, Now + 100_000}}, + {{symbol, <<"creation-time">>}, {timestamp, Now}}, + {{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}, + {{symbol, <<"group-sequence">>}, {uint, 16#ff_ff_ff_ff}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"other group ID">>}} + ], + Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + PropsFilter2 = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}], + Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M1), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M2)), + ok = detach_link_sync(Receiver2), + + %% Filter is in place, but no message matches. + PropsFilter3 = [{{symbol, <<"group-id">>}, {utf8, <<"no match">>}}], + Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + receive {amqp10_event, {link, Receiver3, {attached, #'v1_0.attach'{}}}} -> ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver3), + + %% Wrong type should fail validation in the server. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter4 = [{{symbol, <<"group-id">>}, {uint, 3}}], + Filter4 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + unsettled, configuration, Filter4), + receive {amqp10_event, + {link, Receiver4, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter}}}}}} -> + ?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}], + ActualFilter) + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, R4M1} = amqp10_client:get_msg(Receiver4), + {ok, R4M2} = amqp10_client:get_msg(Receiver4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ok = amqp10_client:accept_msg(Receiver4, R4M1), + ok = amqp10_client:accept_msg(Receiver4, R4M2), + ok = amqp10_client:accept_msg(Receiver4, R4M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R4M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R4M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +application_properties_section(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => -2, + <<"k2">> => 10, + <<"k3">> => false, + <<"k4">> => true, + <<"k5">> => <<"hey">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k2">> => 10.1}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:new(<<"t3">>, <<"m3">>)), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k2">> => 10.0}, + amqp10_msg:new(<<"t4">>, <<"m4">>))), + + ok = wait_for_accepts(4), + ok = detach_link_sync(Sender), + flush(sent), + + AppPropsFilter0 = [{{utf8, <<"k5">>}, {symbol, <<"no match">>}}], + Filter0 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter0}}, + {ok, Receiver0} = amqp10_client:attach_receiver_link( + Session, <<"receiver 0">>, Address, + unsettled, configuration, Filter0), + %% Wait for the attach so the detach command won't fail + receive {amqp10_event, + {link, Receiver0, {attached, #'v1_0.attach'{}}}} -> + ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(Receiver0, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver0), + + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {int, -2}}, + {{utf8, <<"k5">>}, {symbol, <<"hey">>}}, + {{utf8, <<"k4">>}, {boolean, true}}, + {{utf8, <<"k3">>}, false} + ], + Filter1 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + receive {amqp10_event, + {link, Receiver1, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter1}}}}}} -> + ?assertMatch( + {described, _Type, {map, [ + {{utf8, <<"k1">>}, {int, -2}}, + {{utf8, <<"k5">>}, {symbol, <<"hey">>}}, + {{utf8, <<"k4">>}, true}, + {{utf8, <<"k3">>}, false} + ]}}, + proplists:get_value({symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER}, ActualFilter1)) + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Due to simple type matching [filtex-v1.0-wd09 §4.1.1] + %% we expect integer 10 to also match number 10.0. + AppPropsFilter2 = [{{utf8, <<"k2">>}, {uint, 10}}], + Filter2 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter2}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M1), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R2M2)), + ok = detach_link_sync(Receiver2), + + %% A reference field value of NULL should always match. [filtex-v1.0-wd09 §4.1.1] + AppPropsFilter3 = [{{utf8, <<"k2">>}, null}], + Filter3 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter3}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + {ok, R3M1} = amqp10_client:get_msg(Receiver3), + {ok, R3M2} = amqp10_client:get_msg(Receiver3), + {ok, R3M3} = amqp10_client:get_msg(Receiver3), + ok = amqp10_client:accept_msg(Receiver3, R3M1), + ok = amqp10_client:accept_msg(Receiver3, R3M2), + ok = amqp10_client:accept_msg(Receiver3, R3M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R3M2)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R3M3)), + ok = detach_link_sync(Receiver3), + + %% Wrong type should fail validation in the server. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + AppPropsFilter4 = [{{symbol, <<"k2">>}, {uint, 10}}], + Filter4 = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter4}}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + unsettled, configuration, Filter4), + receive {amqp10_event, + {link, Receiver4, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter4}}}}}} -> + ?assertMatch([{{symbol,<<"rabbitmq:stream-offset-spec">>}, _}], + ActualFilter4) + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, R4M1} = amqp10_client:get_msg(Receiver4), + {ok, R4M2} = amqp10_client:get_msg(Receiver4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + {ok, R4M4} = amqp10_client:get_msg(Receiver4), + ok = amqp10_client:accept_msg(Receiver4, R4M1), + ok = amqp10_client:accept_msg(Receiver4, R4M2), + ok = amqp10_client:accept_msg(Receiver4, R4M3), + ok = amqp10_client:accept_msg(Receiver4, R4M4), + ?assertEqual([<<"m1">>], amqp10_msg:body(R4M1)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R4M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R4M4)), + ok = detach_link_sync(Receiver4), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Test filter expressions matching multiple message sections. +multiple_sections(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"The Subject">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"The Key">> => -123}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"The Subject">>}, + amqp10_msg:set_application_properties( + #{<<"The Key">> => -123}, + amqp10_msg:new(<<"t3">>, <<"m3">>)))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter = [{{symbol, <<"subject">>}, {utf8, <<"The Subject">>}}], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + unsettled, configuration, Filter1), + {ok, R1M1} = amqp10_client:get_msg(Receiver1), + {ok, R1M3} = amqp10_client:get_msg(Receiver1), + ok = amqp10_client:accept_msg(Receiver1, R1M1), + ok = amqp10_client:accept_msg(Receiver1, R1M3), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)), + ok = detach_link_sync(Receiver1), + + AppPropsFilter = [{{utf8, <<"The Key">>}, {byte, -123}}], + Filter2 = #{?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + {ok, R2M3} = amqp10_client:get_msg(Receiver2), + ok = amqp10_client:accept_msg(Receiver2, R2M2), + ok = amqp10_client:accept_msg(Receiver2, R2M3), + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), + ok = detach_link_sync(Receiver2), + + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + {ok, R3M3} = amqp10_client:get_msg(Receiver3), + ok = amqp10_client:accept_msg(Receiver3, R3M3), + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)), + ok = detach_link_sync(Receiver3), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Filter a small subset from many messages. +%% We test here that flow control still works correctly. +filter_few_messages_from_many(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t1">>, <<"first msg">>))), + ok = send_messages(Sender, 1000, false), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t2">>, <<"last msg">>))), + ok = wait_for_accepts(1002), + ok = detach_link_sync(Sender), + flush(sent), + + %% Our filter should cause us to receive only the first and + %% last message out of the 1002 messages in the stream. + PropsFilter = [{{symbol, <<"group-id">>}, {utf8, <<"my group ID">>}}], + Filter = #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, + ?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter}}, + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + unsettled, configuration, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, 2, never), + receive {amqp10_msg, Receiver, M1} -> + ?assertEqual([<<"first msg">>], amqp10_msg:body(M1)), + ok = amqp10_client:accept_msg(Receiver, M1) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver, M2} -> + ?assertEqual([<<"last msg">>], amqp10_msg:body(M2)), + ok = amqp10_client:accept_msg(Receiver, M2) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +string_modifier(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{to => <<"abc 1">>, + reply_to => <<"abc 2">>, + subject => <<"abc 3">>, + group_id => <<"abc 4">>, + reply_to_group_id => <<"abc 5">>, + message_id => {utf8, <<"abc 6">>}, + correlation_id => <<"abc 7">>, + group_sequence => 16#ff_ff_ff_ff}, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc 8">>, + <<"k2">> => <<"abc 9">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>)))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc">>}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"$Hello">>, + reply_to_group_id => <<"xyz 5">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"to">>}, {utf8, <<"$p:abc ">>}}, + {{symbol, <<"reply-to">>}, {utf8, <<"$p:abc">>}}, + {{symbol, <<"subject">>}, {utf8, <<"$p:ab">>}}, + {{symbol, <<"group-id">>}, {utf8, <<"$p:a">>}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s:5">>}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"$s:abc 7">>}}, + {{symbol, <<"message-id">>}, {utf8, <<"$p:abc 6">>}} + ], + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {utf8, <<"$s: 8">>}}, + {{utf8, <<"k2">>}, {utf8, <<"$p:abc ">>}} + ], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Same filters as before except for subject which shouldn't match anymore. + PropsFilter2 = lists:keyreplace( + {symbol, <<"subject">>}, 1, PropsFilter1, + {{symbol, <<"subject">>}, {utf8, <<"$s:xxxxxxxxxxxxxx">>}}), + Filter2 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + settled, configuration, Filter2), + ok = amqp10_client:flow_link_credit(Receiver2, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver2), + + PropsFilter3 = [{{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s: 5">>}}], + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + settled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + receive {amqp10_msg, Receiver3, R3M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver3, R3M3} -> + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)) + after 30000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver3), + + %% '$$" is the escape prefix for case-sensitive matching of a string starting with ‘&’ + PropsFilter4 = [{{symbol, <<"subject">>}, {utf8, <<"$$Hello">>}}], + Filter4 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + settled, configuration, Filter4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + %% Starting the reference field value with $ is invalid without using a valid modifier + %% prefix is invalid. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter5 = [{{symbol, <<"subject">>}, {utf8, <<"$Hello">>}}], + Filter5 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter5}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + settled, configuration, Filter5), + {ok, R5M1} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)), + {ok, R5M2} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)), + {ok, R5M3} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)), + ok = detach_link_sync(Receiver5), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +assert_no_msg_received(Line) -> + receive {amqp10_msg, _, _} = Msg -> + ct:fail({received_unexpected_msg, Line, Msg}) + after 10 -> + ok + end. diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl index 801de565d125..ee109b9f9c56 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl @@ -584,9 +584,14 @@ ensure_rabbitmq_queues_cmd(Config) -> ensure_ssl_certs(Config) -> SrcDir = ?config(rabbitmq_ct_helpers_srcdir, Config), + UniqueDir = io_lib:format( + "~s2-~p", + [node(), erlang:unique_integer([positive,monotonic])]), CertsMakeDir = filename:join([SrcDir, "tools", "tls-certs"]), PrivDir = ?config(priv_dir, Config), - CertsDir = filename:join(PrivDir, "certs"), + CertsDir = filename:join([PrivDir, UniqueDir, "certs"]), + _ = filelib:ensure_dir(CertsDir), + _ = file:make_dir(CertsDir), CertsPwd = proplists:get_value(rmq_certspwd, Config, ?SSL_CERT_PASSWORD), Cmd = [ "PASSWORD=" ++ CertsPwd, diff --git a/deps/rabbitmq_management/test/clustering_SUITE.erl b/deps/rabbitmq_management/test/clustering_SUITE.erl index 0e9039cc3675..38d58e0d28ae 100644 --- a/deps/rabbitmq_management/test/clustering_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_SUITE.erl @@ -15,6 +15,7 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -import(rabbit_ct_broker_helpers, [get_node_config/3, restart_node/2]). +-import(rabbit_ct_helpers, [eventually/3]). -import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_post/4, http_delete/3, http_delete/4]). -import(rabbit_misc, [pget/2]). @@ -68,7 +69,8 @@ merge_app_env(Config) -> Config1 = rabbit_ct_helpers:merge_app_env( Config, {rabbit, [ {collect_statistics, fine}, - {collect_statistics_interval, ?STATS_INTERVAL} + {collect_statistics_interval, ?STATS_INTERVAL}, + {core_metrics_gc_interval, 500} ]}), rabbit_ct_helpers:merge_app_env(Config1, {rabbitmq_management_agent, [ @@ -119,7 +121,8 @@ end_per_testcase(Testcase, Config) -> list_cluster_nodes_test(Config) -> %% see rmq_nodes_count in init_per_suite - ?assertEqual(2, length(http_get(Config, "/nodes"))), + eventually(?_assertEqual(2, length(http_get(Config, "/nodes"))), + 1000, 30), passed. qq_replicas_add(Config) -> @@ -216,15 +219,27 @@ queue_on_other_node(Config) -> {ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)), consume(Chan2, <<"some-queue">>), - force_stats(Config), - ?awaitMatch([_], maps:get(consumer_details, http_get(Config, "/queues/%2F/some-queue")), 60000), - - Res = http_get(Config, "/queues/%2F/some-queue"), - % assert some basic data is present - [Cons] = maps:get(consumer_details, Res), - #{} = maps:get(channel_details, Cons), % channel details proplist must not be empty - 0 = maps:get(prefetch_count, Cons), % check one of the augmented properties - <<"some-queue">> = maps:get(name, Res), + ?awaitMatch([_], + begin + force_stats(Config), + maps:get(consumer_details, http_get(Config, "/queues/%2F/some-queue")) + end, + 60000), + + ?awaitMatch({#{}, 0, <<"some-queue">>}, + begin + Res = http_get(Config, "/queues/%2F/some-queue"), + %% assert some basic data is present + case maps:get(consumer_details, Res, undefined) of + [Cons] -> + {maps:get(channel_details, Cons, undefined), % channel details proplist must not be empty + maps:get(prefetch_count, Cons, undefined), % check one of the augmented properties + maps:get(name, Res, undefined)}; + Any -> + {unexpected_consumer_details, Any} + end + end, + 60000), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), @@ -252,21 +267,29 @@ queue_with_multiple_consumers(Config) -> amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = T}) end, - force_stats(Config), + eventually(?_assertMatch( + {#{}, #{}, 0, 0, Q}, + begin + force_stats(Config), + Res = http_get(Config, "/queues/%2F/multi-consumer-queue1"), + %% assert some basic data is there + case maps:get(consumer_details, Res) of + [C1, C2] -> + %% channel details proplist must not be empty + {maps:get(channel_details, C1), + maps:get(channel_details, C2), + %% check one of the augmented properties + maps:get(prefetch_count, C1), + maps:get(prefetch_count, C2), + maps:get(name, Res)}; + Any -> + {unexpected_consumer_details, Any} + end + end), + 1000, 60), - Res = http_get(Config, "/queues/%2F/multi-consumer-queue1"), http_delete(Config, "/queues/%2F/multi-consumer-queue1", ?NO_CONTENT), - % assert some basic data is there - [C1, C2] = maps:get(consumer_details, Res), - % channel details proplist must not be empty - #{} = maps:get(channel_details, C1), - #{} = maps:get(channel_details, C2), - % check one of the augmented properties - 0 = maps:get(prefetch_count, C1), - 0 = maps:get(prefetch_count, C2), - Q = maps:get(name, Res), - amqp_channel:close(Chan), amqp_channel:close(Chan2), rabbit_ct_client_helpers:close_connection(Conn), @@ -282,14 +305,19 @@ queue_consumer_cancelled(Config) -> #'basic.cancel_ok'{} = amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = Tag}), - force_stats(Config), - Res = http_get(Config, "/queues/%2F/some-queue"), + eventually(?_assertMatch( + {[], <<"some-queue">>}, + begin + force_stats(Config), + Res = http_get(Config, "/queues/%2F/some-queue"), + %% assert there are no consumer details + {maps:get(consumer_details, Res), + maps:get(name, Res)} + end), + 1000, 60), amqp_channel:close(Chan), - % assert there are no consumer details - [] = maps:get(consumer_details, Res), - <<"some-queue">> = maps:get(name, Res), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), ok. @@ -346,12 +374,17 @@ queues_single(Config) -> http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]), _ = wait_for_queue(Config, "/queues/%2F/some-queue"), - force_stats(Config), - Res = http_get(Config, "/queues/%2F"), - http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), + eventually(?_assertMatch( + true, + begin + force_stats(Config), + Res = http_get(Config, "/queues/%2F"), + %% assert at least one queue is returned + length(Res) >= 1 + end), + 1000, 60), - % assert at least one queue is returned - ?assert(length(Res) >= 1), + http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), ok. @@ -362,16 +395,25 @@ queues_multiple(Config) -> _ = wait_for_queue(Config, "/queues/%2F/some-queue"), _ = wait_for_queue(Config, "/queues/%2F/some-other-queue"), - force_stats(Config), - - Res = http_get(Config, "/queues/%2F"), - [Q1, Q2 | _] = Res, + eventually(?_assertNot( + begin + force_stats(Config), + case http_get(Config, "/queues/%2F") of + [Q1, Q2 | _] -> + %% assert some basic data is present + ct:pal("Name q1 ~p q2 ~p", + [maps:get(name, Q1), + maps:get(name, Q2)]), + maps:get(name, Q1) =:= maps:get(name, Q2); + Any -> + {unexpected_queues, Any} + end + end), + 1000, 60), - % assert some basic data is present http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), http_delete(Config, "/queues/%2F/some-other-queue", ?NO_CONTENT), - false = (maps:get(name, Q1) =:= maps:get(name, Q2)), amqp_channel:close(Chan), ok. @@ -381,8 +423,13 @@ queues_removed(Config) -> force_stats(Config), N = length(http_get(Config, "/queues/%2F")), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), - force_stats(Config), - ?assertEqual(N - 1, length(http_get(Config, "/queues/%2F"))), + eventually(?_assertEqual( + N - 1, + begin + force_stats(Config), + length(http_get(Config, "/queues/%2F")) + end), + 1000, 60), ok. channels_multiple_on_different_nodes(Config) -> @@ -395,11 +442,12 @@ channels_multiple_on_different_nodes(Config) -> {ok, Chan2} = amqp_connection:open_channel(Conn2), consume(Chan, <<"some-queue">>), - force_stats(Config), - % assert two channels are present ?awaitMatch([_,_], - http_get(Config, "/channels"), + begin + force_stats(Config), + http_get(Config, "/channels") + end, 30000), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), @@ -422,10 +470,9 @@ channel_closed(Config) -> consume(Chan2, <<"some-queue">>), amqp_channel:close(Chan), - force_stats(Config), - rabbit_ct_helpers:await_condition( fun() -> + force_stats(Config), %% assert one channel is present length(http_get(Config, "/channels")) == 1 end, @@ -442,11 +489,15 @@ channel(Config) -> [{_, ChData}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [channel_created]), ChName = uri_string:recompose(#{path => binary_to_list(pget(name, ChData))}), - - force_stats(Config), - Res = http_get(Config, "/channels/" ++ ChName ), - % assert channel is non empty - #{} = Res, + + eventually(?_assertMatch( + #{}, + begin + force_stats(Config), + %% assert channel is non empty + http_get(Config, "/channels/" ++ ChName ) + end), + 1000, 60), amqp_channel:close(Chan), ok. @@ -462,14 +513,20 @@ channel_other_node(Config) -> consume(Chan, Q), publish(Chan, Q), - wait_for_collect_statistics_interval(), - force_stats(Config), - - Res = http_get(Config, "/channels/" ++ ChName ), - % assert channel is non empty - #{} = Res, - [#{}] = maps:get(deliveries, Res), - #{} = maps:get(connection_details, Res), + eventually(?_assertMatch( + {[#{}], #{}}, + begin + force_stats(Config), + case http_get(Config, "/channels/" ++ ChName) of + %% assert channel is non empty + #{} = Res -> + {maps:get(deliveries, Res), + maps:get(connection_details, Res)}; + Any -> + {unexpected_channels, Any} + end + end), + 1000, 60), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), amqp_connection:close(Conn), @@ -486,14 +543,21 @@ channel_with_consumer_on_other_node(Config) -> consume(Chan, Q), publish(Chan, Q), - force_stats(Config), + eventually(?_assertMatch( + [#{}], + begin + force_stats(Config), + case http_get(Config, "/channels/" ++ ChName) of + %% assert channel is non empty + #{} = Res -> + maps:get(consumer_details, Res); + Any -> + {unexpected_channels, Any} + end + end), + 1000, 60), - Res = http_get(Config, "/channels/" ++ ChName), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), - % assert channel is non empty - #{} = Res, - [#{}] = maps:get(consumer_details, Res), - amqp_channel:close(Chan), ok. @@ -507,15 +571,19 @@ channel_with_consumer_on_one_node(Config) -> ChName = get_channel_name(Config, 0), consume(Chan, Q), - force_stats(Config), + eventually(?_assertMatch( + [#{}], + begin + force_stats(Config), + Res = http_get(Config, "/channels/" ++ ChName), + %% assert channel is non empty + maps:get(consumer_details, Res) + end), + 1000, 60), - Res = http_get(Config, "/channels/" ++ ChName), amqp_channel:close(Chan), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), - % assert channel is non empty - #{} = Res, - [#{}] = maps:get(consumer_details, Res), ok. consumers(Config) -> @@ -529,13 +597,20 @@ consumers(Config) -> consume(Chan, <<"some-queue">>), consume(Chan2, <<"some-queue">>), - force_stats(Config), - Res = http_get(Config, "/consumers"), - - % assert there are two non-empty consumer records - [#{} = C1, #{} = C2] = Res, - #{} = maps:get(channel_details, C1), - #{} = maps:get(channel_details, C2), + %% assert there are two non-empty consumer records + eventually(?_assertMatch([#{}, #{}], + begin + force_stats(Config), + http_get(Config, "/consumers") + end), + 1000, 30), + eventually(?_assertMatch([#{}, #{}], + begin + [C1, C2] = http_get(Config, "/consumers"), + [maps:get(channel_details, C1), + maps:get(channel_details, C2)] + end), + 1000, 30), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), @@ -553,16 +628,19 @@ connections(Config) -> Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), {ok, _Chan2} = amqp_connection:open_channel(Conn2), - %% channel count needs a bit longer for 2nd chan - wait_for_collect_statistics_interval(), - force_stats(Config), - - Res = http_get(Config, "/connections"), - - % assert there are two non-empty connection records - [#{} = C1, #{} = C2] = Res, - 1 = maps:get(channels, C1), - 1 = maps:get(channels, C2), + %% assert there are two non-empty connection records + eventually(?_assertMatch([1, 1], + begin + force_stats(Config), + case http_get(Config, "/connections") of + [#{} = C1, #{} = C2] -> + [maps:get(channels, C1), + maps:get(channels, C2)]; + Any -> + {unexpected_connections, Any} + end + end), + 1000, 30), amqp_channel:close(Chan), rabbit_ct_client_helpers:close_connection(Conn2), @@ -582,11 +660,13 @@ exchanges(Config) -> consume(Chan, QName), publish_to(Chan, XName, <<"some-key">>), - force_stats(Config), - Res = http_get(Config, "/exchanges"), - [X] = [X || X <- Res, maps:get(name, X) =:= XName], - - ?assertEqual(<<"direct">>, maps:get(type, X)), + eventually(?_assertEqual([<<"direct">>], + begin + force_stats(Config), + Res = http_get(Config, "/exchanges"), + [maps:get(type, X) || X <- Res, maps:get(name, X) =:= XName] + end), + 1000, 30), amqp_channel:close(Chan), rabbit_ct_client_helpers:close_connection(Conn), @@ -606,11 +686,12 @@ exchange(Config) -> consume(Chan, QName), publish_to(Chan, XName, <<"some-key">>), - force_stats(Config), - force_stats(Config), - Res = http_get(Config, "/exchanges/%2F/some-other-exchange"), - - ?assertEqual(<<"direct">>, maps:get(type, Res)), + eventually(?_assertEqual(<<"direct">>, + begin + force_stats(Config), + maps:get(type, http_get(Config, "/exchanges/%2F/some-other-exchange")) + end), + 1000, 30), amqp_channel:close(Chan), rabbit_ct_client_helpers:close_connection(Conn), @@ -627,15 +708,21 @@ vhosts(Config) -> {ok, Chan2} = amqp_connection:open_channel(Conn2), publish(Chan2, <<"some-queue">>), - wait_for_collect_statistics_interval(), - force_stats(Config), - Res = http_get(Config, "/vhosts"), + eventually(?_assertMatch(#{}, + begin + force_stats(Config), + %% default vhost + case http_get(Config, "/vhosts") of + [#{} = Vhost] -> + %% assert vhost has some message stats + maps:get(message_stats, Vhost); + Any -> + {unexpected_vhosts, Any} + end + end), + 1000, 30), http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), - % default vhost - [#{} = Vhost] = Res, - % assert vhost has some message stats - #{} = maps:get(message_stats, Vhost), amqp_channel:close(Chan), amqp_channel:close(Chan2), @@ -652,16 +739,22 @@ nodes(Config) -> {ok, Chan2} = amqp_connection:open_channel(Conn), publish(Chan2, <<"some-queue">>), - wait_for_collect_statistics_interval(), - force_stats(Config), - Res = http_get(Config, "/nodes"), - http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), + eventually(?_assertMatch({true, true, [#{} | _], [#{} | _]}, + begin + force_stats(Config), + case http_get(Config, "/nodes") of + [#{} = N1 , #{} = N2] -> + {is_binary(maps:get(name, N1)), + is_binary(maps:get(name, N2)), + maps:get(cluster_links, N1), + maps:get(cluster_links, N2)}; + Any -> + {unexpected_nodes, Any} + end + end), + 1000, 30), - [#{} = N1 , #{} = N2] = Res, - ?assert(is_binary(maps:get(name, N1))), - ?assert(is_binary(maps:get(name, N2))), - [#{} | _] = maps:get(cluster_links, N1), - [#{} | _] = maps:get(cluster_links, N2), + http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT), amqp_channel:close(Chan), amqp_channel:close(Chan2), @@ -681,27 +774,32 @@ overview(Config) -> {ok, Chan2} = amqp_connection:open_channel(Conn2), publish(Chan, <<"queue-n1">>), publish(Chan2, <<"queue-n2">>), - wait_for_collect_statistics_interval(), - force_stats(Config), % channel count needs a bit longer for 2nd chan - Res = http_get(Config, "/overview"), + + eventually(?_assertMatch( + {true, true, true, true, 2, 2, 0, 2, 0, 0}, + begin + force_stats(Config), % channel count needs a bit longer for 2nd chan + Res = http_get(Config, "/overview"), + %% assert there are two non-empty connection records + ObjTots = maps:get(object_totals, Res), + QT = maps:get(queue_totals, Res), + MS = maps:get(message_stats, Res), + ChurnRates = maps:get(churn_rates, Res), + {maps:get(connections, ObjTots) >= 2, + maps:get(channels, ObjTots) >= 2, + maps:get(messages_ready, QT) >= 2, + maps:get(publish, MS) >= 2, + maps:get(queue_declared, ChurnRates), + maps:get(queue_created, ChurnRates), + maps:get(queue_deleted, ChurnRates), + maps:get(channel_created, ChurnRates), + maps:get(channel_closed, ChurnRates), + maps:get(connection_closed, ChurnRates)} + end), + 1000, 60), http_delete(Config, "/queues/%2F/queue-n1", ?NO_CONTENT), http_delete(Config, "/queues/%2F/queue-n2", ?NO_CONTENT), - % assert there are two non-empty connection records - ObjTots = maps:get(object_totals, Res), - ?assert(maps:get(connections, ObjTots) >= 2), - ?assert(maps:get(channels, ObjTots) >= 2), - #{} = QT = maps:get(queue_totals, Res), - ?assert(maps:get(messages_ready, QT) >= 2), - MS = maps:get(message_stats, Res), - ?assert(maps:get(publish, MS) >= 2), - ChurnRates = maps:get(churn_rates, Res), - ?assertEqual(maps:get(queue_declared, ChurnRates), 2), - ?assertEqual(maps:get(queue_created, ChurnRates), 2), - ?assertEqual(maps:get(queue_deleted, ChurnRates), 0), - ?assertEqual(maps:get(channel_created, ChurnRates), 2), - ?assertEqual(maps:get(channel_closed, ChurnRates), 0), - ?assertEqual(maps:get(connection_closed, ChurnRates), 0), amqp_channel:close(Chan), amqp_channel:close(Chan2), @@ -877,6 +975,3 @@ listener_proto(Proto) when is_atom(Proto) -> %% rabbit:status/0 used this formatting before rabbitmq/rabbitmq-cli#340 listener_proto({Proto, _Port, _Interface}) -> Proto. - -wait_for_collect_statistics_interval() -> - timer:sleep(?STATS_INTERVAL * 2). diff --git a/deps/rabbitmq_management/test/clustering_prop_SUITE.erl b/deps/rabbitmq_management/test/clustering_prop_SUITE.erl index e006bad9077b..df27571f043a 100644 --- a/deps/rabbitmq_management/test/clustering_prop_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_prop_SUITE.erl @@ -121,9 +121,11 @@ prop_connection_channel_counts(Config) -> 60), cleanup(Cons), rabbit_ct_helpers:await_condition( - fun () -> validate_counts(Config, []) end, + fun () -> + cleanup(Cons), + force_stats(Config), + validate_counts(Config, []) end, 60000), - force_stats(Config), Res end). diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index d151af003a71..72a9be726090 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -393,6 +393,7 @@ end_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success; Testcase == ssl_user_auth_failure; Testcase == ssl_user_vhost_not_allowed -> delete_cert_user(Config), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(TestCase, Config) when TestCase == ssl_user_vhost_parameter_mapping_success; TestCase == ssl_user_vhost_parameter_mapping_not_allowed -> @@ -400,14 +401,17 @@ end_per_testcase(TestCase, Config) when TestCase == ssl_user_vhost_parameter_map VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config), ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser), ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, TestCase); end_per_testcase(user_credentials_auth, Config) -> User = ?config(new_user, Config), {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, user_credentials_auth); end_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) -> delete_cert_user(Config), ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, ssl_user_vhost_parameter_mapping_vhost_does_not_exist); end_per_testcase(Testcase, Config) when Testcase == port_vhost_mapping_success; Testcase == port_vhost_mapping_not_allowed; @@ -417,11 +421,13 @@ end_per_testcase(Testcase, Config) when Testcase == port_vhost_mapping_success; VHost = ?config(temp_vhost_for_port_mapping, Config), ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost), ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(T = port_vhost_mapping_vhost_does_not_exist, Config) -> User = <<"guest">>, ok = set_full_permissions(Config, User, <<"/">>), ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, T); end_per_testcase(T = ssl_user_cert_vhost_mapping_takes_precedence_over_port_vhost_mapping, Config) -> delete_cert_user(Config), @@ -432,6 +438,7 @@ end_per_testcase(T = ssl_user_cert_vhost_mapping_takes_precedence_over_port_vhos VHostForPortVHostMapping = ?config(temp_vhost_for_port_mapping, Config), ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping), ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, T); end_per_testcase(T, Config) when T == queue_bind_permission; T == queue_unbind_permission; @@ -459,6 +466,8 @@ end_per_testcase(T, Config) when T == queue_bind_permission; %% And provide an empty log file for the next test in this group file:write_file(?config(log_location, Config), <<>>), + close_all_connections(Config), + rabbit_ct_helpers:testcase_finished(Config, T); end_per_testcase(T, Config) @@ -469,11 +478,17 @@ end_per_testcase(T, Config) T =:= client_id_from_cert_san_email; T =:= client_id_from_cert_dn -> SetupProcess = ?config(mock_setup_process, Config), - SetupProcess ! stop; + SetupProcess ! stop, + close_all_connections(Config); end_per_testcase(Testcase, Config) -> + close_all_connections(Config), rabbit_ct_helpers:testcase_finished(Config, Testcase). +close_all_connections(Config) -> + rpc(Config, 0, rabbit_mqtt, close_local_client_connections, + [end_per_testcase]). + delete_cert_user(Config) -> User = ?config(temp_ssl_user, Config), {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]). diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index 6b7d7fa80255..2101d9039c26 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -1235,7 +1235,6 @@ management_plugin_connection(Config) -> eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3). management_plugin_enable(Config) -> - ?assertEqual(0, length(http_get(Config, "/connections"))), ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management), ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management_agent), diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index ed22a599280d..8b252003b3c3 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -1071,15 +1071,15 @@ send(Parent, Client, Topic, NumSent) -> end. assert_received_no_duplicates() -> - assert_received_no_duplicates0(#{}). + assert_received_no_duplicates0(#{}, 30000). -assert_received_no_duplicates0(Received) -> +assert_received_no_duplicates0(Received, Timeout) -> receive {publish, #{payload := P}} -> case maps:is_key(P, Received) of true -> ct:fail("Received ~p twice", [P]); - false -> assert_received_no_duplicates0(maps:put(P, ok, Received)) + false -> assert_received_no_duplicates0(maps:put(P, ok, Received), 500) end - after 500 -> + after Timeout -> %% Check that we received at least one message. ?assertNotEqual(0, maps:size(Received)) end. diff --git a/selenium/test/basic-auth/ac-management.js b/selenium/test/basic-auth/ac-management.js index d5282d386b82..a07484d0f0c1 100644 --- a/selenium/test/basic-auth/ac-management.js +++ b/selenium/test/basic-auth/ac-management.js @@ -50,8 +50,11 @@ describe('management user with vhosts permissions', function () { assert.ok(!await overview.isPopupWarningDisplayed()) }) it('can access limited options in admin tab', async function () { + console.log("before clickOnAdminTab") await overview.clickOnAdminTab() + console.log("before waitForAdminTab") await overview.waitForAdminTab() + console.log("after waitForAdminTab") assert.ok(!await overview.isPopupWarningDisplayed()) }) @@ -59,7 +62,7 @@ describe('management user with vhosts permissions', function () { await overview.clickOnAdminTab() await admin.clickOnLimits() await limits.list_virtual_host_limits() - assert.rejects(limits.list_user_limits()) + assert.rejects(await limits.list_user_limits()) })