Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
09d35b8
Improve many testsuites to make them work with mixed versions of Khepri
dumbbell Mar 14, 2025
a9cf086
cluster_minority_SUITE: Fix race in `remove_node_when_seed_node_is_le…
dumbbell Jun 27, 2025
badce2f
cluster_minority_SUITE: Use `Config1`, not `Config`
dumbbell Jun 29, 2025
23f9c94
quorum_queue_SUITE: Use less messages in `force_checkpoint_on_queue`
dumbbell Jul 10, 2025
e0412d9
cluster_minority_SUITE: Ensure cluster can be changed before partition
dumbbell Jul 10, 2025
d9bf365
per_user_connection_channel_limit_SUITE: Fix test flake in `single_no…
dumbbell Jul 11, 2025
bce118b
queue_type_SUITE: Be explicit about connection open+close
dumbbell Jul 11, 2025
e5f0153
dynamic_SUITE: Be explicit about connection open+close
dumbbell Jul 11, 2025
e1d6b3e
rabbit_prometheus_http_SUITE: Use another Erlang metric
dumbbell Jul 11, 2025
c67b3b6
backing_queue_SUITE: Increase the restart time boundary
mkuratczyk Jul 14, 2025
7f19a26
per_node_limit_SUITE: Wait for the channel count to be up-to-date
dumbbell Jul 15, 2025
ddd1ce2
auth_SUITE: Wait for connection tracking to be up-to-date
dumbbell Jul 15, 2025
339a874
v5_SUITE: session_upgrade_v3_v5_qos1
ansd Jul 16, 2025
b06c7a9
amqp_client_SUITE: Trim "list_connections" output before parsing it
dumbbell Jul 16, 2025
652f51c
amqp_client_SUITE: Ignore meck return value in `idle_time_out_on_serv…
dumbbell Jul 30, 2025
d70ee13
metrics_SUITE: Wait for ETS table to be up-to-date
dumbbell Jul 16, 2025
f91aac1
rabbit_stream_queue_SUITE: Wait for replicas in `shrink_coordinator_c…
dumbbell Jul 30, 2025
424c030
Deflake per_user_connection_tracking_SUITE
mkuratczyk Jun 25, 2025
96b08e9
rabbit_vhosts: Only reconcile vhost procs on nodes running RabbitMQ
dumbbell Jul 31, 2025
081b5bd
rabbitmq_stream_management: Tell Maven to retry when fetching deps
dumbbell Aug 4, 2025
cb83ba9
rabbitmq_cli: Create a symlink to test node's logs
dumbbell Aug 5, 2025
def8b67
rabbit_fifo_dlx_integration_SUITE: Increase a timeout in `delivery_li…
dumbbell Jul 31, 2025
48276ed
per_user_connection_channel_tracking_SUITE: Wait for the expected lis…
dumbbell Jul 31, 2025
937e07f
per_user_connection_tracking_SUITE: Wait for the expected list of con…
dumbbell Aug 5, 2025
7de138b
rabbit_prometheus_http_SUITE: Log more details for a future failure i…
dumbbell Jul 31, 2025
076d878
rabbit_prometheus_http_SUITE: Run `stream_pub_sub_metrics` first
dumbbell Aug 7, 2025
b28ea95
rabbit_exchange_type_consistent_hash_SUITE: Open/close connection exp…
dumbbell Jul 31, 2025
18af7da
rabbit_exchange_type_consistent_hash_SUITE: Set timetrap to 5 minutes
dumbbell Jul 31, 2025
dc040b0
rabbit_exchange_type_consistent_hash_SUITE: Don't enable a feature fl…
dumbbell Aug 4, 2025
1707c28
rabbit_stream_partitions_SUITE: Fix incorrect log message
dumbbell Aug 4, 2025
3516cb7
amqp_jms_SUITE: Increase time trap
dumbbell Aug 4, 2025
135fff9
amqp_client_SUITE: Trim "list_connections" output in one more place
dumbbell Jul 31, 2025
e507209
amqp_client_SUITE: Load test module on broker before using one of its…
dumbbell Aug 5, 2025
333719c
amqp10_inter_cluster_SUITE: Wait for queue length to reach expectations
dumbbell Jul 30, 2025
2b5148f
amqp10_inter_cluster_SUITE: Use per-test shovel names
dumbbell Aug 6, 2025
0a24602
amqp10_inter_cluster_SUITE: Log messages and queues length
dumbbell Aug 6, 2025
141f183
feature_flags_v2_SUITE: Catch and log return value of peer:stop/1
dumbbell Aug 6, 2025
d036cf1
auth_SUITE: Wait for connection tracking to be up-to-date
dumbbell Aug 6, 2025
f8d8c09
jwks_SUITE: Wait for connection exit in `test_failed_token_refresh_ca…
dumbbell Aug 6, 2025
0399b22
proxy_protocol_SUITE: Wait for connection close
dumbbell Aug 7, 2025
04b91f6
python_SUITE: Increase a timeout in `test_exchange_dest` and `test_to…
dumbbell Jul 30, 2025
1c745d8
python_SUITE: Bump Python dependencies to their latest versions
dumbbell Aug 4, 2025
8688e7e
python_SUITE: Wait for the AMQP connection to close in `x_queue_name.py`
dumbbell Aug 6, 2025
d08c082
python_SUITE: Increase unittest verbosity
dumbbell Aug 6, 2025
34dac81
python_SUITE: Add more debug messages
dumbbell Aug 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-make-target.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,6 @@ jobs:
name: CT logs (${{ inputs.plugin }} ${{ inputs.make_target }} OTP-${{ inputs.erlang_version }} ${{ inputs.metadata_store }}${{ inputs.mixed_clusters && ' mixed' || '' }})
path: |
logs/
deps/rabbitmq_cli/logs/
# !logs/**/log_private
if-no-files-found: ignore
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_vhosts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ start_processes_for_all(Nodes) ->

-spec start_processes_for_all() -> 'ok'.
start_processes_for_all() ->
start_processes_for_all(rabbit_nodes:list_reachable()).
start_processes_for_all(rabbit_nodes:list_running()).

%% Same as rabbit_vhost_sup_sup:start_on_all_nodes/0.
-spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'.
Expand Down
21 changes: 18 additions & 3 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,11 @@ link_target_queue_deleted(QType, Config) ->
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
ok = wait_for_accepted(DTag1),

%% Load test module on the broker: we reference an anonymous function
%% from it during the configuration of meck.
[_ | _] = rabbit_ct_broker_helpers:rpc(
Config, ?MODULE, module_info, []),

%% Mock delivery to the target queue to do nothing.
rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]),
Mod = rabbit_queue_type,
Expand Down Expand Up @@ -1833,6 +1838,11 @@ target_queues_deleted_accepted(Config) ->
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
ok = wait_for_accepted(DTag1),

%% Load test module on the broker: we reference an anonymous function
%% from it during the configuration of meck.
[_ | _] = rabbit_ct_broker_helpers:rpc(
Config, ?MODULE, module_info, []),

%% Mock to deliver only to q1.
rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]),
Mod = rabbit_queue_type,
Expand Down Expand Up @@ -3979,7 +3989,7 @@ list_connections(Config) ->
end,

{ok, StdOut0} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "protocol"]),
Protocols0 = re:split(StdOut0, <<"\n">>, [trim]),
Protocols0 = re:split(string:trim(StdOut0), <<"\n">>, [trim]),
%% Remove any whitespaces.
Protocols1 = [binary:replace(Subject, <<" ">>, <<>>, [global]) || Subject <- Protocols0],
Protocols = lists:sort(Protocols1),
Expand All @@ -3993,7 +4003,7 @@ list_connections(Config) ->

%% CLI should list AMQP 1.0 container-id
{ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]),
ContainerIds0 = re:split(StdOut1, <<"\n">>, [trim]),
ContainerIds0 = re:split(string:trim(StdOut1), <<"\n">>, [trim]),
ContainerIds = lists:sort(ContainerIds0),
?assertEqual([<<>>, ContainerId0, ContainerId2],
ContainerIds),
Expand Down Expand Up @@ -4630,6 +4640,11 @@ idle_time_out_on_server(Config) ->
after 30000 -> ct:fail({missing_event, ?LINE})
end,

%% Load test module on the broker: we reference an anonymous function
%% from it during the configuration of meck.
[_ | _] = rabbit_ct_broker_helpers:rpc(
Config, ?MODULE, module_info, []),

%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
Expand All @@ -4653,7 +4668,7 @@ idle_time_out_on_server(Config) ->
ct:fail({missing_event, ?LINE})
end
after
?assert(rpc(Config, meck, validate, [Mod])),
_ = rpc(Config, meck, validate, [Mod]),
ok = rpc(Config, meck, unload, [Mod]),
ok = rpc(Config, application, set_env, [App, Par, DefaultVal])
end.
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/amqp_jms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ groups() ->

suite() ->
[
{timetrap, {minutes, 2}}
{timetrap, {minutes, 5}}
].

init_per_suite(Config) ->
Expand Down
8 changes: 4 additions & 4 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1445,18 +1445,18 @@ variable_queue_restart_large_seq_id2(VQ0, QName) ->
Terms = variable_queue_read_terms(QName),
Count = proplists:get_value(next_seq_id, Terms),

%% set a very high next_seq_id as if 100M messages have been
%% set a very high next_seq_id as if 100 billion messages have been
%% published and consumed
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000_000}),

{TInit, VQ3} =
timer:tc(
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
millisecond),
%% even with a very high next_seq_id start of an empty queue
%% should be quick (few milliseconds, but let's give it 100ms, to
%% should be quick (few milliseconds, but let's give it 500ms, to
%% avoid flaking on slow servers)
{true, _} = {TInit < 100, TInit},
{true, _} = {TInit < 500, TInit},

%% should be empty now
true = rabbit_variable_queue:is_empty(VQ3),
Expand Down
211 changes: 167 additions & 44 deletions deps/rabbit/test/cluster_minority_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([export_all, nowarn_export_all]).

all() ->
[
{group, client_operations},
{group, cluster_operation_add},
{group, cluster_operation_remove}
{group, cluster_operation}
].

groups() ->
Expand All @@ -42,8 +42,10 @@ groups() ->
delete_policy,
export_definitions
]},
{cluster_operation_add, [], [add_node]},
{cluster_operation_remove, [], [remove_node]},
{cluster_operation, [], [add_node_when_seed_node_is_leader,
add_node_when_seed_node_is_follower,
remove_node_when_seed_node_is_leader,
remove_node_when_seed_node_is_follower]},
{feature_flags, [], [enable_feature_flag]}
].

Expand Down Expand Up @@ -127,26 +129,49 @@ init_per_group(Group, Config0) when Group == client_operations;
partition_5_node_cluster(Config1),
Config1
end;
init_per_group(Group, Config0) ->
init_per_group(_Group, Config0) ->
Config = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 5},
{rmq_nodename_suffix, Group},
{rmq_nodes_clustered, false},
{tcp_ports_base},
{net_ticktime, 5}]),
Config1 = rabbit_ct_helpers:merge_app_env(
Config, {rabbit, [{forced_feature_flags_on_init, []}]}),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
Config, {rabbit, [{forced_feature_flags_on_init, []},
{khepri_leader_wait_retry_timeout, 30000}]}),
Config1.

end_per_group(_, Config) ->
end_per_group(Group, Config) when Group == client_operations;
Group == feature_flags ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_Group, Config) ->
Config.

init_per_testcase(Testcase, Config)
when Testcase =:= add_node_when_seed_node_is_leader orelse
Testcase =:= add_node_when_seed_node_is_follower orelse
Testcase =:= remove_node_when_seed_node_is_leader orelse
Testcase =:= remove_node_when_seed_node_is_follower ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodename_suffix, Testcase}]),
rabbit_ct_helpers:run_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config)
when Testcase =:= add_node_when_seed_node_is_leader orelse
Testcase =:= add_node_when_seed_node_is_follower orelse
Testcase =:= remove_node_when_seed_node_is_leader orelse
Testcase =:= remove_node_when_seed_node_is_follower ->
rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

Expand Down Expand Up @@ -271,53 +296,151 @@ set_policy(Config) ->
delete_policy(Config) ->
?assertError(_, rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy-to-delete">>)).

add_node(Config) ->
[A, B, C, D, _E] = rabbit_ct_broker_helpers:get_node_configs(
add_node_when_seed_node_is_leader(Config) ->
[A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
ok = rabbit_control_helper:command(stop_app, B),
ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, B),
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

ok = rabbit_control_helper:command(stop_app, C),
ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, C),
AMember = {rabbit_khepri:get_store_id(), A},
_ = ra:transfer_leadership(AMember, AMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),

%% Minority partition: A
partition_3_node_cluster(Config1),

Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, State} when State =/= follower andalso State =/= candidate ->
Ret = rabbit_control_helper:command(
join_cluster, E, [atom_to_list(A)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "(Khepri cluster could be in minority|\\{:rabbit, \\{\\{:error, :timeout\\})",
[{capture, none}]));
Ret ->
ct:pal("A is not the expected leader: ~p", [Ret]),
{skip, "Node A was not elected leader"}
end.

add_node_when_seed_node_is_follower(Config) ->
[A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
Cluster = [A, B, C],
partition_3_node_cluster(Config),

ok = rabbit_control_helper:command(stop_app, D),
%% The command is appended to the log, but it will be dropped once the connectivity
%% is restored
?assertMatch(ok,
rabbit_control_helper:command(join_cluster, D, [atom_to_list(A)], [])),
timer:sleep(10000),
join_3_node_cluster(Config),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster).

remove_node(Config) ->
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

CMember = {rabbit_khepri:get_store_id(), C},
ra:transfer_leadership(CMember, CMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),

%% Minority partition: A
partition_3_node_cluster(Config1),

AMember = {rabbit_khepri:get_store_id(), A},
Pong = ra:ping(AMember, 10000),
ct:pal("Member A state: ~0p", [Pong]),
case Pong of
{pong, State}
when State =:= follower orelse State =:= pre_vote ->
Ret = rabbit_control_helper:command(
join_cluster, E, [atom_to_list(A)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority",
[{capture, none}]));
{pong, await_condition} ->
Ret = rabbit_control_helper:command(
join_cluster, E, [atom_to_list(A)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "\\{:rabbit, \\{\\{:error, :timeout\\}",
[{capture, none}])),
clustering_utils:assert_cluster_status(
{Cluster, Cluster}, Cluster);
Ret ->
ct:pal("A is not the expected follower: ~p", [Ret]),
{skip, "Node A was not a follower"}
end.

remove_node_when_seed_node_is_leader(Config) ->
[A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
ok = rabbit_control_helper:command(stop_app, B),
ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, B),
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

ok = rabbit_control_helper:command(stop_app, C),
ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []),
rabbit_control_helper:command(start_app, C),
AMember = {rabbit_khepri:get_store_id(), A},
ra:transfer_leadership(AMember, AMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),
ct:pal("Waiting for cluster change permitted on node A"),
?awaitMatch(
{ok, #{cluster_change_permitted := true,
leader_id := AMember}, AMember},
rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
60000),
{ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]),

%% Minority partition: A
partition_3_node_cluster(Config),
partition_3_node_cluster(Config1),

?assertEqual({pong, leader}, ra:ping(AMember, 10000)),
?awaitMatch(
ok,
rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
60000).

remove_node_when_seed_node_is_follower(Config) ->
[A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),

%% Three node cluster: A, B, C
Cluster = [A, B, C],
Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster),

ok = rabbit_control_helper:command(forget_cluster_node, A, [atom_to_list(B)], []),
timer:sleep(10000),
join_3_node_cluster(Config),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster).
AMember = {rabbit_khepri:get_store_id(), A},
CMember = {rabbit_khepri:get_store_id(), C},
ra:transfer_leadership(CMember, CMember),
clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster),
?awaitMatch(
{ok, #{cluster_change_permitted := true,
leader_id := CMember}, AMember},
rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
60000),
{ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc(
Config1, A, ra, member_overview, [AMember]),
ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]),

%% Minority partition: A
partition_3_node_cluster(Config1),

Ret = rabbit_control_helper:command(
forget_cluster_node, A, [atom_to_list(B)], []),
?assertMatch({error, _, _}, Ret),
{error, _, Msg} = Ret,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority", [{capture, none}])).

enable_feature_flag(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/test/clustering_management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -745,13 +745,13 @@ is_in_minority(Ret) ->
?assertMatch(match, re:run(Msg, ".*timed out.*minority.*", [{capture, none}])).

reset_last_disc_node(Config) ->
Servers = [Rabbit, Hare | _] = cluster_members(Config),
[Rabbit, Hare | _] = cluster_members(Config),

stop_app(Config, Hare),
?assertEqual(ok, change_cluster_node_type(Config, Hare, ram)),
start_app(Config, Hare),

case rabbit_ct_broker_helpers:enable_feature_flag(Config, Servers, khepri_db) of
case rabbit_ct_broker_helpers:enable_feature_flag(Config, [Rabbit], khepri_db) of
ok ->
%% The reset works after the switch to Khepri because the RAM node was
%% implicitly converted to a disc one as Khepri always writes data on disc.
Expand Down
Loading
Loading