Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ init() ->
end,

Ret = case rabbit_khepri:is_enabled() of
true -> init_using_khepri();
false -> init_using_mnesia()
true -> init_using_khepri(IsVirgin);
false -> init_using_mnesia(IsVirgin)
end,
case Ret of
ok ->
Expand All @@ -91,19 +91,19 @@ pre_init(IsVirgin) ->
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).

init_using_mnesia() ->
init_using_mnesia(_IsVirgin) ->
?LOG_DEBUG(
"DB: initialize Mnesia",
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = rabbit_mnesia:init(),
?assertEqual(rabbit:data_dir(), mnesia_dir()),
rabbit_sup:start_child(mnesia_sync).

init_using_khepri() ->
init_using_khepri(IsVirgin) ->
?LOG_DEBUG(
"DB: initialize Khepri",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_khepri:init().
rabbit_khepri:init(IsVirgin).

init_finished() ->
%% Used during initialisation by rabbit_logger_exchange_h.erl
Expand Down
30 changes: 12 additions & 18 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
-define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route).
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -835,9 +835,8 @@ delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx:get_many(Path),
ok = khepri_tx:delete_many(Path),
maps:fold(fun(_P, Set, Acc) ->
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).

Expand Down Expand Up @@ -881,25 +880,20 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
OnlyDurable :: boolean(),
Deletions :: rabbit_binding:deletions().

delete_for_destination_in_khepri(DstName, OnlyDurable) ->
BindingsMap = match_destination_in_khepri(DstName),
maps:foreach(fun(K, _V) -> khepri_tx:delete(K) end, BindingsMap),
Bindings = maps:fold(fun(_, Set, Acc) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) ->
Path = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
{ok, Map} = khepri_tx:get_many(Path),
Map.

%% -------------------------------------------------------------------
%% delete_transient_for_destination_in_mnesia().
%% -------------------------------------------------------------------
Expand Down
64 changes: 64 additions & 0 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
peek_serial/1,
next_serial/1,
delete/2,
delete_all/1,
delete_serial/1,
recover/1,
match/1,
Expand Down Expand Up @@ -657,6 +658,69 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour
ok = khepri_tx:delete(khepri_exchange_path(XName)),
rabbit_db_binding:delete_all_for_exchange_in_khepri(X, OnlyDurable, RemoveBindingsForSource).

%% -------------------------------------------------------------------
%% delete_all().
%% -------------------------------------------------------------------

-spec delete_all(VHostName) -> Ret when
VHostName :: vhost:name(),
Deletions :: rabbit_binding:deletions(),
Ret :: {ok, Deletions}.
%% @doc Deletes all exchanges for a given vhost.
%%
%% @returns an `{ok, Deletions}' tuple containing the {@link
%% rabbit_binding:deletions()} caused by deleting the exchanges under the given
%% vhost.
%%
%% @private

delete_all(VHostName) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> delete_all_in_mnesia(VHostName) end,
khepri => fun() -> delete_all_in_khepri(VHostName) end
}).

delete_all_in_mnesia(VHostName) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
delete_all_in_mnesia_tx(VHostName)
end).

delete_all_in_mnesia_tx(VHostName) ->
Match = #exchange{name = rabbit_misc:r(VHostName, exchange), _ = '_'},
Xs = mnesia:match_object(?MNESIA_TABLE, Match, write),
Deletions =
lists:foldl(
fun(X, Acc) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
unconditional_delete_in_mnesia( X, false),
XDeletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, XDeletions),
rabbit_binding:combine_deletions(Acc, XDeletions1)
end, rabbit_binding:new_deletions(), Xs),
{ok, Deletions}.

delete_all_in_khepri(VHostName) ->
rabbit_khepri:transaction(
fun() ->
delete_all_in_khepri_tx(VHostName)
end, rw, #{timeout => infinity}).

delete_all_in_khepri_tx(VHostName) ->
Pattern = khepri_exchange_path(VHostName, ?KHEPRI_WILDCARD_STAR),
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

%% -------------------------------------------------------------------
%% delete_serial().
%% -------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
]).

-define(MNESIA_TABLE, rabbit_runtime_parameters).
-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparams).
-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparams).
-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparam).
-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparam).
-define(any(Value), case Value of
'_' -> ?KHEPRI_WILDCARD_STAR;
_ -> Value
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
-define(MNESIA_TABLE, rabbit_user).
-define(PERM_MNESIA_TABLE, rabbit_user_permission).
-define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission).
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_users).
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permissions).
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_user).
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permission).

%% -------------------------------------------------------------------
%% create().
Expand Down
13 changes: 12 additions & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
update_scratch/3, update_decorators/2, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, route/3, delete/3, validate_binding/2, count/0,
ensure_deleted/3]).
ensure_deleted/3, delete_all/2]).
-export([list_names/0]).
-export([serialise_events/1]).
-export([serial/1, peek_serial/1]).
Expand Down Expand Up @@ -484,6 +484,17 @@ delete(XName, IfUnused, Username) ->
XName#resource.name, Username)
end.

-spec delete_all(VHostName, ActingUser) -> Ret when
VHostName :: vhost:name(),
ActingUser :: rabbit_types:username(),
Ret :: ok.

delete_all(VHostName, ActingUser) ->
{ok, Deletions} = rabbit_db_exchange:delete_all(VHostName),
Deletions1 = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions1, ActingUser),
ok.

process_deletions({error, _} = E) ->
E;
process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
Expand Down
Loading