Skip to content
Merged
3 changes: 2 additions & 1 deletion .github/workflows/test-make-target.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ jobs:
uses: dsaltares/fetch-gh-release-asset@master
if: inputs.mixed_clusters
with:
version: 'tags/v4.0.5'
repo: 'rabbitmq/server-packages'
version: 'tags/alphas.1744021065493'
regex: true
file: "rabbitmq-server-generic-unix-\\d.+\\.tar\\.xz"
target: ./
Expand Down
48 changes: 32 additions & 16 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
end,
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.

delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
Path = khepri_route_path(
VHost,
Name,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
Pattern = khepri_route_path(
VHost,
SrcName,
?KHEPRI_WILDCARD_STAR, %% Kind
?KHEPRI_WILDCARD_STAR, %% DstName
#if_has_data{}), %% RoutingKey
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, SrcName, _Kind, _Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], Bindings).

%% -------------------------------------------------------------------
%% delete_for_destination_in_mnesia().
Expand Down Expand Up @@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
?KHEPRI_WILDCARD_STAR, %% SrcName
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
?KHEPRI_WILDCARD_STAR), %% RoutingKey
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
Bindings = maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, _SrcName, Kind, Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

Expand Down
26 changes: 16 additions & 10 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
Path = khepri_exchange_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := X, payload_version := Vsn}} ->
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
X1 = Fun(X),
UpdatePath =
khepri_path:combine_with_conditions(
Expand Down Expand Up @@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
Path = khepri_exchange_serial_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Serial,
payload_version := Vsn}} ->
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Expand Down Expand Up @@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
fun(Path, Props, Deletions) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
#{data := X}} ->
{deleted,
#exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1);
{_, _} ->
Deletions
end
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
mirroring_pid = Overall,
childspec = ChildSpec},
case rabbit_khepri:adv_get(Path) of
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}} ->
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}}} ->
case Overall of
Pid ->
Delegate;
Expand All @@ -160,6 +160,7 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
end
end;
_ ->
%% FIXME: Not atomic with the get above.
ok = rabbit_khepri:put(Path, S),
start
end.
Expand Down
67 changes: 45 additions & 22 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,18 @@ delete_in_khepri(QueueName, OnlyDurable) ->
rabbit_khepri:transaction(
fun () ->
Path = khepri_queue_path(QueueName),
UsesUniformWriteRet = try
khepri_tx:does_api_comply_with(uniform_write_ret)
catch
error:undef ->
false
end,
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
{ok, #{data := _}} when not UsesUniformWriteRet ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
Expand Down Expand Up @@ -607,7 +617,7 @@ update_in_khepri(QName, Fun) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q, payload_version := Vsn}} ->
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
Expand Down Expand Up @@ -658,7 +668,7 @@ update_decorators_in_khepri(QName, Decorators) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q1, payload_version := Vsn}} ->
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
Q2 = amqqueue:set_decorators(Q1, Decorators),
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Expand Down Expand Up @@ -1098,15 +1108,12 @@ delete_transient_in_khepri(FilterFun) ->
case rabbit_khepri:adv_get_many(PathPattern) of
{ok, Props} ->
Qs = maps:fold(
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
when ?is_amqqueue(Q) ->
case FilterFun(Q) of
true ->
Path = khepri_path:combine_with_conditions(
Path0,
[#if_payload_version{version = Vsn}]),
QName = amqqueue:get_name(Q),
[{Path, QName} | Acc];
[{Path, Vsn, QName} | Acc];
false ->
Acc
end
Expand All @@ -1125,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
do_delete_transient_queues_in_khepri_tx(Qs, [])
end),
case Res of
{ok, Items} ->
Expand All @@ -1152,6 +1146,35 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Error
end.

do_delete_transient_queues_in_khepri_tx([], Acc) ->
{ok, Acc};
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
%% Also see `delete_in_khepri/2'.
VersionedPath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
UsesUniformWriteRet = try
khepri_tx:does_api_comply_with(uniform_write_ret)
catch
error:undef ->
false
end,
case khepri_tx_adv:delete(VersionedPath) of
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
Acc1 = [{QName, Deletions} | Acc],
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{ok, #{data := _}} when not UsesUniformWriteRet ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
Acc1 = [{QName, Deletions} | Acc],
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{ok, _} ->
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% foreach_transient().
%% -------------------------------------------------------------------
Expand Down
34 changes: 27 additions & 7 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case rabbit_khepri:adv_put(Path, Record) of
{ok, #{data := Params}} ->
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
Expand Down Expand Up @@ -113,8 +113,16 @@ set_in_khepri_tx(Key, Term) ->
Path = khepri_rp_path(Key),
Record = #runtime_parameters{key = Key,
value = Term},
UsesUniformWriteRet = try
khepri_tx:does_api_comply_with(uniform_write_ret)
catch
error:undef ->
false
end,
case khepri_tx_adv:put(Path, Record) of
{ok, #{data := Params}} ->
{ok, #{Path := #{data := Params}}} when UsesUniformWriteRet ->
{old, Params#runtime_parameters.value};
{ok, #{data := Params}} when not UsesUniformWriteRet ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
Expand Down Expand Up @@ -347,11 +355,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].

delete_vhost_in_khepri(VHostName) ->
Path = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Path) of
{ok, Props} ->
{ok, rabbit_khepri:collect_payloads(Props)};
Pattern = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Pattern) of
{ok, NodePropsMap} ->
RTParams =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
VHostName, _, _),
#{data := RTParam}} ->
[RTParam | Acc];
{_, _} ->
Acc
end
end, [], NodePropsMap),
{ok, RTParams};
{error, _} = Err ->
Err
end.
Expand Down
46 changes: 34 additions & 12 deletions deps/rabbit/src/rabbit_db_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
rabbit_khepri:transaction(
fun() ->
UserPermissionsPath = khepri_user_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName),
TopicPermissionsPath = khepri_topic_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName,
?KHEPRI_WILDCARD_STAR),
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
{ok, TopicProps} = khepri_tx_adv:delete_many(
TopicPermissionsPath),
Deletions = rabbit_khepri:collect_payloads(
TopicProps,
rabbit_khepri:collect_payloads(UserProps)),
{ok, Deletions}
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
end, rw, #{timeout => infinity}).

clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
UserPermissionsPattern = khepri_user_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName),
TopicPermissionsPattern = khepri_topic_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName,
?KHEPRI_WILDCARD_STAR),
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
TopicPermissionsPattern),
Deletions0 =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
#{data := Permission}} ->
[Permission | Acc];
{_, _} ->
Acc
end
end, [], UserNodePropsMap),
Deletions1 =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
#{data := Permission}} ->
[Permission | Acc];
{_, _} ->
Acc
end
end, Deletions0, TopicNodePropsMap),
{ok, Deletions1}.

%% -------------------------------------------------------------------
%% get_topic_permissions().
%% -------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_db_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
Path = khepri_vhost_path(VHostName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := VHost0, payload_version := DVersion}} ->
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
VHost = vhost:merge_metadata(VHost0, Metadata),
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
Path1 = khepri_path:combine_with_conditions(
Expand Down Expand Up @@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
update_in_khepri(VHostName, UpdateFun) ->
Path = khepri_vhost_path(VHostName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := V, payload_version := DVersion}} ->
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
V1 = UpdateFun(V),
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(Path1, V1) of
ok ->
V1;
Expand Down
Loading
Loading