diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 4deecdd157de..3bdf8ad6a177 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -70,6 +70,7 @@
 -export([queue/1, queue_names/1]).
 
 -export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
+-export([delete_transient_queues_on_node/1]).
 
 %% internal
 -export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -1839,13 +1840,39 @@ on_node_up(_Node) ->
 -spec on_node_down(node()) -> 'ok'.
 
 on_node_down(Node) ->
+    case delete_transient_queues_on_node(Node) of
+        ok ->
+            ok;
+        {error, timeout} ->
+            %% This case is possible when running Khepri. The node going down
+            %% could leave the cluster in a minority so the command to delete
+            %% the transient queue records would fail. Also see
+            %% `rabbit_khepri:init/0': we also try this deletion when the node
+            %% restarts - a time that the cluster is very likely to have a
+            %% majority - to ensure these records are deleted.
+            rabbit_log:warning("transient queues for node '~ts' could not be "
+                               "deleted because of a timeout. These queues "
+                               "will be removed when node '~ts' restarts or "
+                               "is removed from the cluster.", [Node, Node]),
+            ok
+    end.
+
+-spec delete_transient_queues_on_node(Node) -> Ret when
+      Node :: node(),
+      Ret :: ok | rabbit_khepri:timeout_error().
+
+delete_transient_queues_on_node(Node) ->
     {Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
     case Ret of
-        ok -> ok;
-        {QueueNames, Deletions} ->
+        ok ->
+            ok;
+        {error, timeout} = Err ->
+            Err;
+        {QueueNames, Deletions} when is_list(QueueNames) ->
             case length(QueueNames) of
                 0 -> ok;
-                N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
+                N -> rabbit_log:info("~b transient queues from node '~ts' "
+                                     "deleted in ~fs",
                                      [N, Node, Time / 1_000_000])
             end,
             notify_queue_binding_deletions(Deletions),
diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl
index 7dcae084876b..faa4dd28e6b3 100644
--- a/deps/rabbit/src/rabbit_db.erl
+++ b/deps/rabbit/src/rabbit_db.erl
@@ -100,15 +100,10 @@ init_using_mnesia() ->
     rabbit_sup:start_child(mnesia_sync).
 
 init_using_khepri() ->
-    case rabbit_khepri:members() of
-        [] ->
-            timer:sleep(1000),
-            init_using_khepri();
-        Members ->
-            ?LOG_WARNING(
-               "Found the following metadata store members: ~p", [Members],
-               #{domain => ?RMQLOG_DOMAIN_DB})
-    end.
+    ?LOG_DEBUG(
+       "DB: initialize Khepri",
+       #{domain => ?RMQLOG_DOMAIN_DB}),
+    rabbit_khepri:init().
 
 init_finished() ->
     %% Used during initialisation by rabbit_logger_exchange_h.erl
diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl
index 3ffa50594df1..d1e1829d5873 100644
--- a/deps/rabbit/src/rabbit_db_queue.erl
+++ b/deps/rabbit/src/rabbit_db_queue.erl
@@ -1012,7 +1012,8 @@ set_many_in_khepri(Qs) ->
       Queue :: amqqueue:amqqueue(),
       FilterFun :: fun((Queue) -> boolean()),
       QName :: rabbit_amqqueue:name(),
-      Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
+      Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
+           | rabbit_khepri:timeout_error().
 %% @doc Deletes all transient queues that match `FilterFun'.
 %%
 %% @private
