@@ -411,10 +411,16 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411 fun () ->
412412 Path = khepri_queue_path (QueueName ),
413413 case khepri_tx_adv :delete (Path ) of
414+ % % Khepri 0.16 and below returned `khepri:node_props()' for
415+ % % adv queries and commands targeting one node:
414416 {ok , #{data := _ }} ->
415417 % % we want to execute some things, as decided by rabbit_exchange,
416418 % % after the transaction.
417419 rabbit_db_binding :delete_for_destination_in_khepri (QueueName , OnlyDurable );
420+ % % Khepri 0.17+ returns `khepri_adv:node_props_map()`
421+ % % instead.
422+ {ok , #{Path := #{data := _ }}} ->
423+ rabbit_db_binding :delete_for_destination_in_khepri (QueueName , OnlyDurable );
418424 {ok , _ } ->
419425 ok
420426 end
@@ -606,7 +612,19 @@ update_in_khepri(QName, Fun) ->
606612 Path = khepri_queue_path (QName ),
607613 Ret1 = rabbit_khepri :adv_get (Path ),
608614 case Ret1 of
609- {ok , #{data := Q , payload_version := Vsn }} ->
615+ {ok , QueryRet } ->
616+ {Q , Vsn } = case QueryRet of
617+ % % Khepri 0.16 and below returned
618+ % % `khepri:node_props()' for adv queries and
619+ % % commands targeting one node:
620+ #{data := Data , payload_version := V } ->
621+ {Data , V };
622+ % % Khepri 0.17+ returns `khepri_adv:node_props_map()`
623+ % % instead.
624+ #{Path := #{data := Data ,
625+ payload_version := V }} ->
626+ {Data , V }
627+ end ,
610628 UpdatePath = khepri_path :combine_with_conditions (
611629 Path , [# if_payload_version {version = Vsn }]),
612630 Q1 = Fun (Q ),
@@ -657,11 +675,23 @@ update_decorators_in_khepri(QName, Decorators) ->
657675 Path = khepri_queue_path (QName ),
658676 Ret1 = rabbit_khepri :adv_get (Path ),
659677 case Ret1 of
660- {ok , #{data := Q1 , payload_version := Vsn }} ->
661- Q2 = amqqueue :set_decorators (Q1 , Decorators ),
678+ {ok , QueryRet } ->
679+ {Q , Vsn } = case QueryRet of
680+ % % Khepri 0.16 and below returned
681+ % % `khepri:node_props()' for adv queries and
682+ % % commands targeting one node:
683+ #{data := Data , payload_version := V } ->
684+ {Data , V };
685+ % % Khepri 0.17+ returns
686+ % % `khepri_adv:node_props_map()` instead.
687+ #{Path := #{data := Data ,
688+ payload_version := V }} ->
689+ {Data , V }
690+ end ,
691+ Q1 = amqqueue :set_decorators (Q , Decorators ),
662692 UpdatePath = khepri_path :combine_with_conditions (
663693 Path , [# if_payload_version {version = Vsn }]),
664- Ret2 = rabbit_khepri :put (UpdatePath , Q2 ),
694+ Ret2 = rabbit_khepri :put (UpdatePath , Q1 ),
665695 case Ret2 of
666696 ok -> ok ;
667697 {error , {khepri , mismatching_node , _ }} ->
@@ -1075,15 +1105,12 @@ delete_transient_in_khepri(FilterFun) ->
10751105 case rabbit_khepri :adv_get_many (PathPattern ) of
10761106 {ok , Props } ->
10771107 Qs = maps :fold (
1078- fun (Path0 , #{data := Q , payload_version := Vsn }, Acc )
1108+ fun (Path , #{data := Q , payload_version := Vsn }, Acc )
10791109 when ? is_amqqueue (Q ) ->
10801110 case FilterFun (Q ) of
10811111 true ->
1082- Path = khepri_path :combine_with_conditions (
1083- Path0 ,
1084- [# if_payload_version {version = Vsn }]),
10851112 QName = amqqueue :get_name (Q ),
1086- [{Path , QName } | Acc ];
1113+ [{Path , Vsn , QName } | Acc ];
10871114 false ->
10881115 Acc
10891116 end
@@ -1102,20 +1129,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11021129do_delete_transient_queues_in_khepri (Qs , FilterFun ) ->
11031130 Res = rabbit_khepri :transaction (
11041131 fun () ->
1105- rabbit_misc :fold_while_ok (
1106- fun ({Path , QName }, Acc ) ->
1107- % % Also see `delete_in_khepri/2'.
1108- case khepri_tx_adv :delete (Path ) of
1109- {ok , #{data := _ }} ->
1110- Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1111- QName , false ),
1112- {ok , [{QName , Deletions } | Acc ]};
1113- {ok , _ } ->
1114- {ok , Acc };
1115- {error , _ } = Error ->
1116- Error
1117- end
1118- end , [], Qs )
1132+ do_delete_transient_queues_in_khepri_tx (Qs , [])
11191133 end ),
11201134 case Res of
11211135 {ok , Items } ->
@@ -1129,6 +1143,35 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11291143 Error
11301144 end .
11311145
1146+ do_delete_transient_queues_in_khepri_tx ([], Acc ) ->
1147+ {ok , Acc };
1148+ do_delete_transient_queues_in_khepri_tx ([{Path , Vsn , QName } | Rest ], Acc ) ->
1149+ % % Also see `delete_in_khepri/2'.
1150+ VersionedPath = khepri_path :combine_with_conditions (
1151+ Path , [# if_payload_version {version = Vsn }]),
1152+ case khepri_tx_adv :delete (VersionedPath ) of
1153+ {ok , Res } ->
1154+ Acc1 = case Res of
1155+ % % Khepri 0.16 and below returned `khepri:node_props()'
1156+ % % for adv queries and commands targeting one node:
1157+ #{data := _ } ->
1158+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1159+ QName , false ),
1160+ [{QName , Deletions } | Acc ];
1161+ % % Khepri 0.17+ returns `khepri_adv:node_props_map()`
1162+ % % instead.
1163+ #{Path := #{data := _ }} ->
1164+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1165+ QName , false ),
1166+ [{QName , Deletions } | Acc ];
1167+ _ ->
1168+ Acc
1169+ end ,
1170+ do_delete_transient_queues_in_khepri_tx (Rest , Acc1 );
1171+ {error , _ } = Error ->
1172+ Error
1173+ end .
1174+
11321175% % -------------------------------------------------------------------
11331176% % foreach_transient().
11341177% % -------------------------------------------------------------------
0 commit comments