Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-export([recover/1, stop/1, start/1, declare/6, declare/7,
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
forget_all/1]).
-export([pseudo_queue/2, pseudo_queue/3]).
-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
not_found_or_absent_dirty/1,
Expand Down Expand Up @@ -1882,19 +1882,19 @@ internal_delete(Queue, ActingUser, Reason) ->
{user_who_performed_action, ActingUser}])
end.

-spec forget_all_durable(node()) -> 'ok'.
-spec forget_all(node()) -> 'ok'.

%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running`
%% Does it make any sense once mnesia is not used/removed?
forget_all_durable(Node) ->
?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and
%% `rabbit_khepri:remove_*_member/1'.
forget_all(Node) ->
?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
UpdateFun = fun(Q) ->
forget_node_for_queue(Q)
end,
FilterFun = fun(Q) ->
is_local_to_node(amqqueue:get_pid(Q), Node)
end,
rabbit_db_queue:foreach_durable(UpdateFun, FilterFun).
rabbit_db_queue:foreach(UpdateFun, FilterFun).

forget_node_for_queue(Q)
when ?amqqueue_is_quorum(Q) ->
Expand Down Expand Up @@ -1936,27 +1936,31 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->

-spec on_node_up(node()) -> 'ok'.

on_node_up(_Node) ->
ok.
%% Called when `Node' is reported as up. Always returns `ok'.
%%
%% With Mnesia this is a no-op. With Khepri, the transient queue records
%% belonging to `Node' are deleted at this point.
on_node_up(Node) ->
    case rabbit_khepri:is_enabled() of
        true ->
            %% With Khepri, we try to delete transient queues now because it's
            %% possible any updates timed out because of the lack of a quorum
            %% while `Node' was down.
            case delete_transient_queues_on_node(Node) of
                ok ->
                    ok;
                {error, timeout} ->
                    %% The deletion itself may time out (e.g. the cluster is
                    %% still in a minority). Do not crash the caller on a
                    %% bad match: report the situation and carry on.
                    ?LOG_WARNING("transient queues for node '~ts' could not "
                                 "be deleted because of a timeout. These "
                                 "queues will be removed when node '~ts' is "
                                 "reported up again or is removed from the "
                                 "cluster.", [Node, Node]),
                    ok
            end;
        false ->
            ok
    end.

-spec on_node_down(node()) -> 'ok'.