@@ -1073,26 +1074,59 @@ delete_transient_in_khepri(FilterFun) ->
     %% process might call itself. Instead we can fetch all of the transient
     %% queues with `get_many' and then filter and fold the results outside of
     %% Khepri's Ra server process.
-    case rabbit_khepri:get_many(PathPattern) of
-        {ok, Qs} ->
-            Items = maps:fold(
-                      fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
-                              case FilterFun(Queue) of
-                                  true ->
-                                      QueueName = khepri_queue_path_to_name(
-                                                    Path),
-                                      case delete_in_khepri(QueueName, false) of
-                                          ok ->
-                                              Acc;
-                                          Deletions ->
-                                              [{QueueName, Deletions} | Acc]
-                                      end;
-                                  false ->
-                                      Acc
-                              end
-                      end, [], Qs),
-            {QueueNames, Deletions} = lists:unzip(Items),
-            {QueueNames, lists:flatten(Deletions)};
+    case rabbit_khepri:adv_get_many(PathPattern) of
+        {ok, Props} ->
+            Qs = maps:fold(
+                   fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
+                         when ?is_amqqueue(Q) ->
+                           case FilterFun(Q) of
+                               true ->
+                                   Path = khepri_path:combine_with_conditions(
+                                            Path0,
+                                            [#if_payload_version{version = Vsn}]),
+                                   QName = amqqueue:get_name(Q),
+                                   [{Path, QName} | Acc];
+                               false ->
+                                   Acc
+                           end
+                   end, [], Props),
+            do_delete_transient_queues_in_khepri(Qs, FilterFun);
+        {error, _} = Error ->
+            Error
+    end.
+
+do_delete_transient_queues_in_khepri([], _FilterFun) ->
+    %% If there are no changes to make, avoid performing a transaction. When
+    %% Khepri is in a minority this avoids a long timeout waiting for the
+    %% transaction command to be processed. Otherwise it avoids appending a
+    %% somewhat large transaction command to Khepri's log.
+    {[], []};
+do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
+    Res = rabbit_khepri:transaction(
+            fun() ->
+                    rabbit_misc:fold_while_ok(
+                      fun({Path, QName}, Acc) ->
+                              %% Also see `delete_in_khepri/2'.
+                              case khepri_tx_adv:delete(Path) of
+                                  {ok, #{data := _}} ->
+                                      Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
+                                                    QName, false),
+                                      {ok, [{QName, Deletions} | Acc]};
+                                  {ok, _} ->
+                                      {ok, Acc};
+                                  {error, _} = Error ->
+                                      Error
+                              end
+                      end, [], Qs)
+            end),
+    case Res of
+        {ok, Items} ->
+            {QNames, Deletions} = lists:unzip(Items),
+            {QNames, lists:flatten(Deletions)};
+        {error, {khepri, mismatching_node, _}} ->
+            %% One of the queues changed while attempting to update all
+            %% queues. Retry the operation.
+            delete_transient_in_khepri(FilterFun);
         {error, _} = Error ->
             Error
     end.
@@ -1366,6 +1400,3 @@ khepri_queues_path() ->
 
 khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
     [?MODULE, queues, VHost, Name].
-
-khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
-    rabbit_misc:r(VHost, queue, Name).
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl
index fabeda694637..98428f45a099 100644
--- a/deps/rabbit/src/rabbit_khepri.erl
+++ b/deps/rabbit/src/rabbit_khepri.erl
@@ -96,6 +96,7 @@
 
 -export([setup/0,
          setup/1,
+         init/0,
          can_join_cluster/1,
          add_member/2,
          remove_member/1,
@@ -323,6 +324,30 @@ wait_for_register_projections(Timeout, Retries) ->
 
 %% @private
 
+-spec init() -> Ret when
+      Ret :: ok | timeout_error().
+
+init() ->
+    case members() of
+        [] ->
+            timer:sleep(1000),
+            init();
+        Members ->
+            ?LOG_NOTICE(
+               "Found the following metadata store members: ~p", [Members],
+               #{domain => ?RMQLOG_DOMAIN_DB}),
+            %% Delete transient queues on init.
+            %% Note that we also do this in the
+            %% `rabbit_amqqueue:on_node_down/1' callback. We must try this
+            %% deletion during init because the cluster may have been in a
+            %% minority when this node went down. We wait for a majority while
+            %% booting (via `rabbit_khepri:setup/0') though so this deletion is
+            %% likely to succeed.
+            rabbit_amqqueue:delete_transient_queues_on_node(node())
+    end.
+
+%% @private
+
 can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->
     ThisNode = node(),
     try
diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl
index af6fc536b046..c67d36adc8fe 100644
--- a/deps/rabbit_common/src/rabbit_misc.erl
+++ b/deps/rabbit_common/src/rabbit_misc.erl
@@ -89,7 +89,7 @@
          maps_put_falsy/3
         ]).
 -export([remote_sup_child/2]).
--export([for_each_while_ok/2]).
+-export([for_each_while_ok/2, fold_while_ok/3]).
 
 %% Horrible macro to use in guards
 -define(IS_BENIGN_EXIT(R),
@@ -1655,3 +1655,24 @@ for_each_while_ok(Fun, [Elem | Rest]) ->
     end;
 for_each_while_ok(_, []) ->
     ok.
+
+-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
+      FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
+      Element :: any(),
+      List :: [Element],
+      Ret :: {ok, Acc} | {error, ErrReason}.
+%% @doc Calls the given `FoldFun' on each element of the given `List' and the
+%% accumulator value, short-circuiting if the function returns `{error,_}'.
+%%
+%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
+%% `FoldFun' never returns an error tuple.
+
+fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
+    case Fun(Elem, Acc0) of
+        {ok, Acc} ->
+            fold_while_ok(Fun, Acc, Rest);
+        {error, _} = Error ->
+            Error
+    end;
+fold_while_ok(_Fun, Acc, []) ->
+    {ok, Acc}.