Skip to content

Commit 703c9ce

Browse files
committed
rabbit_db_*: Handle khepri_adv:node_props_map() returns from adv API
Khepri 0.17.x will change the return of functions from the khepri_adv and khepri_tx_adv modules. Previously, functions that target one specific tree node, for example `khepri_adv:delete/3`, would return the node props map (`khepri:node_props()`) for the affected node. Now all of the "adv API" returns `khepri_adv:node_props_map()` for consistency.
1 parent 3e41bff commit 703c9ce

File tree

8 files changed

+185
-36
lines changed

8 files changed

+185
-36
lines changed

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,19 @@ update_in_khepri(XName, Fun) ->
331331
Path = khepri_exchange_path(XName),
332332
Ret1 = rabbit_khepri:adv_get(Path),
333333
case Ret1 of
334-
{ok, #{data := X, payload_version := Vsn}} ->
334+
{ok, QueryRet} ->
335+
{X, Vsn} = case QueryRet of
336+
%% Khepri 0.16 and below returned
337+
%% `khepri:node_props()' for adv queries and
338+
%% commands targeting one node:
339+
#{data := Data, payload_version := V} ->
340+
{Data, V};
341+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
342+
%% instead.
343+
#{Path := #{data := Data,
344+
payload_version := V}} ->
345+
{Data, V}
346+
end,
335347
X1 = Fun(X),
336348
UpdatePath =
337349
khepri_path:combine_with_conditions(
@@ -534,8 +546,19 @@ next_serial_in_khepri(XName) ->
534546
Path = khepri_exchange_serial_path(XName),
535547
Ret1 = rabbit_khepri:adv_get(Path),
536548
case Ret1 of
537-
{ok, #{data := Serial,
538-
payload_version := Vsn}} ->
549+
{ok, QueryRet} ->
550+
{Serial, Vsn} = case QueryRet of
551+
%% Khepri 0.16 and below returned
552+
%% `khepri:node_props()' for adv queries and
553+
%% commands targeting one node:
554+
#{data := Data, payload_version := V} ->
555+
{Data, V};
556+
%% Khepri 0.17+ return
557+
%% `khepri_adv:node_props_map()` instead.
558+
#{Path := #{data := Data,
559+
payload_version := V}} ->
560+
{Data, V}
561+
end,
539562
UpdatePath =
540563
khepri_path:combine_with_conditions(
541564
Path, [#if_payload_version{version = Vsn}]),

deps/rabbit/src/rabbit_db_msup.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,20 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
135135
mirroring_pid = Overall,
136136
childspec = ChildSpec},
137137
case rabbit_khepri:adv_get(Path) of
138-
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139-
payload_version := Vsn}} ->
138+
{ok, QueryRet} ->
139+
{#mirrored_sup_childspec{mirroring_pid = Pid}, Vsn} =
140+
case QueryRet of
141+
%% Khepri 0.16 and below returned
142+
%% `khepri:node_props()' for adv queries and
143+
%% commands targeting one node:
144+
#{data := Data, payload_version := V} ->
145+
{Data, V};
146+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
147+
%% instead.
148+
#{Path := #{data := Data,
149+
payload_version := V}} ->
150+
{Data, V}
151+
end,
140152
case Overall of
141153
Pid ->
142154
Delegate;

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,16 @@ delete_in_khepri(QueueName, OnlyDurable) ->
409409
fun () ->
410410
Path = khepri_queue_path(QueueName),
411411
case khepri_tx_adv:delete(Path) of
412+
%% Khepri 0.16 and below returned `khepri:node_props()' for
413+
%% adv queries and commands targeting one node:
412414
{ok, #{data := _}} ->
413415
%% we want to execute some things, as decided by rabbit_exchange,
414416
%% after the transaction.
415417
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
418+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
419+
%% instead.
420+
{ok, #{Path := #{data := _}}} ->
421+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
416422
{ok, _} ->
417423
ok
418424
end
@@ -604,7 +610,19 @@ update_in_khepri(QName, Fun) ->
604610
Path = khepri_queue_path(QName),
605611
Ret1 = rabbit_khepri:adv_get(Path),
606612
case Ret1 of
607-
{ok, #{data := Q, payload_version := Vsn}} ->
613+
{ok, QueryRet} ->
614+
{Q, Vsn} = case QueryRet of
615+
%% Khepri 0.16 and below returned
616+
%% `khepri:node_props()' for adv queries and
617+
%% commands targeting one node:
618+
#{data := Data, payload_version := V} ->
619+
{Data, V};
620+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
621+
%% instead.
622+
#{Path := #{data := Data,
623+
payload_version := V}} ->
624+
{Data, V}
625+
end,
608626
UpdatePath = khepri_path:combine_with_conditions(
609627
Path, [#if_payload_version{version = Vsn}]),
610628
Q1 = Fun(Q),
@@ -655,11 +673,23 @@ update_decorators_in_khepri(QName, Decorators) ->
655673
Path = khepri_queue_path(QName),
656674
Ret1 = rabbit_khepri:adv_get(Path),
657675
case Ret1 of
658-
{ok, #{data := Q1, payload_version := Vsn}} ->
659-
Q2 = amqqueue:set_decorators(Q1, Decorators),
676+
{ok, QueryRet} ->
677+
{Q, Vsn} = case QueryRet of
678+
%% Khepri 0.16 and below returned
679+
%% `khepri:node_props()' for adv queries and
680+
%% commands targeting one node:
681+
#{data := Data, payload_version := V} ->
682+
{Data, V};
683+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
684+
%% instead.
685+
#{Path := #{data := Data,
686+
payload_version := V}} ->
687+
{Data, V}
688+
end,
689+
Q1 = amqqueue:set_decorators(Q, Decorators),
660690
UpdatePath = khepri_path:combine_with_conditions(
661691
Path, [#if_payload_version{version = Vsn}]),
662-
Ret2 = rabbit_khepri:put(UpdatePath, Q2),
692+
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
663693
case Ret2 of
664694
ok -> ok;
665695
{error, {khepri, mismatching_node, _}} ->
@@ -1100,20 +1130,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11001130
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11011131
Res = rabbit_khepri:transaction(
11021132
fun() ->
1103-
rabbit_misc:fold_while_ok(
1104-
fun({Path, QName}, Acc) ->
1105-
%% Also see `delete_in_khepri/2'.
1106-
case khepri_tx_adv:delete(Path) of
1107-
{ok, #{data := _}} ->
1108-
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1109-
QName, false),
1110-
{ok, [{QName, Deletions} | Acc]};
1111-
{ok, _} ->
1112-
{ok, Acc};
1113-
{error, _} = Error ->
1114-
Error
1115-
end
1116-
end, [], Qs)
1133+
do_delete_transient_queues_in_khepri_tx(Qs, [])
11171134
end),
11181135
case Res of
11191136
{ok, Items} ->
@@ -1127,6 +1144,33 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11271144
Error
11281145
end.
11291146

1147+
do_delete_transient_queues_in_khepri_tx([], Acc) ->
1148+
{ok, Acc};
1149+
do_delete_transient_queues_in_khepri_tx([{Path, QName} | Rest], Acc) ->
1150+
%% Also see `delete_in_khepri/2'.
1151+
case khepri_tx_adv:delete(Path) of
1152+
{ok, Res} ->
1153+
Acc1 = case Res of
1154+
%% Khepri 0.16 and below returned `khepri:node_props()'
1155+
%% for adv queries and commands targeting one node:
1156+
#{data := _} ->
1157+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1158+
QName, false),
1159+
[{QName, Deletions} | Acc];
1160+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
1161+
%% instead.
1162+
#{Path := #{data := _}} ->
1163+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1164+
QName, false),
1165+
[{QName, Deletions} | Acc];
1166+
_ ->
1167+
Acc
1168+
end,
1169+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1170+
{error, _} = Error ->
1171+
Error
1172+
end.
1173+
11301174
%% -------------------------------------------------------------------
11311175
%% foreach_transient().
11321176
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_rtparams.erl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,13 @@ set_in_khepri(Key, Term) ->
5959
Record = #runtime_parameters{key = Key,
6060
value = Term},
6161
case rabbit_khepri:adv_put(Path, Record) of
62+
%% Khepri 0.16 and below returned `khepri:node_props()' for adv queries
63+
%% and commands targeting one node:
6264
{ok, #{data := Params}} ->
6365
{old, Params#runtime_parameters.value};
66+
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
67+
{ok, #{Path := #{data := Params}}} ->
68+
{old, Params#runtime_parameters.value};
6469
{ok, _} ->
6570
new
6671
end.
@@ -114,8 +119,13 @@ set_in_khepri_tx(Key, Term) ->
114119
Record = #runtime_parameters{key = Key,
115120
value = Term},
116121
case khepri_tx_adv:put(Path, Record) of
122+
%% Khepri 0.16 and below returned `khepri:node_props()' for adv
123+
%% queries and commands targeting one node:
117124
{ok, #{data := Params}} ->
118125
{old, Params#runtime_parameters.value};
126+
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
127+
{ok, #{Path := #{data := Params}}} ->
128+
{old, Params#runtime_parameters.value};
119129
{ok, _} ->
120130
new
121131
end.

deps/rabbit/src/rabbit_db_vhost.erl

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,23 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
165165
Path = khepri_vhost_path(VHostName),
166166
Ret1 = rabbit_khepri:adv_get(Path),
167167
case Ret1 of
168-
{ok, #{data := VHost0, payload_version := DVersion}} ->
168+
{ok, QueryRet} ->
169+
{VHost0, Vsn} = case QueryRet of
170+
%% Khepri 0.16 and below returned
171+
%% `khepri:node_props()' for adv queries and
172+
%% commands targeting one node:
173+
#{data := Data, payload_version := V} ->
174+
{Data, V};
175+
%% Khepri 0.17+ return
176+
%% `khepri_adv:node_props_map()` instead.
177+
#{Path := #{data := Data,
178+
payload_version := V}} ->
179+
{Data, V}
180+
end,
169181
VHost = vhost:merge_metadata(VHost0, Metadata),
170182
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
171183
Path1 = khepri_path:combine_with_conditions(
172-
Path, [#if_payload_version{version = DVersion}]),
184+
Path, [#if_payload_version{version = Vsn}]),
173185
Ret2 = rabbit_khepri:put(Path1, VHost),
174186
case Ret2 of
175187
ok ->
@@ -411,13 +423,25 @@ update_in_mnesia_tx(VHostName, UpdateFun)
411423
update_in_khepri(VHostName, UpdateFun) ->
412424
Path = khepri_vhost_path(VHostName),
413425
case rabbit_khepri:adv_get(Path) of
414-
{ok, #{data := V, payload_version := DVersion}} ->
415-
V1 = UpdateFun(V),
426+
{ok, QueryRet} ->
427+
{VHost0, Vsn} = case QueryRet of
428+
%% Khepri 0.16 and below returned
429+
%% `khepri:node_props()' for adv queries and
430+
%% commands targeting one node:
431+
#{data := Data, payload_version := V} ->
432+
{Data, V};
433+
%% Khepri 0.17+ return
434+
%% `khepri_adv:node_props_map()` instead.
435+
#{Path := #{data := Data,
436+
payload_version := V}} ->
437+
{Data, V}
438+
end,
439+
VHost1 = UpdateFun(VHost0),
416440
Path1 = khepri_path:combine_with_conditions(
417-
Path, [#if_payload_version{version = DVersion}]),
418-
case rabbit_khepri:put(Path1, V1) of
441+
Path, [#if_payload_version{version = Vsn}]),
442+
case rabbit_khepri:put(Path1, VHost1) of
419443
ok ->
420-
V1;
444+
VHost1;
421445
{error, {khepri, mismatching_node, _}} ->
422446
update_in_khepri(VHostName, UpdateFun);
423447
Error ->

deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,25 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) ->
104104
create_binding_in_khepri(Src, Dst, Weight, UpdateFun) ->
105105
Path = khepri_consistent_hash_path(Src),
106106
case rabbit_khepri:adv_get(Path) of
107-
{ok, #{data := Chx0, payload_version := DVersion}} ->
107+
{ok, QueryRet} ->
108+
{Chx0, Vsn} = case QueryRet of
109+
%% Khepri 0.16 and below returned
110+
%% `khepri:node_props()' for adv queries and
111+
%% commands targeting one node:
112+
#{data := Data, payload_version := V} ->
113+
{Data, V};
114+
%% Khepri 0.17+ return
115+
%% `khepri_adv:node_props_map()` instead.
116+
#{Path := #{data := Data,
117+
payload_version := V}} ->
118+
{Data, V}
119+
end,
108120
case UpdateFun(Chx0, Dst, Weight) of
109121
already_exists ->
110122
already_exists;
111123
Chx ->
112124
Path1 = khepri_path:combine_with_conditions(
113-
Path, [#if_payload_version{version = DVersion}]),
125+
Path, [#if_payload_version{version = Vsn}]),
114126
Ret2 = rabbit_khepri:put(Path1, Chx),
115127
case Ret2 of
116128
ok ->

deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,21 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
108108
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
109109
Path = khepri_jms_topic_exchange_path(XName),
110110
case rabbit_khepri:adv_get(Path) of
111-
{ok, #{data := BindingFuns, payload_version := DVersion}} ->
111+
{ok, QueryRet} ->
112+
{BindingFuns, Vsn} = case QueryRet of
113+
%% Khepri 0.16 and below returned
114+
%% `khepri:node_props()' for adv queries
115+
%% and commands targeting one node:
116+
#{data := Data, payload_version := V} ->
117+
{Data, V};
118+
%% Khepri 0.17+ return
119+
%% `khepri_adv:node_props_map()` instead.
120+
#{Path := #{data := Data,
121+
payload_version := V}} ->
122+
{Data, V}
123+
end,
112124
Path1 = khepri_path:combine_with_conditions(
113-
Path, [#if_payload_version{version = DVersion}]),
125+
Path, [#if_payload_version{version = Vsn}]),
114126
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
115127
case Ret of
116128
ok -> ok;

deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,22 @@ insert0_in_mnesia(Key, Cached, Message, Length) ->
106106
insert_in_khepri(XName, Message, Length) ->
107107
Path = khepri_recent_history_path(XName),
108108
case rabbit_khepri:adv_get(Path) of
109-
{ok, #{data := Cached0, payload_version := DVersion}} ->
109+
{ok, QueryRet} ->
110+
{Cached0, Vsn} = case QueryRet of
111+
%% Khepri 0.16 and below returned
112+
%% `khepri:node_props()' for adv queries and
113+
%% commands targeting one node:
114+
#{data := Data, payload_version := V} ->
115+
{Data, V};
116+
%% Khepri 0.17+ return
117+
%% `khepri_adv:node_props_map()` instead.
118+
#{Path := #{data := Data,
119+
payload_version := V}} ->
120+
{Data, V}
121+
end,
110122
Cached = add_to_cache(Cached0, Message, Length),
111123
Path1 = khepri_path:combine_with_conditions(
112-
Path, [#if_payload_version{version = DVersion}]),
124+
Path, [#if_payload_version{version = Vsn}]),
113125
Ret = rabbit_khepri:put(Path1, Cached),
114126
case Ret of
115127
ok ->

0 commit comments

Comments
 (0)