diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 36f6b63966df..dbaad06bca8a 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -9,7 +9,7 @@ -export([recover/1, stop/1, start/1, declare/6, declare/7, delete_immediately/1, delete_exclusive/2, delete/4, purge/1, - forget_all_durable/1]). + forget_all/1]). -export([pseudo_queue/2, pseudo_queue/3]). -export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1, not_found_or_absent_dirty/1, @@ -1882,19 +1882,19 @@ internal_delete(Queue, ActingUser, Reason) -> {user_who_performed_action, ActingUser}]) end. --spec forget_all_durable(node()) -> 'ok'. +-spec forget_all(node()) -> 'ok'. -%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running` -%% Does it make any sense once mnesia is not used/removed? -forget_all_durable(Node) -> - ?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]), +%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and +%% `rabbit_khepri:remove_*_member/1'. +forget_all(Node) -> + ?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]), UpdateFun = fun(Q) -> forget_node_for_queue(Q) end, FilterFun = fun(Q) -> is_local_to_node(amqqueue:get_pid(Q), Node) end, - rabbit_db_queue:foreach_durable(UpdateFun, FilterFun). + rabbit_db_queue:foreach(UpdateFun, FilterFun). forget_node_for_queue(Q) when ?amqqueue_is_quorum(Q) -> @@ -1936,27 +1936,31 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> -spec on_node_up(node()) -> 'ok'. -on_node_up(_Node) -> - ok. +on_node_up(Node) -> + case rabbit_khepri:is_enabled() of + true -> + %% With Khepri, we try to delete transient queues now because it's + %% possible any updates timed out because of the lack of a quorum + %% while `Node' was down. + ok = delete_transient_queues_on_node(Node); + false -> + ok + end. -spec on_node_down(node()) -> 'ok'. on_node_down(Node) -> - case delete_transient_queues_on_node(Node) of - ok -> + case rabbit_khepri:is_enabled() of + true -> + %% With Khepri, we don't delete transient/exclusive queues. There + %% may be a network partition and the node will be reachable again + %% after the partition is repaired. + %% + %% If the node will never come back, it will likely be removed from + %% the cluster. We take care of transient queues at that time. ok; - {error, timeout} -> - %% This case is possible when running Khepri. The node going down - %% could leave the cluster in a minority so the command to delete - %% the transient queue records would fail. Also see - %% `rabbit_khepri:init/0': we also try this deletion when the node - %% restarts - a time that the cluster is very likely to have a - %% majority - to ensure these records are deleted. - ?LOG_WARNING("transient queues for node '~ts' could not be " - "deleted because of a timeout. These queues " - "will be removed when node '~ts' restarts or " - "is removed from the cluster.", [Node, Node]), - ok + false -> + ok = delete_transient_queues_on_node(Node) end. 
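The rewritten `on_node_up/1` and `on_node_down/1` above swap when transient queues are cleaned up depending on the metadata store. As a rough standalone sketch of that decision only (module, function and event names below are illustrative and not part of the patch):

    %% Sketch only: summarises on which cluster event a peer node's transient
    %% queues are deleted, per metadata store, as implemented by the patch.
    -module(transient_cleanup_sketch).
    -export([cleanup_event/1]).

    -spec cleanup_event(khepri | mnesia) -> node_up | node_down.
    %% With Khepri the cleanup happens when the peer comes back, because a
    %% quorum is then more likely to be available; with Mnesia it happens as
    %% soon as the peer goes down, as before.
    cleanup_event(khepri) -> node_up;
    cleanup_event(mnesia) -> node_down.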
-spec delete_transient_queues_on_node(Node) -> Ret when diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 0528ab8389b0..ab66d264eaff 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -352,21 +352,66 @@ terminate_delete(EmitStats, Reason0, ReplyTo, fun() -> emit_stats(State) end); true -> ok end, - %% This try-catch block transforms throws to errors since throws are not - %% logged. When mnesia is removed this `try` can be removed: Khepri - %% returns errors as error tuples instead. - Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of - ok -> - {ok, Len}; - {error, _} = Err -> - Err - catch - {error, ReasonE} -> error(ReasonE) - end, - send_reply(ReplyTo, Reply), + case ReplyTo of + _ when ReplyTo =/= none -> + Reply = case delete_queue_record(Q, ActingUser, Reason0) of + ok -> + {ok, Len}; + {error, _} = Err -> + Err + end, + send_reply(ReplyTo, Reply); + none -> + %% No processes are waiting for this queue process to exit. We + %% can handle the deletion of the queue record differently: if + %% the deletion times out, we retry indefinitely. + %% + %% For instance, this allows an auto-delete queue process to + %% wait and retry until a network partition is resolved (or + %% this node stops, of course). This reduces the risk of a + %% "leak" of a queue record in the metadata store. + %% + %% If for whatever reason the queue record is still leaked + %% (this process could not delete it before it was killed), the + %% "leaked" queue record will be cleaned up when the partition + %% is resolved (or this node is removed from the cluster). + %% Indeed, when the partition is resolved, all nodes are notified + %% with the `node_up' message from `rabbit_node_monitor'. This + %% calls `rabbit_amqqueue:on_node_up/1' which will delete any + %% transient queues. + %% + %% This indefinite delete-retry loop is executed in a + %% separate process to let this queue process exit. This way, + %% connections will be notified that the queue process is + %% gone. + worker_pool:submit_async( + fun() -> + _ = infinite_internal_delete(Q, ActingUser, Reason0) + end), + ok + end, BQS1 end. +infinite_internal_delete(Q, ActingUser, Reason) -> + case delete_queue_record(Q, ActingUser, Reason) of + {error, timeout} -> + _ = rabbit_khepri:fence(infinity), + infinite_internal_delete(Q, ActingUser, Reason); + Ret -> + Ret + end. + +delete_queue_record(Q, ActingUser, Reason) -> + %% This try-catch block transforms throws to errors since throws are not + %% logged. When mnesia is removed this `try` can be removed: Khepri returns + %% errors as error tuples instead. + try + rabbit_amqqueue:internal_delete(Q, ActingUser, Reason) + catch + {error, ReasonE} -> error(ReasonE) + end. + terminated_by({terminated_by, auto_delete}) -> ?INTERNAL_USER; terminated_by({terminated_by, ActingUser}) -> diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index ed4ee2274379..b17951b8c871 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -32,7 +32,8 @@ delete/2, update/2, update_decorators/2, - exists/1 + exists/1, + foreach/2 ]). %% Once mnesia is removed, all transient entities will be deleted. These can be replaced @@ -57,8 +58,7 @@ %% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called %% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or %% HA queues are removed it can be deleted.
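The `none` branch above hands the record deletion to a worker that retries for as long as the metadata store times out. A minimal, generic sketch of that retry shape, assuming the operation reports timeouts as `{error, timeout}` (module and function names are illustrative; the patch itself uses `infinite_internal_delete/3` with `rabbit_khepri:fence/1` between attempts):

    -module(retry_sketch).
    -export([retry_until_no_timeout/2]).

    %% Calls Op() repeatedly until it returns something other than
    %% {error, timeout}, invoking Wait() between attempts (in the patch the
    %% wait is a fence on the local Khepri store).
    -spec retry_until_no_timeout(fun(() -> term()), fun(() -> any())) -> term().
    retry_until_no_timeout(Op, Wait) ->
        case Op() of
            {error, timeout} ->
                _ = Wait(),
                retry_until_no_timeout(Op, Wait);
            Ret ->
                Ret
        end.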
--export([foreach_durable/2, - internal_delete/3]). +-export([internal_delete/3]). %% Storing it on Khepri is not needed, this function is just used in %% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue @@ -1263,20 +1263,26 @@ foreach_transient_in_khepri(UpdateFun) -> end. %% ------------------------------------------------------------------- -%% foreach_durable(). +%% foreach(). %% ------------------------------------------------------------------- --spec foreach_durable(UpdateFun, FilterFun) -> ok when +-spec foreach(UpdateFun, FilterFun) -> ok when UpdateFun :: fun((Queue) -> any()), FilterFun :: fun((Queue) -> boolean()). -%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'. +%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'. +%% +%% With Mnesia, only durable queues are considered because we use the durable +%% queues table. +%% +%% With Khepri, all queues are considered because they are all in the same +%% "table". %% %% @private -foreach_durable(UpdateFun, FilterFun) -> +foreach(UpdateFun, FilterFun) -> rabbit_khepri:handle_fallback( #{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end, - khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end + khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end }). foreach_durable_in_mnesia(UpdateFun, FilterFun) -> @@ -1292,11 +1298,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) -> end), ok. -foreach_durable_in_khepri(UpdateFun, FilterFun) -> - Path = khepri_queue_path( - ?KHEPRI_WILDCARD_STAR, - #if_data_matches{ - pattern = amqqueue:pattern_match_on_durable(true)}), +foreach_in_khepri(UpdateFun, FilterFun) -> + Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), case rabbit_khepri:filter(Path, fun(_, #{data := Q}) -> FilterFun(Q) end) of diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index c7423b2731ee..1e377eecf362 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) -> NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]), case Ret of ok -> - rabbit_amqqueue:forget_all_durable(NodeToRemove), + rabbit_amqqueue:forget_all(NodeToRemove), ?LOG_DEBUG( "Node ~s removed from Khepri cluster \"~s\"", [NodeToRemove, ?RA_CLUSTER_NAME], @@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) -> Ret = ra:remove_member(ServerRef, ServerId, Timeout), case Ret of {ok, _, _} -> - rabbit_amqqueue:forget_all_durable(NodeToRemove), + rabbit_amqqueue:forget_all(NodeToRemove), ?LOG_DEBUG( "Node ~s removed from Khepri cluster \"~s\"", [NodeToRemove, ?RA_CLUSTER_NAME], diff --git a/deps/rabbit/src/rabbit_mnesia.erl b/deps/rabbit/src/rabbit_mnesia.erl index 9dd73f68b0dd..541f98b23465 100644 --- a/deps/rabbit/src/rabbit_mnesia.erl +++ b/deps/rabbit/src/rabbit_mnesia.erl @@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) -> case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> rabbit_node_monitor:notify_left_cluster(Node), - rabbit_amqqueue:forget_all_durable(Node), + rabbit_amqqueue:forget_all(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} diff --git a/deps/rabbit/src/rabbit_node_monitor.erl b/deps/rabbit/src/rabbit_node_monitor.erl index 5939c156b259..6a04d4cafe54 100644 --- a/deps/rabbit/src/rabbit_node_monitor.erl +++ b/deps/rabbit/src/rabbit_node_monitor.erl @@ -430,16 +430,8 @@ handle_call(status, _From, State = #state{partitions = Partitions}) -> 
handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast(notify_node_up, State = #state{guid = GUID}) -> - Nodes = rabbit_nodes:list_reachable() -- [node()], - gen_server:abcast(Nodes, ?SERVER, - {node_up, node(), rabbit_db_cluster:node_type(), GUID}), - %% register other active rabbits with this rabbit - DiskNodes = rabbit_db_cluster:disc_members(), - [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of - true -> disc; - false -> ram - end}) || N <- Nodes], +handle_cast(notify_node_up, State) -> + do_notify_node_up(State), {noreply, State}; %%---------------------------------------------------------------------------- @@ -665,6 +657,12 @@ handle_info({nodedown, Node, Info}, State) -> handle_info({nodeup, Node, _Info}, State) -> ?LOG_INFO("node ~tp up", [Node]), + %% We notify that `rabbit' is up here too (in addition to the message sent + %% explicitly by a boot step). That's because nodes may go down then up + %% during a network partition, and with Khepri, nodes are not restarted + %% (unlike with some partition handling strategies used with Mnesia), and + %% thus the boot steps are not executed. + do_notify_node_up(State), {noreply, State}; handle_info({mnesia_system_event, @@ -854,6 +852,20 @@ wait_for_cluster_recovery(Condition) -> wait_for_cluster_recovery(Condition) end. +do_notify_node_up(#state{guid = GUID}) -> + Nodes = rabbit_nodes:list_reachable() -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {node_up, node(), rabbit_db_cluster:node_type(), GUID}), + %% register other active rabbits with this rabbit + DiskNodes = rabbit_db_cluster:disc_members(), + _ = [gen_server:cast( + ?SERVER, + {node_up, N, case lists:member(N, DiskNodes) of + true -> disc; + false -> ram + end}) || N <- Nodes], + ok. + handle_dead_rabbit(Node, State) -> %% TODO: This may turn out to be a performance hog when there are %% lots of nodes. We really only need to execute some of these diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 539515fd93f8..862d0f7b8e53 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -96,25 +96,36 @@ end_per_group(_, Config) -> rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> - Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - Name = rabbit_data_coercion:to_binary(Testcase), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]), - Config2 = rabbit_ct_helpers:set_config(Config1, - [{queue_name, Name}, - {alt_queue_name, <>}, - {exchange_name, Name} - ]), - rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of + {transient_queue_on_node_down, khepri} -> + {skip, "Test irrelevant with Khepri"}; + _ -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Name = rabbit_data_coercion:to_binary(Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]), + Config2 = rabbit_ct_helpers:set_config( + Config1, + [{queue_name, Name}, + {alt_queue_name, <>}, + {exchange_name, Name} + ]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()) end.
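The `init_per_testcase/2` change above gates one testcase on the metadata store the suite runs against. A compact sketch of that gating decision, factored out for readability (the module and function are hypothetical; the suite inlines the check in both `init_per_testcase/2` and `end_per_testcase/2`):

    -module(metadata_store_skip_sketch).
    -export([testcase_action/2]).

    %% Returns whether a testcase should run or be skipped for the given
    %% metadata store, mirroring the clause added to bindings_SUITE.
    testcase_action(transient_queue_on_node_down, khepri) ->
        {skip, "Test irrelevant with Khepri"};
    testcase_action(_Testcase, _MetadataStore) ->
        run.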
end_per_testcase(Testcase, Config) -> - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, - [?config(exchange_name, Config)]), - Config1 = rabbit_ct_helpers:run_steps( - Config, - rabbit_ct_client_helpers:teardown_steps()), - rabbit_ct_helpers:testcase_finished(Config1, Testcase). + case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of + {transient_queue_on_node_down, khepri} -> + Config; + _ -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, + [?config(exchange_name, Config)]), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase) + end. %% ------------------------------------------------------------------- %% Testcases. diff --git a/deps/rabbit/test/clustering_recovery_SUITE.erl b/deps/rabbit/test/clustering_recovery_SUITE.erl index 1167fa0a24bc..c5ffb36e953b 100644 --- a/deps/rabbit/test/clustering_recovery_SUITE.erl +++ b/deps/rabbit/test/clustering_recovery_SUITE.erl @@ -33,7 +33,15 @@ groups() -> {clustered_3_nodes, [], [{cluster_size_3, [], [ force_shrink_quorum_queue, - force_shrink_all_quorum_queues + force_shrink_all_quorum_queues, + autodelete_transient_queue_after_partition_recovery_1, + autodelete_durable_queue_after_partition_recovery_1, + autodelete_transient_queue_after_node_loss, + autodelete_durable_queue_after_node_loss, + exclusive_transient_queue_after_partition_recovery_1, + exclusive_durable_queue_after_partition_recovery_1, + exclusive_transient_queue_after_node_loss, + exclusive_durable_queue_after_node_loss ]} ]} ]}, @@ -43,7 +51,19 @@ groups() -> force_standalone_boot, force_standalone_boot_and_restart, force_standalone_boot_and_restart_with_quorum_queues, - recover_after_partition_with_leader + recover_after_partition_with_leader, + autodelete_transient_queue_after_partition_recovery_1, + autodelete_durable_queue_after_partition_recovery_1, + autodelete_transient_queue_after_partition_recovery_2, + autodelete_durable_queue_after_partition_recovery_2, + autodelete_transient_queue_after_node_loss, + autodelete_durable_queue_after_node_loss, + exclusive_transient_queue_after_partition_recovery_1, + exclusive_durable_queue_after_partition_recovery_1, + exclusive_transient_queue_after_partition_recovery_2, + exclusive_durable_queue_after_partition_recovery_2, + exclusive_transient_queue_after_node_loss, + exclusive_durable_queue_after_node_loss ]} ]}, {clustered_5_nodes, [], @@ -110,9 +130,55 @@ init_per_testcase(Testcase, Config) -> {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, {keep_pid_file_on_exit, true} ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). 
+ Config2 = case Testcase of + _ when Testcase =:= autodelete_transient_queue_after_partition_recovery_1 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_1 orelse + Testcase =:= autodelete_transient_queue_after_partition_recovery_2 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_1 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_1 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_2 -> + rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, + [{cluster_partition_handling, pause_minority}]}); + _ -> + Config1 + end, + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + case Config3 of + _ when is_list(Config3) andalso + (Testcase =:= autodelete_transient_queue_after_partition_recovery_1 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_1 orelse + Testcase =:= autodelete_transient_queue_after_partition_recovery_2 orelse + Testcase =:= autodelete_durable_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_1 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_1 orelse + Testcase =:= exclusive_transient_queue_after_partition_recovery_2 orelse + Testcase =:= exclusive_durable_queue_after_partition_recovery_2) -> + NewEnough = ok =:= rabbit_ct_broker_helpers:enable_feature_flag( + Config3, 'rabbitmq_4.2.0'), + case NewEnough of + true -> + Config3; + false -> + _ = rabbit_ct_helpers:run_steps( + Config3, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config3, Testcase), + {skip, + "The old node does not have improvements to " + "rabbit_amqqueue_process and rabbit_node_monitor"} + end; + _ -> + %% Other testcases or failure to setup broker and client. + Config3 + end. end_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:run_steps(Config, @@ -255,23 +321,7 @@ force_standalone_boot_and_restart_with_quorum_queues(Config) -> recover_after_partition_with_leader(Config) -> Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - %% We use intermediate Erlang nodes between the common_test control node - %% and the RabbitMQ nodes, using `peer' standard_io communication. The goal - %% is to make sure the common_test control node doesn't interfere with the - %% nodes the RabbitMQ nodes can see, despite the blocking of the Erlang - %% distribution connection. - Proxies0 = [begin - {ok, Proxy, PeerNode} = peer:start_link( - #{name => peer:random_name(), - connection => standard_io, - wait_boot => 120000}), - ct:pal("Proxy ~0p -> ~0p", [Proxy, PeerNode]), - Proxy - end || _ <- Nodes], - Proxies = maps:from_list(lists:zip(Nodes, Proxies0)), - ct:pal("Proxies: ~p", [Proxies]), - Config1 = [{proxies, Proxies} | Config], + Config1 = start_proxies(Config), NodeA = hd(Nodes), @@ -384,6 +434,26 @@ recover_after_partition_with_leader(Config) -> application:unset_env(kernel, dist_auto_connect), ok. +start_proxies(Config) -> + %% We use intermediate Erlang nodes between the common_test control node + %% and the RabbitMQ nodes, using `peer' standard_io communication. 
The + %% goal is to make sure the common_test control node doesn't interfere + %% with the nodes the RabbitMQ nodes can see, despite the blocking of the + %% Erlang distribution connection. + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Proxies0 = [begin + {ok, Proxy, PeerNode} = peer:start_link( + #{name => peer:random_name(), + connection => standard_io, + wait_boot => 120000}), + ct:pal("Proxy ~0p -> ~0p", [Proxy, PeerNode]), + Proxy + end || _ <- Nodes], + Proxies = maps:from_list(lists:zip(Nodes, Proxies0)), + ct:pal("Proxies: ~p", [Proxies]), + Config1 = [{proxies, Proxies} | Config], + Config1. + proxied_rpc(Config, Node, Module, Function, Args) -> Proxies = ?config(proxies, Config), Proxy = maps:get(Node, Proxies), @@ -393,9 +463,16 @@ proxied_rpc(Config, Node, Module, Function, Args) -> get_leader_node(Config, Node) -> StoreId = rabbit_khepri:get_store_id(), - Ret = proxied_rpc( - Config, Node, - ra_leaderboard, lookup_leader, [StoreId]), + Ret = case rabbit_ct_helpers:get_config(Config, proxies) of + undefined -> + rabbit_ct_broker_helpers:rpc( + Config, Node, + ra_leaderboard, lookup_leader, [StoreId]); + _ -> + proxied_rpc( + Config, Node, + ra_leaderboard, lookup_leader, [StoreId]) + end, case Ret of {StoreId, LeaderNode} -> {ok, LeaderNode}; @@ -485,6 +562,552 @@ forget_down_node(Config) -> ok. +autodelete_transient_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = false}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +autodelete_durable_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = true}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +exclusive_transient_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = false}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +exclusive_durable_queue_after_partition_recovery_1(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = true}, + temporary_queue_after_partition_recovery_1(Config, QueueDeclare). + +temporary_queue_after_partition_recovery_1(Config, QueueDeclare) -> + [_Node1, Node2 | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef = erlang:monitor(process, Conn), + + %% We create an exclusive queue on node 1 and get its PID on the server + %% side. 
+ ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_], Queues), + [Queue] = Queues, + ct:pal("Queue = ~p", [Queue]), + + QName = amqqueue:get_name(Queue), + QPid = amqqueue:get_pid(Queue), + QMRef = erlang:monitor(process, QPid), + subscribe(Ch, QName#resource.name), + + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:block_traffic_between(Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + + IsAutoDeleteDurable = case QueueDeclare of + #'queue.declare'{auto_delete = true, + durable = true} -> + true; + _ -> + false + end, + case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + mnesia when not IsAutoDeleteDurable -> + clustering_utils:assert_cluster_status({Nodes, []}, [Node2]), + + %% With Mnesia, the client connection is terminated (the node is + %% stopped thanks to the pause_minority partition handling) and + %% the exclusive queue is deleted. + receive + {'DOWN', CMRef, _, _, Reason1} -> + ct:pal("Connection ~p exited: ~p", [Conn, Reason1]), + case Reason1 of + {shutdown, {server_initiated_close, _, _}} -> + ok; + {channel0_died, {shutdown, _}} -> + ok; + _ -> + ct:fail("Unexpected termination reason: ~p", [Reason1]) + end, + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn]) + end, + receive + {'DOWN', QMRef, _, _, Reason2} -> + ct:pal("Queue ~p exited: ~p", [QPid, Reason2]), + ?assertEqual(normal, Reason2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid]) + end, + + %% The queue was also deleted from the metadata store on nodes on + %% the majority side. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Majority), + + %% We can resolve the network partition. + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + %% The queue is not recorded anywhere. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + ok; + + mnesia when IsAutoDeleteDurable -> + %% An auto-delete durable queue seems to survive a network + %% partition or a node loss. Thus, there is nothing to test in the + %% scope of this test case. + ok; + + khepri -> + clustering_utils:assert_cluster_status({Nodes, [Node2]}, [Node2]), + + %% The queue is still recorded everywhere. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({ok, Queue}, Ret) + end, Nodes), + + %% Prepare a publisher. + {PConn, + PCh} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + publish_many(PCh, QName#resource.name, 10), + consume(10), + + %% We resolve the network partition. 
+ lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + publish_many(PCh, QName#resource.name, 10), + consume(10), + + rabbit_ct_client_helpers:close_connection_and_channel(PConn, PCh), + + %% We terminate the channel and connection: the queue should + %% terminate and the metadata store should have no record of it. + _ = rabbit_ct_client_helpers:close_connection_and_channel( + Conn, Ch), + + receive + {'DOWN', CMRef, _, _, Reason1} -> + ct:pal("Connection ~p exited: ~p", [Conn, Reason1]), + ?assertEqual({shutdown, normal}, Reason1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn]) + end, + receive + {'DOWN', QMRef, _, _, Reason} -> + ct:pal("Queue ~p exited: ~p", [QPid, Reason]), + ?assertEqual(normal, Reason), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid]) + end, + + %% The queue was also deleted from the metadata store on all + %% nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + ok + end. + +autodelete_transient_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = false}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +autodelete_durable_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = true}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +exclusive_transient_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = false}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +exclusive_durable_queue_after_partition_recovery_2(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = true}, + temporary_queue_after_partition_recovery_2(Config, QueueDeclare). + +temporary_queue_after_partition_recovery_2(Config, QueueDeclare) -> + [_Node1, Node2 | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef1 = erlang:monitor(process, Conn1), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + CMRef2 = erlang:monitor(process, Conn2), + + %% We create an exclusive queue on node 1 and get its PID on the server + %% side. 
+ ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch1, QueueDeclare)), + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch2, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_, _], Queues), + [Queue1, Queue2] = Queues, + ct:pal("Queues = ~p", [Queues]), + + QName1 = amqqueue:get_name(Queue1), + QPid1 = amqqueue:get_pid(Queue1), + QMRef1 = erlang:monitor(process, QPid1), + subscribe(Ch1, QName1#resource.name), + + QName2 = amqqueue:get_name(Queue2), + QPid2 = amqqueue:get_pid(Queue2), + QMRef2 = erlang:monitor(process, QPid2), + subscribe(Ch2, QName2#resource.name), + + lists:foreach( + fun(Node) -> + rabbit_ct_broker_helpers:block_traffic_between(Node2, Node) + end, Majority), + clustering_utils:assert_cluster_status({Nodes, Majority}, Majority), + clustering_utils:assert_cluster_status({Nodes, [Node2]}, [Node2]), + + %% The queues are still recorded everywhere. + lists:foreach( + fun(Node) -> + Ret1 = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName1]), + Ret2 = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName2]), + ct:pal( + "Queues lookup on node ~0p:~n ~p~n~p", + [Node, Ret1, Ret2]), + ?assertEqual({ok, Queue1}, Ret1), + ?assertEqual({ok, Queue2}, Ret2) + end, Nodes), + + %% Publish to and consume from the queues. + ct:pal("Open connection"), + {_PConn, PCh} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + ct:pal("Publish messages to Q1"), + publish_many(PCh, QName1#resource.name, 10), + ct:pal("Publish messages to Q2"), + publish_many(PCh, QName2#resource.name, 10), + ct:pal("Consume all messages"), + consume(20), + + %% Close the first consuming client to trigger the queue deletion during + %% the network partition. Because of the network partition, the queue + %% process exits but it cannot delete the queue record. + ct:pal("Close connection 1"), + _ = spawn(fun() -> + rabbit_ct_client_helpers:close_connection_and_channel( + Conn1, Ch1) + end), + + ct:pal("Wait for connection 1 DOWN"), + receive + {'DOWN', CMRef1, _, _, Reason1_1} -> + ct:pal("Connection ~p exited: ~p", [Conn1, Reason1_1]), + ?assertEqual({shutdown, normal}, Reason1_1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn1]) + end, + ct:pal("Wait for queue 1 DOWN"), + receive + {'DOWN', QMRef1, _, _, Reason1_2} -> + ct:pal("Queue ~p exited: ~p", [QPid1, Reason1_2]), + ?assertEqual(normal, Reason1_2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid1]) + end, + + %% We sleep to let the queue record deletion reach the timeout. It should + %% retry indefinitely. + KhepriTimeout = rabbit_ct_broker_helpers:rpc( + Config, Node2, khepri_app, get_default_timeout, []), + ct:pal("Sleep > ~b ms", [KhepriTimeout]), + timer:sleep(KhepriTimeout + 10000), + + %% The queue process exited but the queue record should still be there. The + %% temporary process is still trying to delete it but can't during the + %% network partition. + ?awaitMatch( + {ok, _}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, lookup, [QName1]), + ct:pal("Queue lookup on node ~0p: ~p", [Node2, Ret]), + Ret + end, Timeout), + + %% Close the second consuming client to trigger the queue deletion during + %% the network partition. This time, the partition is resolved while the + %% queue process tries to delete the record. 
+ ct:pal("Close connection 2"), + _ = spawn(fun() -> + rabbit_ct_client_helpers:close_connection_and_channel( + Conn2, Ch2) + end), + + ct:pal("Wait for connection 2 DOWN"), + receive + {'DOWN', CMRef2, _, _, Reason2_1} -> + ct:pal("Connection ~p exited: ~p", [Conn2, Reason2_1]), + ?assertEqual({shutdown, normal}, Reason2_1), + ok + after Timeout -> + ct:fail("Connection ~p still running", [Conn2]) + end, + ct:pal("Wait for queue 2 DOWN"), + receive + {'DOWN', QMRef2, _, _, Reason2_2} -> + ct:pal("Queue ~p exited: ~p", [QPid2, Reason2_2]), + ?assertEqual(normal, Reason2_2), + ok + after Timeout -> + ct:fail("Queue ~p still running", [QPid2]) + end, + + %% Again, the queue process exited but the queue record should still be + %% there. The temporary process is still trying to delete it but can't + %% during the network partition. + ?awaitMatch( + {ok, _}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, lookup, [QName2]), + ct:pal("Queue lookup on node ~0p: ~p", [Node2, Ret]), + Ret + end, Timeout), + + %% We resolve the network partition. + lists:foreach( + fun(Node) -> + ct:pal("Allow traffic with ~s", [Node]), + rabbit_ct_broker_helpers:allow_traffic_between( + Node2, Node) + end, Majority), + ct:pal("Cluster status"), + clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes), + + %% The first queue was deleted from the metadata store on all nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName1]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + + %% The second queue was deleted from the metadata store on all nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName2]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + Ret + end, Timeout) + end, Nodes), + ok. + +autodelete_transient_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = false}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +autodelete_durable_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{auto_delete = true, + durable = true}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +exclusive_transient_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = false}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +exclusive_durable_queue_after_node_loss(Config) -> + QueueDeclare = #'queue.declare'{exclusive = true, + durable = true}, + temporary_queue_after_node_loss(Config, QueueDeclare). + +temporary_queue_after_node_loss(Config, QueueDeclare) -> + [Node1, Node2, Node3] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Majority = Nodes -- [Node2], + Timeout = 60000, + + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node2), + + %% We create an exclusive queue on node 1. + ?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)), + Queues = rabbit_ct_broker_helpers:rpc( + Config, Node2, rabbit_amqqueue, list, []), + ?assertMatch([_], Queues), + [Queue] = Queues, + ct:pal("Queue = ~p", [Queue]), + + QName = amqqueue:get_name(Queue), + + %% We kill the node. 
+ rabbit_ct_broker_helpers:kill_node(Config, Node2), + + ct:pal("Wait for new Khepri leader to be elected"), + lists:foreach( + fun(Node) -> + ?awaitMatch( + {ok, LeaderNode} + when LeaderNode =:= Node1 orelse LeaderNode =:= Node3, + get_leader_node(Config, Node), + Timeout) + end, Majority), + + IsAutoDeleteDurable = case QueueDeclare of + #'queue.declare'{auto_delete = true, + durable = true} -> + true; + _ -> + false + end, + case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + mnesia when not IsAutoDeleteDurable -> + clustering_utils:assert_cluster_status( + {Nodes, Majority}, Majority), + + %% The queue is already deleted from the metadata store on + %% remaining nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Majority), + ok; + + mnesia when IsAutoDeleteDurable -> + %% An auto-delete durable queue seems to survive a network + %% partition or a node loss. Thus, there is nothing to test in the + %% scope of this test case. + ok; + + khepri -> + clustering_utils:assert_cluster_status( + {Nodes, Majority}, Majority), + + %% The queue is still recorded on the remaining nodes. + lists:foreach( + fun(Node) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, lookup, [QName]), + ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]), + ?assertEqual({ok, Queue}, Ret) + end, Majority), + + %% We remove the lost node from the cluster. + ?assertEqual( + ok, + rabbit_ct_broker_helpers:forget_cluster_node( + Config, Node3, Node2)), + clustering_utils:assert_cluster_status( + {Majority, Majority}, Majority), + + %% The queue was deleted from the metadata store on remaining + %% nodes. + lists:foreach( + fun(Node) -> + ?awaitMatch( + {error, not_found}, + begin + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node, + rabbit_amqqueue, lookup, [QName]), + ct:pal( + "Queue lookup on node ~0p: ~p", + [Node, Ret]), + Ret + end, Timeout) + end, Majority), + ok + end. + %% ------------------------------------------------------------------- %% Internal utils %% ------------------------------------------------------------------- diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index 9ee433524869..ba6245695c9f 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -500,7 +500,7 @@ foreach_durable1(_Config) -> QName1 = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>), Q1 = new_queue(QName1, rabbit_classic_queue), ?assertEqual(ok, rabbit_db_queue:set(Q1)), - ?assertEqual(ok, rabbit_db_queue:foreach_durable( + ?assertEqual(ok, rabbit_db_queue:foreach( fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), true, normal) end, @@ -566,7 +566,7 @@ internal_delete1(_Config) -> QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>), Q = new_queue(QName, rabbit_classic_queue), ?assertEqual(ok, rabbit_db_queue:set(Q)), - ?assertEqual(ok, rabbit_db_queue:foreach_durable( + ?assertEqual(ok, rabbit_db_queue:foreach( fun(Q0) -> rabbit_db_queue:internal_delete(amqqueue:get_name(Q0), false, normal) end, fun(Q0) when ?is_amqqueue(Q0) -> true end)),
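These suite updates exercise the renamed `rabbit_db_queue:foreach/2`. A small standalone sketch of which records the new function visits per metadata store, with queues modelled as `{Name, Durable}` pairs purely for illustration (the real function operates on `amqqueue` records via the store backends):

    -module(foreach_scope_sketch).
    -export([visited/2]).

    %% With Mnesia only the durable-queues table is walked, so transient
    %% queues are skipped; with Khepri all queue records live under the same
    %% tree, so every queue is visited.
    -spec visited(mnesia | khepri, [{term(), boolean()}]) -> [{term(), boolean()}].
    visited(mnesia, Queues) -> [Q || {_Name, Durable} = Q <- Queues, Durable];
    visited(khepri, Queues) -> Queues.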