Skip to content

Commit f516c95

Browse files
committed
WIP
1 parent fd04424 commit f516c95

File tree

3 files changed

+103
-26
lines changed

3 files changed

+103
-26
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
has_for_source_in_mnesia/1,
3636
has_for_source_in_khepri/1,
3737
match_source_and_destination_in_khepri_tx/2,
38-
clear_in_khepri/0
38+
clear_in_khepri/0,
39+
khepri_ret_to_deletions/2
3940
]).
4041

4142
-export([
@@ -201,6 +202,14 @@ create_in_khepri(#binding{source = SrcName,
201202
case ChecksFun(Src, Dst) of
202203
ok ->
203204
RoutePath = khepri_route_path(Binding),
205+
DstPath = case DstName of
206+
#resource{kind = queue} ->
207+
rabbit_db_queue:khepri_queue_path(DstName);
208+
#resource{kind = exchange} ->
209+
rabbit_db_exchange:khepri_exchange_path(DstName)
210+
end,
211+
KeepWhile = #{DstPath => #if_node_exists{}},
212+
PutOptions = #{keep_while => KeepWhile},
204213
MaybeSerial = rabbit_exchange:serialise_events(Src),
205214
Serial = rabbit_khepri:transaction(
206215
fun() ->
@@ -210,11 +219,17 @@ create_in_khepri(#binding{source = SrcName,
210219
true ->
211220
already_exists;
212221
false ->
213-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)),
222+
ok = khepri_tx:put(
223+
RoutePath,
224+
sets:add_element(Binding, Set),
225+
PutOptions),
214226
serial_in_khepri(MaybeSerial, Src)
215227
end;
216228
_ ->
217-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))),
229+
ok = khepri_tx:put(
230+
RoutePath,
231+
sets:add_element(Binding, sets:new([{version, 2}])),
232+
PutOptions),
218233
serial_in_khepri(MaybeSerial, Src)
219234
end
220235
end, rw),
@@ -906,6 +921,7 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
906921
Name,
907922
?KHEPRI_WILDCARD_STAR), %% RoutingKey
908923
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
924+
% logger:alert("BindingsMap = ~p", [BindingsMap]),
909925
Bindings = maps:fold(
910926
fun(Path, Props, Acc) ->
911927
case {Path, Props} of
@@ -920,6 +936,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
920936
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
921937
lists:keysort(#binding.source, Bindings), OnlyDurable).
922938

939+
khepri_ret_to_deletions(Deleted, OnlyDurable) ->
940+
Bindings0 = maps:fold(
941+
fun(Path, Props, Acc) ->
942+
case {Path, Props} of
943+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
944+
_VHost, _SrcName, _Kind, _Name, _RoutingKey),
945+
#{data := Set}} ->
946+
sets:to_list(Set) ++ Acc;
947+
{_, _} ->
948+
Acc
949+
end
950+
end, [], Deleted),
951+
Bindings1 = lists:keysort(#binding.source, Bindings0),
952+
rabbit_binding:group_bindings_fold(
953+
fun(XName, Bindings, Deletions, _OnlyDurable) ->
954+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName),
955+
case Deleted of
956+
#{ExchangePath := #{data := X}} ->
957+
rabbit_binding:add_deletion(
958+
XName, X, deleted, Bindings, Deletions);
959+
_ ->
960+
case rabbit_db_exchange:get(XName) of
961+
{ok, X} ->
962+
rabbit_binding:add_deletion(
963+
XName, X, not_deleted, Bindings, Deletions);
964+
_ ->
965+
Deletions
966+
end
967+
end
968+
end,
969+
Bindings1, OnlyDurable).
970+
923971
%% -------------------------------------------------------------------
924972
%% delete_transient_for_destination_in_mnesia().
925973
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,24 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
414414
Path0, [#if_any{conditions =
415415
[#if_node_exists{exists = false},
416416
#if_has_payload{has_payload = false}]}]),
417-
case rabbit_khepri:put(Path1, X) of
417+
Options = case X of
418+
#exchange{name = #resource{virtual_host = VHost,
419+
name = Name},
420+
auto_delete = true} ->
421+
Path = rabbit_db_binding:khepri_route_path(
422+
VHost,
423+
Name,
424+
_Kind = ?KHEPRI_WILDCARD_STAR,
425+
_DstName = ?KHEPRI_WILDCARD_STAR,
426+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
427+
KeepWhile = #{Path => #if_all{conditions =
428+
[#if_node_exists{},
429+
#if_has_data{}]}},
430+
#{keep_while => KeepWhile};
431+
_ ->
432+
#{}
433+
end,
434+
case rabbit_khepri:put(Path1, X, Options) of
418435
ok ->
419436
{new, X};
420437
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -411,28 +411,40 @@ delete_in_khepri(QueueName) ->
411411
delete_in_khepri(QueueName, false).
412412

413413
delete_in_khepri(QueueName, OnlyDurable) ->
414-
rabbit_khepri:transaction(
415-
fun () ->
416-
Path = khepri_queue_path(QueueName),
417-
UsesUniformWriteRet = try
418-
khepri_tx:does_api_comply_with(uniform_write_ret)
419-
catch
420-
error:undef ->
421-
false
422-
end,
423-
case khepri_tx_adv:delete(Path) of
424-
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
425-
%% we want to execute some things, as decided by rabbit_exchange,
426-
%% after the transaction.
427-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
428-
{ok, #{data := _}} when not UsesUniformWriteRet ->
429-
%% we want to execute some things, as decided by rabbit_exchange,
430-
%% after the transaction.
431-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
432-
{ok, _} ->
433-
ok
434-
end
435-
end, rw).
414+
FeatureFlag = true,
415+
case FeatureFlag of
416+
true ->
417+
Path = khepri_queue_path(QueueName),
418+
{ok, Deleted} = khepri_adv:delete(Path),
419+
Deletions = rabbit_db_binding:khepri_ret_to_deletions(
420+
Deleted, OnlyDurable),
421+
Deletions;
422+
false ->
423+
rabbit_khepri:transaction(
424+
fun () ->
425+
Path = khepri_queue_path(QueueName),
426+
UsesUniformWriteRet = try
427+
khepri_tx:does_api_comply_with(uniform_write_ret)
428+
catch
429+
error:undef ->
430+
false
431+
end,
432+
Ret1 = khepri_tx_adv:delete(Path),
433+
% logger:alert("Deleted queue ret = ~p", [Ret1]),
434+
case Ret1 of
435+
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
436+
%% we want to execute some things, as decided by rabbit_exchange,
437+
%% after the transaction.
438+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
439+
{ok, #{data := _}} when not UsesUniformWriteRet ->
440+
%% we want to execute some things, as decided by rabbit_exchange,
441+
%% after the transaction.
442+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
443+
{ok, _} ->
444+
ok
445+
end
446+
end, rw)
447+
end.
436448

437449
%% -------------------------------------------------------------------
438450
%% internal_delete().

0 commit comments

Comments
 (0)