Skip to content

Commit 21fcdab

Browse files
committed
rabbit_db_queue: Transactionally delete transient queues from Khepri
The prior code skirted transactions because the filter function might cause Khepri to call itself. We want to use the same idea as the old code - get all queues, filter them, then delete them - but we want to perform the deletion in a transaction and fail the transaction if any queues changed since we read them. This fixes a bug - the call to `delete_in_khepri/2` could return an error tuple that would be improperly recognized as `Deletions` - and should also make deleting transient queues atomic and fast. Previously, each call to `delete_in_khepri/2` needed to wait on Ra to replicate because each deletion was an individual command sent from one process. Performing all deletions at once means we only need to wait for one command to be replicated across the cluster. We now also bubble up any deletion errors rather than storing them as deletions. This fixes a crash that occurs on node down when Khepri is in a minority.
1 parent d0da0b5 commit 21fcdab

File tree

2 files changed

+81
-25
lines changed

2 files changed

+81
-25
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ set_many_in_khepri(Qs) ->
10121012
Queue :: amqqueue:amqqueue(),
10131013
FilterFun :: fun((Queue) -> boolean()),
10141014
QName :: rabbit_amqqueue:name(),
1015-
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
1015+
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
1016+
| rabbit_khepri:timeout_error().
10161017
%% @doc Deletes all transient queues that match `FilterFun'.
10171018
%%
10181019
%% @private
@@ -1073,26 +1074,63 @@ delete_transient_in_khepri(FilterFun) ->
10731074
%% process might call itself. Instead we can fetch all of the transient
10741075
%% queues with `get_many' and then filter and fold the results outside of
10751076
%% Khepri's Ra server process.
1076-
case rabbit_khepri:get_many(PathPattern) of
1077-
{ok, Qs} ->
1078-
Items = maps:fold(
1079-
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
1080-
case FilterFun(Queue) of
1081-
true ->
1082-
QueueName = khepri_queue_path_to_name(
1083-
Path),
1084-
case delete_in_khepri(QueueName, false) of
1085-
ok ->
1086-
Acc;
1087-
Deletions ->
1088-
[{QueueName, Deletions} | Acc]
1089-
end;
1090-
false ->
1091-
Acc
1092-
end
1093-
end, [], Qs),
1094-
{QueueNames, Deletions} = lists:unzip(Items),
1095-
{QueueNames, lists:flatten(Deletions)};
1077+
case rabbit_khepri:adv_get_many(PathPattern) of
1078+
{ok, Props} ->
1079+
Qs = maps:fold(
1080+
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1081+
when ?is_amqqueue(Q) ->
1082+
case FilterFun(Q) of
1083+
true ->
1084+
Path = khepri_path:combine_with_conditions(
1085+
Path0,
1086+
[#if_payload_version{version = Vsn}]),
1087+
QName = amqqueue:get_name(Q),
1088+
[{Path, QName} | Acc];
1089+
false ->
1090+
Acc
1091+
end
1092+
end, [], Props),
1093+
case Qs of
1094+
[] ->
1095+
%% If there are no changes to make, avoid performing a
1096+
%% transaction. When Khepri is in a minority this avoids
1097+
%% a long timeout waiting for the transaction command to
1098+
%% be processed. Otherwise it avoids appending a somewhat
1099+
%% large transaction command to Khepri's log.
1100+
{[], []};
1101+
_ ->
1102+
do_delete_transient_queues_in_khepri(Qs, FilterFun)
1103+
end;
1104+
{error, _} = Error ->
1105+
Error
1106+
end.
1107+
1108+
%% Deletes every `{Path, QName}' pair of `Qs' within a single Khepri
%% transaction.
%%
%% For each path whose deletion result carries a `data' entry, the bindings
%% having `QName' as destination are deleted as well and the resulting
%% deletions are recorded against that queue name. The fold stops at the
%% first `{error, _}' returned by `khepri_tx_adv:delete/1'.
%%
%% Returns `{QNames, Deletions}' when the transaction returns `{ok, _}'. When
%% it returns a `mismatching_node' error the whole operation is started over
%% via `delete_transient_in_khepri/1' with `FilterFun'. Any other error tuple
%% is handed back to the caller untouched.
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
    TxFun =
        fun() ->
                rabbit_misc:fold_while_ok(
                  fun({QPath, Name}, Deleted) ->
                          %% Also see `delete_in_khepri/2'.
                          case khepri_tx_adv:delete(QPath) of
                              {ok, #{data := _}} ->
                                  BindingDeletions =
                                      rabbit_db_binding:delete_for_destination_in_khepri(
                                        Name, false),
                                  {ok, [{Name, BindingDeletions} | Deleted]};
                              {ok, _} ->
                                  %% The result has no `data' entry: record
                                  %% nothing for this queue.
                                  {ok, Deleted};
                              {error, _} = DeleteError ->
                                  DeleteError
                          end
                  end, [], Qs)
        end,
    case rabbit_khepri:transaction(TxFun) of
        {error, {khepri, mismatching_node, _}} ->
            %% One of the queues changed while attempting to update all
            %% queues. Retry the operation.
            delete_transient_in_khepri(FilterFun);
        {error, _} = TxError ->
            TxError;
        {ok, Pairs} ->
            {Names, PerQueueDeletions} = lists:unzip(Pairs),
            {Names, lists:flatten(PerQueueDeletions)}
    end.
@@ -1366,6 +1404,3 @@ khepri_queues_path() ->
13661404

13671405
%% Returns the Khepri path `[?MODULE, queues, VHost, Name]' for the given
%% queue resource.
khepri_queue_path(#resource{name = QueueName, virtual_host = VirtualHost}) ->
    [?MODULE, queues, VirtualHost, QueueName].
1369-
1370-
khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
1371-
rabbit_misc:r(VHost, queue, Name).

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
maps_put_falsy/3
9090
]).
9191
-export([remote_sup_child/2]).
92-
-export([for_each_while_ok/2]).
92+
-export([for_each_while_ok/2, fold_while_ok/3]).
9393

9494
%% Horrible macro to use in guards
9595
-define(IS_BENIGN_EXIT(R),
@@ -1655,3 +1655,24 @@ for_each_while_ok(Fun, [Elem | Rest]) ->
16551655
end;
16561656
for_each_while_ok(_, []) ->
16571657
ok.
1658+
1659+
-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
      FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
      Element :: any(),
      List :: [Element],
      Ret :: {ok, Acc} | {error, ErrReason}.
%% @doc Calls the given `FoldFun' on each element of the given `List' and the
%% accumulator value, short-circuiting if the function returns `{error,_}'.
%%
%% Elements are visited from the head of `List' to its tail. Each `{ok,Acc}'
%% returned by `FoldFun' becomes the accumulator passed with the next element.
%% As soon as `FoldFun' returns `{error,_}' the remaining elements are not
%% visited. For an empty `List', `FoldFun' is never called and the initial
%% accumulator is returned as `{ok,Acc}'.
%%
%% @param FoldFun function taking an element and the current accumulator.
%% @param Acc the initial accumulator value.
%% @param List the list of elements to fold over.
%%
%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
%% `FoldFun' never returns an error tuple.

fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
    case Fun(Elem, Acc0) of
        {ok, Acc} ->
            fold_while_ok(Fun, Acc, Rest);
        {error, _} = Error ->
            %% Stop at the first failure; `Rest' is left unvisited.
            Error
    end;
fold_while_ok(_Fun, Acc, []) ->
    {ok, Acc}.

0 commit comments

Comments
 (0)