on_node_down(Node) ->
case delete_transient_queues_on_node(Node) of
ok ->
case rabbit_khepri:is_enabled() of
true ->
%% With Khepri, we don't delete transient/exclusive queues. There
%% may be a network partition and the node will be reachable again
%% after the partition is repaired.
%%
%% If the node will never come back, it will likely be removed from
%% the cluster. We take care of transient queues at that time.
ok;
{error, timeout} ->
%% This case is possible when running Khepri. The node going down
%% could leave the cluster in a minority so the command to delete
%% the transient queue records would fail. Also see
%% `rabbit_khepri:init/0': we also try this deletion when the node
%% restarts - a time that the cluster is very likely to have a
%% majority - to ensure these records are deleted.
?LOG_WARNING("transient queues for node '~ts' could not be "
"deleted because of a timeout. These queues "
"will be removed when node '~ts' restarts or "
"is removed from the cluster.", [Node, Node]),
ok
false ->
ok = delete_transient_queues_on_node(Node)
end.

-spec delete_transient_queues_on_node(Node) -> Ret when
Expand Down
69 changes: 57 additions & 12 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -352,21 +352,66 @@ terminate_delete(EmitStats, Reason0, ReplyTo,
fun() -> emit_stats(State) end);
true -> ok
end,
%% This try-catch block transforms throws to errors since throws are not
%% logged. When mnesia is removed this `try` can be removed: Khepri
%% returns errors as error tuples instead.
Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
ok ->
{ok, Len};
{error, _} = Err ->
Err
catch
{error, ReasonE} -> error(ReasonE)
end,
send_reply(ReplyTo, Reply),
case ReplyTo of
_ when ReplyTo =/= none ->
Reply = case delete_queue_record(Q, ActingUser, Reason0) of
ok ->
{ok, Len};
{error, _} = Err ->
Err
end,
send_reply(ReplyTo, Reply);
none ->
%% No processes are waiting for this queue process to exit. We
%% can handle the deletion of the queue record differently: if
%% the deletion times out, we retry indefinitely.
%%
%% For instance, this allows an auto-delete queue process to
%% wait and retry until a network partition is resolved (or
%% this node stops of course). This reduces the risk of a
%% "leak" of a queue record in the metadata store.
%%
%% If for whatever reason the queue record is still leaked
%% (this process could not delete it before it was killed), the
%% "leaked" queue record will be cleaned up when the partition
%% is resolved (or this node is removed from the cluster).
%% Indeed, when the partition is resolved, all nodes are notified
%% with the `node_up' message from `rabbit_node_monitor'. This
%% calls `rabbit_amqqueue:on_node_up/1' which will delete any
%% transient queues.
%%
%% This infinite loop of delete attempts is executed in a
%% separate process to let this queue process exit. This way,
%% connections will be notified that the queue process is
%% gone.
worker_pool:submit_async(
fun() ->
_ = infinite_internal_delete(Q, ActingUser, Reason0)
end),
ok
end,
BQS1
end.

infinite_internal_delete(Q, ActingUser, Reason) ->
case delete_queue_record(Q, ActingUser, Reason) of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the node is partitioned with a Khepri leader on it, this code could grow the Khepri log indefinitely.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. Then, I need to explore @lhoguin’s idea of waiting for the node_up message.

Copy link
Collaborator Author

@dumbbell dumbbell Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, using a Khepri fence after the first delete attempt should be enough: the call waits for all updates to be applied locally. I just pushed that change.

{error, timeout} ->
_ = rabbit_khepri:fence(infinity),
infinite_internal_delete(Q, ActingUser, Reason);
Ret ->
Ret
end.

%% Deletes the record of queue `Q' by delegating to
%% `rabbit_amqqueue:internal_delete/3' and returns its result unchanged.
%%
%% A thrown `{error, Reason}' is converted into an `error' exception with
%% reason `Reason', because throws are not logged. When mnesia is removed this
%% `try' can be removed: Khepri returns errors as error tuples instead.
delete_queue_record(Q, ActingUser, Reason) ->
    try
        rabbit_amqqueue:internal_delete(Q, ActingUser, Reason)
    catch
        throw:{error, ReasonE}:Stacktrace ->
            %% Re-raise with the stacktrace captured at the throw site so the
            %% logged crash points at the real origin of the failure rather
            %% than at this wrapper.
            erlang:raise(error, ReasonE, Stacktrace)
    end.

terminated_by({terminated_by, auto_delete}) ->
?INTERNAL_USER;
terminated_by({terminated_by, ActingUser}) ->
Expand Down
29 changes: 16 additions & 13 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
delete/2,
update/2,
update_decorators/2,
exists/1
exists/1,
foreach/2
]).

%% Once mnesia is removed, all transient entities will be deleted. These can be replaced
Expand All @@ -57,8 +58,7 @@
%% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
%% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or
%% HA queues are removed it can be deleted.
-export([foreach_durable/2,
internal_delete/3]).
-export([internal_delete/3]).

%% Storing it on Khepri is not needed, this function is just used in
%% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue
Expand Down Expand Up @@ -1263,20 +1263,26 @@ foreach_transient_in_khepri(UpdateFun) ->
end.

%% -------------------------------------------------------------------
%% foreach_durable().
%% foreach().
%% -------------------------------------------------------------------

-spec foreach_durable(UpdateFun, FilterFun) -> ok when
-spec foreach(UpdateFun, FilterFun) -> ok when
UpdateFun :: fun((Queue) -> any()),
FilterFun :: fun((Queue) -> boolean()).
%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'.
%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'.
%%
%% With Mnesia, only durable queues are considered because we use the durable
%% queues table.
%%
%% With Khepri, all queues are considered because they are all in the same
%% "table".
%%
%% @private

foreach_durable(UpdateFun, FilterFun) ->
foreach(UpdateFun, FilterFun) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end,
khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end
khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end
}).

foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
Expand All @@ -1292,11 +1298,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
end),
ok.

