@@ -411,10 +411,16 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411 fun () ->
412412 Path = khepri_queue_path (QueueName ),
413413 case khepri_tx_adv :delete (Path ) of
414+ % % Khepri 0.16 and below returned `khepri:node_props()' for
415+ % % adv queries and commands targeting one node:
414416 {ok , #{data := _ }} ->
415417 % % we want to execute some things, as decided by rabbit_exchange,
416418 % % after the transaction.
417419 rabbit_db_binding :delete_for_destination_in_khepri (QueueName , OnlyDurable );
420+ % % Khepri 0.17+ return `khepri_adv:node_props_map()`
421+ % % instead.
422+ {ok , #{Path := #{data := _ }}} ->
423+ rabbit_db_binding :delete_for_destination_in_khepri (QueueName , OnlyDurable );
418424 {ok , _ } ->
419425 ok
420426 end
@@ -606,7 +612,19 @@ update_in_khepri(QName, Fun) ->
606612 Path = khepri_queue_path (QName ),
607613 Ret1 = rabbit_khepri :adv_get (Path ),
608614 case Ret1 of
609- {ok , #{data := Q , payload_version := Vsn }} ->
615+ {ok , QueryRet } ->
616+ {Q , Vsn } = case QueryRet of
617+ % % Khepri 0.16 and below returned
618+ % % `khepri:node_props()' for adv queries and
619+ % % commands targeting one node:
620+ #{data := Data , payload_version := V } ->
621+ {Data , V };
622+ % % Khepri 0.17+ return `khepri_adv:node_props_map()`
623+ % % instead.
624+ #{Path := #{data := Data ,
625+ payload_version := V }} ->
626+ {Data , V }
627+ end ,
610628 UpdatePath = khepri_path :combine_with_conditions (
611629 Path , [# if_payload_version {version = Vsn }]),
612630 Q1 = Fun (Q ),
@@ -657,11 +675,23 @@ update_decorators_in_khepri(QName, Decorators) ->
657675 Path = khepri_queue_path (QName ),
658676 Ret1 = rabbit_khepri :adv_get (Path ),
659677 case Ret1 of
660- {ok , #{data := Q1 , payload_version := Vsn }} ->
661- Q2 = amqqueue :set_decorators (Q1 , Decorators ),
678+ {ok , QueryRet } ->
679+ {Q , Vsn } = case QueryRet of
680+ % % Khepri 0.16 and below returned
681+ % % `khepri:node_props()' for adv queries and
682+ % % commands targeting one node:
683+ #{data := Data , payload_version := V } ->
684+ {Data , V };
685+ % % Khepri 0.17+ return `khepri_adv:node_props_map()`
686+ % % instead.
687+ #{Path := #{data := Data ,
688+ payload_version := V }} ->
689+ {Data , V }
690+ end ,
691+ Q1 = amqqueue :set_decorators (Q , Decorators ),
662692 UpdatePath = khepri_path :combine_with_conditions (
663693 Path , [# if_payload_version {version = Vsn }]),
664- Ret2 = rabbit_khepri :put (UpdatePath , Q2 ),
694+ Ret2 = rabbit_khepri :put (UpdatePath , Q1 ),
665695 case Ret2 of
666696 ok -> ok ;
667697 {error , {khepri , mismatching_node , _ }} ->
@@ -1102,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11021132do_delete_transient_queues_in_khepri (Qs , FilterFun ) ->
11031133 Res = rabbit_khepri :transaction (
11041134 fun () ->
1105- rabbit_misc :fold_while_ok (
1106- fun ({Path , QName }, Acc ) ->
1107- % % Also see `delete_in_khepri/2'.
1108- case khepri_tx_adv :delete (Path ) of
1109- {ok , #{data := _ }} ->
1110- Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1111- QName , false ),
1112- {ok , [{QName , Deletions } | Acc ]};
1113- {ok , _ } ->
1114- {ok , Acc };
1115- {error , _ } = Error ->
1116- Error
1117- end
1118- end , [], Qs )
1135+ do_delete_transient_queues_in_khepri_tx (Qs , [])
11191136 end ),
11201137 case Res of
11211138 {ok , Items } ->
@@ -1129,6 +1146,33 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11291146 Error
11301147 end .
11311148
1149+ do_delete_transient_queues_in_khepri_tx ([], Acc ) ->
1150+ {ok , Acc };
1151+ do_delete_transient_queues_in_khepri_tx ([{Path , QName } | Rest ], Acc ) ->
1152+ % % Also see `delete_in_khepri/2'.
1153+ case khepri_tx_adv :delete (Path ) of
1154+ {ok , Res } ->
1155+ Acc1 = case Res of
1156+ % % Khepri 0.16 and below returned `khepri:node_props()'
1157+ % % for adv queries and commands targeting one node:
1158+ #{data := _ } ->
1159+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1160+ QName , false ),
1161+ [{QName , Deletions } | Acc ];
1162+ % % Khepri 0.17+ return `khepri_adv:node_props_map()`
1163+ % % instead.
1164+ #{Path := #{data := _ }} ->
1165+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1166+ QName , false ),
1167+ [{QName , Deletions } | Acc ];
1168+ _ ->
1169+ Acc
1170+ end ,
1171+ do_delete_transient_queues_in_khepri_tx (Rest , Acc1 );
1172+ {error , _ } = Error ->
1173+ Error
1174+ end .
1175+
11321176% % -------------------------------------------------------------------
11331177% % foreach_transient().
11341178% % -------------------------------------------------------------------
0 commit comments