Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
bbcd04d
feature_flags_SUITE: Fix style
dumbbell Jul 9, 2025
f973932
quorum_queue_SUITE: Use Khepri fence before checking number of replicas
dumbbell Jul 10, 2025
1582ae6
quorum_queue_SUITE: Use less messages in `force_checkpoint_on_queue`
dumbbell Jul 10, 2025
ab76698
cluster_minority_SUITE: Ensure cluster can be changed before partition
dumbbell Jul 10, 2025
f613743
per_user_connection_channel_limit_SUITE: Fix test flake in `single_no…
dumbbell Jul 11, 2025
22c0959
queue_type_SUITE: Be explicit about connection open+close
dumbbell Jul 11, 2025
0bc2d8b
dynamic_SUITE: Be explicit about connection open+close
dumbbell Jul 11, 2025
c672935
rabbit_prometheus_http_SUITE: Use another Erlang metric
dumbbell Jul 11, 2025
37b7a2a
backing_queue_SUITE: Increase the restart time boundary
mkuratczyk Jul 14, 2025
6111c27
per_node_limit_SUITE: Wait for the channel count to be up-to-date
dumbbell Jul 15, 2025
5aab965
auth_SUITE: Wait for connection tracking to be up-to-date
dumbbell Jul 15, 2025
ce1545d
java_SUITE: Add missing error handling
dumbbell Jul 15, 2025
8307aa6
v5_SUITE: session_upgrade_v3_v5_qos1
ansd Jul 16, 2025
ffaf919
amqp_client_SUITE: Trim "list_connections" output before parsing it
dumbbell Jul 16, 2025
83b8a6b
amqp_client_SUITE: Ignore meck return value in `idle_time_out_on_serv…
dumbbell Jul 30, 2025
a44d541
metrics_SUITE: Wait for ETS table to be up-to-date
dumbbell Jul 16, 2025
0fb74ba
rabbit_stream_queue_SUITE: Wait for replicas in `shrink_coordinator_c…
dumbbell Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4087,7 +4087,7 @@ list_connections(Config) ->

%% CLI should list AMQP 1.0 container-id
{ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]),
ContainerIds0 = re:split(StdOut1, <<"\n">>, [trim]),
ContainerIds0 = re:split(string:trim(StdOut1), <<"\n">>, [trim]),
ContainerIds = lists:sort(ContainerIds0),
?assertEqual([<<>>, ContainerId0, ContainerId2],
ContainerIds),
Expand Down Expand Up @@ -4749,7 +4749,7 @@ idle_time_out_on_server(Config) ->
ct:fail({missing_event, ?LINE})
end
after
?assert(rpc(Config, meck, validate, [Mod])),
_ = rpc(Config, meck, validate, [Mod]),
ok = rpc(Config, meck, unload, [Mod]),
ok = rpc(Config, application, set_env, [App, Par, DefaultVal])
end.
Expand Down
8 changes: 4 additions & 4 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1445,18 +1445,18 @@ variable_queue_restart_large_seq_id2(VQ0, QName) ->
Terms = variable_queue_read_terms(QName),
Count = proplists:get_value(next_seq_id, Terms),

%% set a very high next_seq_id as if 100M messages have been
%% set a very high next_seq_id as if 100 billion messages have been
%% published and consumed
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000_000}),

{TInit, VQ3} =
timer:tc(
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
millisecond),
%% even with a very high next_seq_id start of an empty queue
%% should be quick (few milliseconds, but let's give it 100ms, to
%% should be quick (few milliseconds, but let's give it 500ms, to
%% avoid flaking on slow servers)
{true, _} = {TInit < 100, TInit},
{true, _} = {TInit < 500, TInit},

%% should be empty now
true = rabbit_variable_queue:is_empty(VQ3),
Expand Down
75 changes: 34 additions & 41 deletions deps/rabbit/test/cluster_minority_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -387,28 +387,26 @@ remove_node_when_seed_node_is_leader(Config) ->
AMember = {rabbit_khepri:get_store_id(), A},
ra:transfer_leadership(AMember, AMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),
ct:pal("Waiting for cluster change permitted on node A"),
?awaitMatch(
{ok, #{cluster_change_permitted := true,
leader_id := AMember}, AMember},
rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
60000),
{ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]),

%% Minority partition: A
partition_3_node_cluster(Config1),

Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, leader} ->
?awaitMatch(
{ok, #{cluster_change_permitted := true}, _},
rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
60000),
?awaitMatch(
ok,
rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
60000);
Ret ->
ct:pal("A is not the expected leader: ~p", [Ret]),
{skip, "Node A was not a leader"}
end.
?assertEqual({pong, leader}, ra:ping(AMember, 10000)),
?awaitMatch(
ok,
rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
60000).

remove_node_when_seed_node_is_follower(Config) ->
[A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs(
Expand All @@ -418,36 +416,31 @@ remove_node_when_seed_node_is_follower(Config) ->
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

AMember = {rabbit_khepri:get_store_id(), A},
CMember = {rabbit_khepri:get_store_id(), C},
ra:transfer_leadership(CMember, CMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),
?awaitMatch(
{ok, #{cluster_change_permitted := true,
leader_id := CMember}, AMember},
rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
60000),
{ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]),

%% Minority partition: A
partition_3_node_cluster(Config1),

AMember = {rabbit_khepri:get_store_id(), A},
Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, State}
when State =:= follower orelse State =:= pre_vote ->
Ret = rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority",
[{capture, none}]));
{pong, await_condition} ->
Ret = rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
?assertMatch(ok, Ret);
Ret ->
ct:pal("A is not the expected leader: ~p", [Ret]),
{skip, "Node A was not a leader"}
end.
Ret = rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority", [{capture, none}])).

enable_feature_flag(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down
35 changes: 21 additions & 14 deletions deps/rabbit/test/feature_flags_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% Copyright (c) 2019-2025 Broadcom. All Rights Reserved. The term “Broadcom”
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(feature_flags_SUITE).
Expand Down Expand Up @@ -197,14 +198,15 @@ init_per_group(clustering, Config) ->
{rmq_nodes_clustered, false},
{start_rmq_with_plugins_disabled, true}]),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1, {rabbit, [{forced_feature_flags_on_init, [
restart_streams,
stream_sac_coordinator_unblock_group,
stream_update_config_command,
stream_filtering,
message_containers,
quorum_queue_non_voters
]}]}),
Config1, {rabbit, [{forced_feature_flags_on_init,
[
restart_streams,
stream_sac_coordinator_unblock_group,
stream_update_config_command,
stream_filtering,
message_containers,
quorum_queue_non_voters
]}]}),
rabbit_ct_helpers:run_setup_steps(Config2, [fun prepare_my_plugin/1]);
init_per_group(activating_plugin, Config) ->
Config1 = rabbit_ct_helpers:set_config(
Expand All @@ -219,7 +221,8 @@ init_per_group(_, Config) ->
end_per_group(_, Config) ->
Config.

init_per_testcase(enable_feature_flag_when_ff_file_is_unwritable = Testcase, Config) ->
init_per_testcase(
enable_feature_flag_when_ff_file_is_unwritable = Testcase, Config) ->
case erlang:system_info(otp_release) of
"26" ->
{skip, "Hits a crash in Mnesia fairly frequently"};
Expand Down Expand Up @@ -1284,11 +1287,13 @@ activating_plugin_with_new_ff_enabled(Config) ->
ok.

enable_plugin_feature_flag_after_deactivating_plugin(Config) ->
case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, 'rabbitmq_4.0.0') of
RabbitMQ40Enabled = rabbit_ct_broker_helpers:is_feature_flag_enabled(
Config, 'rabbitmq_4.0.0'),
case RabbitMQ40Enabled of
true ->
ok;
false ->
throw({skip, "this test triggers a bug present in 3.13"})
throw({skip, "This test triggers a bug present in 3.13"})
end,

FFSubsysOk = is_feature_flag_subsystem_available(Config),
Expand Down Expand Up @@ -1321,11 +1326,13 @@ enable_plugin_feature_flag_after_deactivating_plugin(Config) ->
ok.

restart_node_with_unknown_enabled_feature_flag(Config) ->
case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, 'rabbitmq_4.0.0') of
RabbitMQ40Enabled = rabbit_ct_broker_helpers:is_feature_flag_enabled(
Config, 'rabbitmq_4.0.0'),
case RabbitMQ40Enabled of
true ->
ok;
false ->
throw({skip, "this test triggers a bug present in 3.13"})
throw({skip, "This test triggers a bug present in 3.13"})
end,

FFSubsysOk = is_feature_flag_subsystem_available(Config),
Expand Down
54 changes: 27 additions & 27 deletions deps/rabbit/test/metrics_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) ->

connection(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
[_] = read_table_rpc(Config, connection_created),
[_] = read_table_rpc(Config, connection_metrics),
[_] = read_table_rpc(Config, connection_coarse_metrics),
?awaitMatch([_], read_table_rpc(Config, connection_created), 30000),
?awaitMatch([_], read_table_rpc(Config, connection_metrics), 30000),
?awaitMatch([_], read_table_rpc(Config, connection_coarse_metrics), 30000),
ok = rabbit_ct_client_helpers:close_connection(Conn),
force_metric_gc(Config),
?awaitMatch([], read_table_rpc(Config, connection_created),
Expand All @@ -317,25 +317,25 @@ connection(Config) ->
channel(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
{ok, Chan} = amqp_connection:open_channel(Conn),
[_] = read_table_rpc(Config, channel_created),
[_] = read_table_rpc(Config, channel_metrics),
[_] = read_table_rpc(Config, channel_process_metrics),
?awaitMatch([_], read_table_rpc(Config, channel_created), 30000),
?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000),
?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000),
ok = amqp_channel:close(Chan),
[] = read_table_rpc(Config, channel_created),
[] = read_table_rpc(Config, channel_metrics),
[] = read_table_rpc(Config, channel_process_metrics),
?awaitMatch([], read_table_rpc(Config, channel_created), 30000),
?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000),
?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000),
ok = rabbit_ct_client_helpers:close_connection(Conn).

channel_connection_close(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
{ok, _} = amqp_connection:open_channel(Conn),
[_] = read_table_rpc(Config, channel_created),
[_] = read_table_rpc(Config, channel_metrics),
[_] = read_table_rpc(Config, channel_process_metrics),
?awaitMatch([_], read_table_rpc(Config, channel_created), 30000),
?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000),
?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000),
ok = rabbit_ct_client_helpers:close_connection(Conn),
[] = read_table_rpc(Config, channel_created),
[] = read_table_rpc(Config, channel_metrics),
[] = read_table_rpc(Config, channel_process_metrics).
?awaitMatch([], read_table_rpc(Config, channel_created), 30000),
?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000),
?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000).

channel_queue_delete_queue(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
Expand All @@ -344,14 +344,14 @@ channel_queue_delete_queue(Config) ->
ensure_exchange_metrics_populated(Chan, Queue),
ensure_channel_queue_metrics_populated(Chan, Queue),
force_channel_stats(Config),
[_] = read_table_rpc(Config, channel_queue_metrics),
[_] = read_table_rpc(Config, channel_queue_exchange_metrics),
?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000),
?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000),

delete_queue(Chan, Queue),
force_metric_gc(Config),
% ensure removal of queue cleans up channel_queue metrics
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_metrics),
?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000),
?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000),
ok = rabbit_ct_client_helpers:close_connection(Conn),
ok.

Expand All @@ -362,26 +362,26 @@ channel_queue_exchange_consumer_close_connection(Config) ->
ensure_exchange_metrics_populated(Chan, Queue),
force_channel_stats(Config),

[_] = read_table_rpc(Config, channel_exchange_metrics),
[_] = read_table_rpc(Config, channel_queue_exchange_metrics),
?awaitMatch([_], read_table_rpc(Config, channel_exchange_metrics), 30000),
?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000),

ensure_channel_queue_metrics_populated(Chan, Queue),
force_channel_stats(Config),
[_] = read_table_rpc(Config, channel_queue_metrics),
?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000),

Sub = #'basic.consume'{queue = Queue},
#'basic.consume_ok'{consumer_tag = _} =
amqp_channel:call(Chan, Sub),

[_] = read_table_rpc(Config, consumer_created),
?awaitMatch([_], read_table_rpc(Config, consumer_created), 30000),

ok = rabbit_ct_client_helpers:close_connection(Conn),
% ensure cleanup happened
force_metric_gc(Config),
[] = read_table_rpc(Config, channel_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_metrics),
[] = read_table_rpc(Config, consumer_created),
?awaitMatch([], read_table_rpc(Config, channel_exchange_metrics), 30000),
?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000),
?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000),
?awaitMatch([], read_table_rpc(Config, consumer_created), 30000),
ok.


Expand Down
10 changes: 6 additions & 4 deletions deps/rabbit/test/per_node_limit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile(export_all).

Expand Down Expand Up @@ -120,27 +121,28 @@ node_channel_limit(Config) ->
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
0 = count_channels_per_node(Config),
?awaitMatch(0, count_channels_per_node(Config), 30000),

lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
(_) -> {ok,_ } = open_channel(Conn2)
end, lists:seq(1, 5)),

5 = count_channels_per_node(Config),
?awaitMatch(5, count_channels_per_node(Config), 30000),
%% In total 5 channels are open on this node, so a new one, regardless of
%% connection, will not be allowed. It will terminate the connection with
%% its channels too. So
{error, not_allowed_crash} = open_channel(Conn2),
3 = count_channels_per_node(Config),
?awaitMatch(3, count_channels_per_node(Config), 30000),
%% As the connection is dead, so are the 2 channels, so we should be able to
%% create 2 more on Conn1
{ok , _} = open_channel(Conn1),
{ok , _} = open_channel(Conn1),
?awaitMatch(5, count_channels_per_node(Config), 30000),
%% But not a third
{error, not_allowed_crash} = open_channel(Conn1),

%% Now all connections are closed, so there should be 0 open connections
0 = count_channels_per_node(Config),
?awaitMatch(0, count_channels_per_node(Config), 30000),
close_all_connections([Conn1, Conn2]),

rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,13 @@ single_node_list_in_user(Config) ->
[Conn4] = open_connections(Config, [{0, Username1}]),
[_Chan4] = open_channels(Conn4, 1),
close_connections([Conn4]),
[#tracked_connection{username = Username1}] = connections_in(Config, Username1),
rabbit_ct_helpers:await_condition(
fun () ->
case connections_in(Config, Username1) of
[#tracked_connection{username = Username1}] -> true;
_ -> false
end
end),
[#tracked_channel{username = Username1}] = channels_in(Config, Username1),

[Conn5, Conn6] = open_connections(Config, [{0, Username2}, {0, Username2}]),
Expand Down
Loading
Loading