foreach_durable_in_khepri(UpdateFun, FilterFun) ->
Path = khepri_queue_path(
?KHEPRI_WILDCARD_STAR,
#if_data_matches{
pattern = amqqueue:pattern_match_on_durable(true)}),
foreach_in_khepri(UpdateFun, FilterFun) ->
Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
FilterFun(Q)
end) of
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) ->
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
case Ret of
ok ->
rabbit_amqqueue:forget_all_durable(NodeToRemove),
rabbit_amqqueue:forget_all(NodeToRemove),
?LOG_DEBUG(
"Node ~s removed from Khepri cluster \"~s\"",
[NodeToRemove, ?RA_CLUSTER_NAME],
Expand All @@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) ->
Ret = ra:remove_member(ServerRef, ServerId, Timeout),
case Ret of
{ok, _, _} ->
rabbit_amqqueue:forget_all_durable(NodeToRemove),
rabbit_amqqueue:forget_all(NodeToRemove),
?LOG_DEBUG(
"Node ~s removed from Khepri cluster \"~s\"",
[NodeToRemove, ?RA_CLUSTER_NAME],
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) ->
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
rabbit_node_monitor:notify_left_cluster(Node),
rabbit_amqqueue:forget_all_durable(Node),
rabbit_amqqueue:forget_all(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
Expand Down
32 changes: 22 additions & 10 deletions deps/rabbit/src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -430,16 +430,8 @@ handle_call(status, _From, State = #state{partitions = Partitions}) ->
handle_call(_Request, _From, State) ->
{noreply, State}.

handle_cast(notify_node_up, State = #state{guid = GUID}) ->
Nodes = rabbit_nodes:list_reachable() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
%% register other active rabbits with this rabbit
DiskNodes = rabbit_db_cluster:disc_members(),
[gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
true -> disc;
false -> ram
end}) || N <- Nodes],
handle_cast(notify_node_up, State) ->
do_notify_node_up(State),
{noreply, State};

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -665,6 +657,12 @@ handle_info({nodedown, Node, Info}, State) ->

%% Handles a `nodeup' message for `Node': logs it, then announces this node
%% to the other reachable nodes. The server state is returned unchanged.
handle_info({nodeup, Node, _Info}, State) ->
?LOG_INFO("node ~tp up", [Node]),
%% We notify that `rabbit' is up here too (in addition to the message sent
%% explicitly by a boot step). That's because nodes may go down then up
%% during a network partition, and with Khepri, nodes are not restarted
%% (unlike with some partition handling strategies used with Mnesia), and
%% thus the boot steps are not executed.
do_notify_node_up(State),
{noreply, State};

handle_info({mnesia_system_event,
Expand Down Expand Up @@ -854,6 +852,20 @@ wait_for_cluster_recovery(Condition) ->
wait_for_cluster_recovery(Condition)
end.

%% Announces this node to every other reachable node, then registers each of
%% those nodes with the local server, tagged as `disc' or `ram'.
%% Always returns `ok'.
do_notify_node_up(#state{guid = GUID}) ->
    Peers = rabbit_nodes:list_reachable() -- [node()],
    Announcement = {node_up, node(), rabbit_db_cluster:node_type(), GUID},
    gen_server:abcast(Peers, ?SERVER, Announcement),
    %% register other active rabbits with this rabbit
    DiscMembers = rabbit_db_cluster:disc_members(),
    lists:foreach(
      fun(Peer) ->
              PeerType = case lists:member(Peer, DiscMembers) of
                             true  -> disc;
                             false -> ram
                         end,
              gen_server:cast(?SERVER, {node_up, Peer, PeerType})
      end, Peers),
    ok.

handle_dead_rabbit(Node, State) ->
%% TODO: This may turn out to be a performance hog when there are
%% lots of nodes. We really only need to execute some of these
Expand Down
45 changes: 28 additions & 17 deletions deps/rabbit/test/bindings_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,36 @@ end_per_group(_, Config) ->
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Name = rabbit_data_coercion:to_binary(Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Name},
{alt_queue_name, <<Name/binary, "_alt">>},
{exchange_name, Name}
]),
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of
{transient_queue_on_node_down, khepri} ->
{skip, "Test irrelevant with Khepri"};
_ ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Name = rabbit_data_coercion:to_binary(Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
Config2 = rabbit_ct_helpers:set_config(
Config1,
[{queue_name, Name},
{alt_queue_name, <<Name/binary, "_alt">>},
{exchange_name, Name}
]),
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps())
end.

end_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
[?config(exchange_name, Config)]),
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of
{transient_queue_on_node_down, khepri} ->
Config;
_ ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
[?config(exchange_name, Config)]),
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase)
end.

%% -------------------------------------------------------------------
%% Testcases.
Expand Down
Loading
Loading