@@ -411,7 +411,7 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411 fun () ->
412412 Path = khepri_queue_path (QueueName ),
413413 case khepri_tx_adv :delete (Path ) of
414- {ok , #{data := _ }} ->
414+ {ok , #{Path : = #{ data := _ } }} ->
415415 % % we want to execute some things, as decided by rabbit_exchange,
416416 % % after the transaction.
417417 rabbit_db_binding :delete_for_destination_in_khepri (QueueName , OnlyDurable );
@@ -606,7 +606,7 @@ update_in_khepri(QName, Fun) ->
606606 Path = khepri_queue_path (QName ),
607607 Ret1 = rabbit_khepri :adv_get (Path ),
608608 case Ret1 of
609- {ok , #{data := Q , payload_version := Vsn }} ->
609+ {ok , #{Path : = #{ data := Q , payload_version := Vsn } }} ->
610610 UpdatePath = khepri_path :combine_with_conditions (
611611 Path , [# if_payload_version {version = Vsn }]),
612612 Q1 = Fun (Q ),
@@ -657,7 +657,7 @@ update_decorators_in_khepri(QName, Decorators) ->
657657 Path = khepri_queue_path (QName ),
658658 Ret1 = rabbit_khepri :adv_get (Path ),
659659 case Ret1 of
660- {ok , #{data := Q1 , payload_version := Vsn }} ->
660+ {ok , #{Path : = #{ data := Q1 , payload_version := Vsn } }} ->
661661 Q2 = amqqueue :set_decorators (Q1 , Decorators ),
662662 UpdatePath = khepri_path :combine_with_conditions (
663663 Path , [# if_payload_version {version = Vsn }]),
@@ -1075,15 +1075,12 @@ delete_transient_in_khepri(FilterFun) ->
10751075 case rabbit_khepri :adv_get_many (PathPattern ) of
10761076 {ok , Props } ->
10771077 Qs = maps :fold (
1078- fun (Path0 , #{data := Q , payload_version := Vsn }, Acc )
1078+ fun (Path , #{data := Q , payload_version := Vsn }, Acc )
10791079 when ? is_amqqueue (Q ) ->
10801080 case FilterFun (Q ) of
10811081 true ->
1082- Path = khepri_path :combine_with_conditions (
1083- Path0 ,
1084- [# if_payload_version {version = Vsn }]),
10851082 QName = amqqueue :get_name (Q ),
1086- [{Path , QName } | Acc ];
1083+ [{Path , Vsn , QName } | Acc ];
10871084 false ->
10881085 Acc
10891086 end
@@ -1102,20 +1099,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11021099do_delete_transient_queues_in_khepri (Qs , FilterFun ) ->
11031100 Res = rabbit_khepri :transaction (
11041101 fun () ->
1105- rabbit_misc :fold_while_ok (
1106- fun ({Path , QName }, Acc ) ->
1107- % % Also see `delete_in_khepri/2'.
1108- case khepri_tx_adv :delete (Path ) of
1109- {ok , #{data := _ }} ->
1110- Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1111- QName , false ),
1112- {ok , [{QName , Deletions } | Acc ]};
1113- {ok , _ } ->
1114- {ok , Acc };
1115- {error , _ } = Error ->
1116- Error
1117- end
1118- end , [], Qs )
1102+ do_delete_transient_queues_in_khepri_tx (Qs , [])
11191103 end ),
11201104 case Res of
11211105 {ok , Items } ->
@@ -1129,6 +1113,24 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11291113 Error
11301114 end .
11311115
1116+ do_delete_transient_queues_in_khepri_tx ([], Acc ) ->
1117+ {ok , Acc };
1118+ do_delete_transient_queues_in_khepri_tx ([{Path , Vsn , QName } | Rest ], Acc ) ->
1119+ % % Also see `delete_in_khepri/2'.
1120+ VersionedPath = khepri_path :combine_with_conditions (
1121+ Path , [# if_payload_version {version = Vsn }]),
1122+ case khepri_tx_adv :delete (VersionedPath ) of
1123+ {ok , #{Path := #{data := _ }}} ->
1124+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1125+ QName , false ),
1126+ Acc1 = [{QName , Deletions } | Acc ],
1127+ do_delete_transient_queues_in_khepri_tx (Rest , Acc1 );
1128+ {ok , _ } ->
1129+ do_delete_transient_queues_in_khepri_tx (Rest , Acc );
1130+ {error , _ } = Error ->
1131+ Error
1132+ end .
1133+
11321134% % -------------------------------------------------------------------
11331135% % foreach_transient().
11341136% % -------------------------------------------------------------------
0 commit comments