Skip to content

Commit 3a75b6c

Browse files
committed
rabbit_db: Eliminate the delete_queue Khepri transaction
... by using `keep_while` conditions on bindings and auto-delete exchanges. [Why] The `delete_queue` transaction's anonymous function has to be be extracted by Horus, like any Khepri transaction. This is an expensive operation, but Horus uses caching to avoid most work after the first extraction. The problem is when there are many concurrent executions of the same transaction, before it has been executed once: the cache is not hot and Horus has to extract the same transaction many times in parallel currently. An example of this situation is when there are massive disconnections from RabbitMQ clients that trigger massive queue deletions. This can put a lot of load on RabbitMQ. [How] This patch removes the entire transaction. Instead, it uses `keep_while` conditions on bindings and auto-delete exchanges to let Khepri handle the deletion of semantically related tree nodes. RabbitMQ just has to make a simle "delete this queue" command.
1 parent ac86eb6 commit 3a75b6c

File tree

3 files changed

+109
-26
lines changed

3 files changed

+109
-26
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
has_for_source_in_mnesia/1,
3636
has_for_source_in_khepri/1,
3737
match_source_and_destination_in_khepri_tx/2,
38-
clear_in_khepri/0
38+
clear_in_khepri/0,
39+
khepri_ret_to_deletions/2
3940
]).
4041

