Skip to content

Commit d12b9f5

Browse files
the-mikedavisdumbbell
authored andcommitted
rabbit_db_*: Handle breaking change in khepri adv API return type
All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle the now consistent return type of `khepri:node_props_map()` in Khepri 0.17. We don't need any compatibility code to handle "either the old return type or the new return type" because the translation is done entirely in the "client side" code in Khepri - meaning that the return value from the Ra server is the same but it is translated differently by the functions in `khepri_adv` and `khepri_tx_adv`.
1 parent 854c335 commit d12b9f5

File tree

11 files changed

+138
-126
lines changed

11 files changed

+138
-126
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
837837
end,
838838
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
839839

840-
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
841-
Path = khepri_route_path(
842-
VHost,
843-
Name,
844-
_Kind = ?KHEPRI_WILDCARD_STAR,
845-
_DstName = ?KHEPRI_WILDCARD_STAR,
846-
_RoutingKey = #if_has_data{}),
847-
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
848-
maps:fold(fun(_P, #{data := Set}, Acc) ->
849-
sets:to_list(Set) ++ Acc
850-
end, [], Bindings).
840+
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
841+
Pattern = khepri_route_path(
842+
VHost,
843+
SrcName,
844+
?KHEPRI_WILDCARD_STAR, %% Kind
845+
?KHEPRI_WILDCARD_STAR, %% DstName
846+
#if_has_data{}), %% RoutingKey
847+
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
848+
maps:fold(
849+
fun(Path, Props, Acc) ->
850+
case {Path, Props} of
851+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
852+
VHost, SrcName, _Kind, _Name, _RoutingKey),
853+
#{data := Set}} ->
854+
sets:to_list(Set) ++ Acc;
855+
{_, _} ->
856+
Acc
857+
end
858+
end, [], Bindings).
851859

852860
%% -------------------------------------------------------------------
853861
%% delete_for_destination_in_mnesia().
@@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
892900
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
893901
Pattern = khepri_route_path(
894902
VHost,
895-
_SrcName = ?KHEPRI_WILDCARD_STAR,
903+
?KHEPRI_WILDCARD_STAR, %% SrcName
896904
Kind,
897905
Name,
898-
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
906+
?KHEPRI_WILDCARD_STAR), %% RoutingKey
899907
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
900-
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
901-
sets:to_list(Set) ++ Acc
902-
end, [], BindingsMap),
908+
Bindings = maps:fold(
909+
fun(Path, Props, Acc) ->
910+
case {Path, Props} of
911+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
912+
VHost, _SrcName, Kind, Name, _RoutingKey),
913+
#{data := Set}} ->
914+
sets:to_list(Set) ++ Acc;
915+
{_, _} ->
916+
Acc
917+
end
918+
end, [], BindingsMap),
903919
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
904920
lists:keysort(#binding.source, Bindings), OnlyDurable).
905921

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
331331
Path = khepri_exchange_path(XName),
332332
Ret1 = rabbit_khepri:adv_get(Path),
333333
case Ret1 of
334-
{ok, #{data := X, payload_version := Vsn}} ->
334+
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
335335
X1 = Fun(X),
336336
UpdatePath =
337337
khepri_path:combine_with_conditions(
@@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
534534
Path = khepri_exchange_serial_path(XName),
535535
Ret1 = rabbit_khepri:adv_get(Path),
536536
case Ret1 of
537-
{ok, #{data := Serial,
538-
payload_version := Vsn}} ->
537+
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
539538
UpdatePath =
540539
khepri_path:combine_with_conditions(
541540
Path, [#if_payload_version{version = Vsn}]),
@@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
711710
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
712711
Deletions =
713712
maps:fold(
714-
fun(_Path, #{data := X}, Deletions) ->
715-
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
716-
rabbit_db_binding:delete_all_for_exchange_in_khepri(
717-
X, false, true),
718-
Deletions1 = rabbit_binding:add_deletion(
719-
XName, X, deleted, Bindings, XDeletions),
720-
rabbit_binding:combine_deletions(Deletions, Deletions1)
713+
fun(Path, Props, Deletions) ->
714+
case {Path, Props} of
715+
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
716+
#{data := X}} ->
717+
{deleted,
718+
#exchange{name = XName}, Bindings, XDeletions} =
719+
rabbit_db_binding:delete_all_for_exchange_in_khepri(
720+
X, false, true),
721+
Deletions1 = rabbit_binding:add_deletion(
722+
XName, X, deleted, Bindings, XDeletions),
723+
rabbit_binding:combine_deletions(Deletions, Deletions1);
724+
{_, _} ->
725+
Deletions
726+
end
721727
end, rabbit_binding:new_deletions(), NodeProps),
722728
{ok, Deletions}.
723729

deps/rabbit/src/rabbit_db_msup.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
135135
mirroring_pid = Overall,
136136
childspec = ChildSpec},
137137
case rabbit_khepri:adv_get(Path) of
138-
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139-
payload_version := Vsn}} ->
138+
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139+
payload_version := Vsn}}} ->
140140
case Overall of
141141
Pid ->
142142
Delegate;

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ delete_in_khepri(QueueName, OnlyDurable) ->
412412
fun () ->
413413
Path = khepri_queue_path(QueueName),
414414
case khepri_tx_adv:delete(Path) of
415-
{ok, #{data := _}} ->
415+
{ok, #{Path := #{data := _}}} ->
416416
%% we want to execute some things, as decided by rabbit_exchange,
417417
%% after the transaction.
418418
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
@@ -607,7 +607,7 @@ update_in_khepri(QName, Fun) ->
607607
Path = khepri_queue_path(QName),
608608
Ret1 = rabbit_khepri:adv_get(Path),
609609
case Ret1 of
610-
{ok, #{data := Q, payload_version := Vsn}} ->
610+
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
611611
UpdatePath = khepri_path:combine_with_conditions(
612612
Path, [#if_payload_version{version = Vsn}]),
613613
Q1 = Fun(Q),
@@ -658,7 +658,7 @@ update_decorators_in_khepri(QName, Decorators) ->
658658
Path = khepri_queue_path(QName),
659659
Ret1 = rabbit_khepri:adv_get(Path),
660660
case Ret1 of
661-
{ok, #{data := Q1, payload_version := Vsn}} ->
661+
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
662662
Q2 = amqqueue:set_decorators(Q1, Decorators),
663663
UpdatePath = khepri_path:combine_with_conditions(
664664
Path, [#if_payload_version{version = Vsn}]),
@@ -1098,15 +1098,12 @@ delete_transient_in_khepri(FilterFun) ->
10981098
case rabbit_khepri:adv_get_many(PathPattern) of
10991099
{ok, Props} ->
11001100
Qs = maps:fold(
1101-
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1101+
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
11021102
when ?is_amqqueue(Q) ->
11031103
case FilterFun(Q) of
11041104
true ->
1105-
Path = khepri_path:combine_with_conditions(
1106-
Path0,
1107-
[#if_payload_version{version = Vsn}]),
11081105
QName = amqqueue:get_name(Q),
1109-
[{Path, QName} | Acc];
1106+
[{Path, Vsn, QName} | Acc];
11101107
false ->
11111108
Acc
11121109
end
@@ -1125,20 +1122,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11251122
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11261123
Res = rabbit_khepri:transaction(
11271124
fun() ->
1128-
rabbit_misc:fold_while_ok(
1129-
fun({Path, QName}, Acc) ->
1130-
%% Also see `delete_in_khepri/2'.
1131-
case khepri_tx_adv:delete(Path) of
1132-
{ok, #{data := _}} ->
1133-
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1134-
QName, false),
1135-
{ok, [{QName, Deletions} | Acc]};
1136-
{ok, _} ->
1137-
{ok, Acc};
1138-
{error, _} = Error ->
1139-
Error
1140-
end
1141-
end, [], Qs)
1125+
do_delete_transient_queues_in_khepri_tx(Qs, [])
11421126
end),
11431127
case Res of
11441128
{ok, Items} ->
@@ -1152,6 +1136,24 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11521136
Error
11531137
end.
11541138

1139+
do_delete_transient_queues_in_khepri_tx([], Acc) ->
1140+
{ok, Acc};
1141+
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
1142+
%% Also see `delete_in_khepri/2'.
1143+
VersionedPath = khepri_path:combine_with_conditions(
1144+
Path, [#if_payload_version{version = Vsn}]),
1145+
case khepri_tx_adv:delete(VersionedPath) of
1146+
{ok, #{Path := #{data := _}}} ->
1147+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1148+
QName, false),
1149+
Acc1 = [{QName, Deletions} | Acc],
1150+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1151+
{ok, _} ->
1152+
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
1153+
{error, _} = Error ->
1154+
Error
1155+
end.
1156+
11551157
%% -------------------------------------------------------------------
11561158
%% foreach_transient().
11571159
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_rtparams.erl

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
5959
Record = #runtime_parameters{key = Key,
6060
value = Term},
6161
case rabbit_khepri:adv_put(Path, Record) of
62-
{ok, #{data := Params}} ->
62+
{ok, #{Path := #{data := Params}}} ->
6363
{old, Params#runtime_parameters.value};
6464
{ok, _} ->
6565
new
@@ -114,7 +114,7 @@ set_in_khepri_tx(Key, Term) ->
114114
Record = #runtime_parameters{key = Key,
115115
value = Term},
116116
case khepri_tx_adv:put(Path, Record) of
117-
{ok, #{data := Params}} ->
117+
{ok, #{Path := #{data := Params}}} ->
118118
{old, Params#runtime_parameters.value};
119119
{ok, _} ->
120120
new
@@ -347,11 +347,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
347347
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].
348348

349349
delete_vhost_in_khepri(VHostName) ->
350-
Path = khepri_vhost_rp_path(
351-
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
352-
case rabbit_khepri:adv_delete_many(Path) of
353-
{ok, Props} ->
354-
{ok, rabbit_khepri:collect_payloads(Props)};
350+
Pattern = khepri_vhost_rp_path(
351+
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
352+
case rabbit_khepri:adv_delete_many(Pattern) of
353+
{ok, NodePropsMap} ->
354+
RTParams =
355+
maps:fold(
356+
fun(Path, Props, Acc) ->
357+
case {Path, Props} of
358+
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
359+
VHostName, _, _),
360+
#{data := RTParam}} ->
361+
[RTParam | Acc];
362+
{_, _} ->
363+
Acc
364+
end
365+
end, [], NodePropsMap),
366+
{ok, RTParams};
355367
{error, _} = Err ->
356368
Err
357369
end.

deps/rabbit/src/rabbit_db_user.erl

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
628628
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
629629
rabbit_khepri:transaction(
630630
fun() ->
631-
UserPermissionsPath = khepri_user_permission_path(
632-
?KHEPRI_WILDCARD_STAR, VHostName),
633-
TopicPermissionsPath = khepri_topic_permission_path(
634-
?KHEPRI_WILDCARD_STAR, VHostName,
635-
?KHEPRI_WILDCARD_STAR),
636-
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
637-
{ok, TopicProps} = khepri_tx_adv:delete_many(
638-
TopicPermissionsPath),
639-
Deletions = rabbit_khepri:collect_payloads(
640-
TopicProps,
641-
rabbit_khepri:collect_payloads(UserProps)),
642-
{ok, Deletions}
631+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
643632
end, rw, #{timeout => infinity}).
644633

634+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
635+
UserPermissionsPattern = khepri_user_permission_path(
636+
?KHEPRI_WILDCARD_STAR, VHostName),
637+
TopicPermissionsPattern = khepri_topic_permission_path(
638+
?KHEPRI_WILDCARD_STAR, VHostName,
639+
?KHEPRI_WILDCARD_STAR),
640+
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
641+
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
642+
TopicPermissionsPattern),
643+
Deletions0 =
644+
maps:fold(
645+
fun(Path, Props, Acc) ->
646+
case {Path, Props} of
647+
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
648+
#{data := Permission}} ->
649+
[Permission | Acc];
650+
{_, _} ->
651+
Acc
652+
end
653+
end, [], UserNodePropsMap),
654+
Deletions1 =
655+
maps:fold(
656+
fun(Path, Props, Acc) ->
657+
case {Path, Props} of
658+
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
659+
#{data := Permission}} ->
660+
[Permission | Acc];
661+
{_, _} ->
662+
Acc
663+
end
664+
end, Deletions0, TopicNodePropsMap),
665+
{ok, Deletions1}.
666+
645667
%% -------------------------------------------------------------------
646668
%% get_topic_permissions().
647669
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_vhost.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
167167
Path = khepri_vhost_path(VHostName),
168168
Ret1 = rabbit_khepri:adv_get(Path),
169169
case Ret1 of
170-
{ok, #{data := VHost0, payload_version := DVersion}} ->
170+
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
171171
VHost = vhost:merge_metadata(VHost0, Metadata),
172172
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
173173
Path1 = khepri_path:combine_with_conditions(
@@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
443443
update_in_khepri(VHostName, UpdateFun) ->
444444
Path = khepri_vhost_path(VHostName),
445445
case rabbit_khepri:adv_get(Path) of
446-
{ok, #{data := V, payload_version := DVersion}} ->
446+
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
447447
V1 = UpdateFun(V),
448448
Path1 = khepri_path:combine_with_conditions(
449-
Path, [#if_payload_version{version = DVersion}]),
449+
Path, [#if_payload_version{version = Vsn}]),
450450
case rabbit_khepri:put(Path1, V1) of
451451
ok ->
452452
V1;

0 commit comments

Comments
 (0)