4142
-export([
@@ -201,6 +202,14 @@ create_in_khepri(#binding{source = SrcName,
201202
case ChecksFun(Src, Dst) of
202203
ok ->
203204
RoutePath = khepri_route_path(Binding),
205+
DstPath = case DstName of
206+
#resource{kind = queue} ->
207+
rabbit_db_queue:khepri_queue_path(DstName);
208+
#resource{kind = exchange} ->
209+
rabbit_db_exchange:khepri_exchange_path(DstName)
210+
end,
211+
KeepWhile = #{DstPath => #if_node_exists{}},
212+
PutOptions = #{keep_while => KeepWhile},
204213
MaybeSerial = rabbit_exchange:serialise_events(Src),
205214
Serial = rabbit_khepri:transaction(
206215
fun() ->
@@ -210,11 +219,17 @@ create_in_khepri(#binding{source = SrcName,
210219
true ->
211220
already_exists;
212221
false ->
213-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)),
222+
ok = khepri_tx:put(
223+
RoutePath,
224+
sets:add_element(Binding, Set),
225+
PutOptions),
214226
serial_in_khepri(MaybeSerial, Src)
215227
end;
216228
_ ->
217-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))),
229+
ok = khepri_tx:put(
230+
RoutePath,
231+
sets:add_element(Binding, sets:new([{version, 2}])),
232+
PutOptions),
218233
serial_in_khepri(MaybeSerial, Src)
219234
end
220235
end, rw),
@@ -906,6 +921,7 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
906921
Name,
907922
?KHEPRI_WILDCARD_STAR), %% RoutingKey
908923
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
924+
% logger:alert("BindingsMap = ~p", [BindingsMap]),
909925
Bindings = maps:fold(
910926
fun(Path, Props, Acc) ->
911927
case {Path, Props} of
@@ -920,6 +936,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
920936
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
921937
lists:keysort(#binding.source, Bindings), OnlyDurable).
922938

939+
khepri_ret_to_deletions(Deleted, OnlyDurable) ->
940+
Bindings0 = maps:fold(
941+
fun(Path, Props, Acc) ->
942+
case {Path, Props} of
943+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
944+
_VHost, _SrcName, _Kind, _Name, _RoutingKey),
945+
#{data := Set}} ->
946+
sets:to_list(Set) ++ Acc;
947+
{_, _} ->
948+
Acc
949+
end
950+
end, [], Deleted),
951+
Bindings1 = lists:keysort(#binding.source, Bindings0),
952+
rabbit_binding:group_bindings_fold(
953+
fun(XName, Bindings, Deletions, _OnlyDurable) ->
954+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName),
955+
case Deleted of
956+
#{ExchangePath := #{data := X}} ->
957+
rabbit_binding:add_deletion(
958+
XName, X, deleted, Bindings, Deletions);
959+
_ ->
960+
case rabbit_db_exchange:get(XName) of
961+
{ok, X} ->
962+
rabbit_binding:add_deletion(
963+
XName, X, not_deleted, Bindings, Deletions);
964+
_ ->
965+
Deletions
966+
end
967+
end
968+
end,
969+
Bindings1, OnlyDurable).
970+
923971
%% -------------------------------------------------------------------
924972
%% delete_transient_for_destination_in_mnesia().
925973
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,24 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
414414
Path0, [#if_any{conditions =
415415
[#if_node_exists{exists = false},
416416
#if_has_payload{has_payload = false}]}]),
417-
case rabbit_khepri:put(Path1, X) of
417+
Options = case X of
418+
#exchange{name = #resource{virtual_host = VHost,
419+
name = Name},
420+
auto_delete = true} ->
421+
Path = rabbit_db_binding:khepri_route_path(
422+
VHost,
423+
Name,
424+
_Kind = ?KHEPRI_WILDCARD_STAR,
425+
_DstName = ?KHEPRI_WILDCARD_STAR,
426+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
427+
KeepWhile = #{Path => #if_all{conditions =
428+
[#if_node_exists{},
429+
#if_has_data{}]}},
430+
#{keep_while => KeepWhile};
431+
_ ->
432+
#{}
433+
end,
434+
case rabbit_khepri:put(Path1, X, Options) of
418435
ok ->
419436
{new, X};
420437
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -411,28 +411,46 @@ delete_in_khepri(QueueName) ->
411411
delete_in_khepri(QueueName, false).
412412

413413
delete_in_khepri(QueueName, OnlyDurable) ->
414-
rabbit_khepri:transaction(
415-
fun () ->
416-
Path = khepri_queue_path(QueueName),
417-
UsesUniformWriteRet = try
418-
khepri_tx:does_api_comply_with(uniform_write_ret)
419-
catch
420-
error:undef ->
421-
false
422-
end,
423-
case khepri_tx_adv:delete(Path) of
424-
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
425-
%% we want to execute some things, as decided by rabbit_exchange,
426-
%% after the transaction.
427-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
428-
{ok, #{data := _}} when not UsesUniformWriteRet ->
429-
%% we want to execute some things, as decided by rabbit_exchange,
430-
%% after the transaction.
431-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
432-
{ok, _} ->
433-
ok
434-
end
435-
end, rw).
414+
Path = khepri_queue_path(QueueName),
415+
FeatureFlag = true,
416+
case FeatureFlag of
417+
true ->
418+
case khepri_adv:delete(Path) of
419+
{ok, #{Path := #{data := _}} = Deleted} ->
420+
%% we want to execute some things, as decided by
421+
%% rabbit_exchange, after the transaction.
422+
rabbit_db_binding:khepri_ret_to_deletions(
423+
Deleted, OnlyDurable);
424+
{ok, _} ->
425+
ok;
426+
{error, _} = Error ->
427+
Error
428+
end;
429+
false ->
430+
UsesUniformWriteRet = try
431+
khepri_tx:does_api_comply_with(uniform_write_ret)
432+
catch
433+
error:undef ->
434+
false
435+
end,
436+
rabbit_khepri:transaction(
437+
fun () ->
438+
Ret1 = khepri_tx_adv:delete(Path),
439+
% logger:alert("Deleted queue ret = ~p", [Ret1]),
440+
case Ret1 of
441+
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
442+
%% we want to execute some things, as decided by rabbit_exchange,
443+
%% after the transaction.
444+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
445+
{ok, #{data := _}} when not UsesUniformWriteRet ->
446+
%% we want to execute some things, as decided by rabbit_exchange,
447+
%% after the transaction.
448+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
449+
{ok, _} ->
450+
ok
451+
end
452+
end, rw)
453+
end.
436454

437455
%% -------------------------------------------------------------------
438456
%% internal_delete().

0 commit comments

Comments
 (0)