@@ -1012,7 +1012,8 @@ set_many_in_khepri(Qs) ->
10121012 Queue :: amqqueue :amqqueue (),
10131013 FilterFun :: fun ((Queue ) -> boolean ()),
10141014 QName :: rabbit_amqqueue :name (),
1015- Ret :: {[QName ], [Deletions :: rabbit_binding :deletions ()]}.
1015+ Ret :: {[QName ], [Deletions :: rabbit_binding :deletions ()]}
1016+ | rabbit_khepri :timeout_error ().
10161017% % @doc Deletes all transient queues that match `FilterFun'.
10171018% %
10181019% % @private
@@ -1073,26 +1074,59 @@ delete_transient_in_khepri(FilterFun) ->
10731074 % % process might call itself. Instead we can fetch all of the transient
10741075 % % queues with `get_many' and then filter and fold the results outside of
10751076 % % Khepri's Ra server process.
1076- case rabbit_khepri :get_many (PathPattern ) of
1077- {ok , Qs } ->
1078- Items = maps :fold (
1079- fun (Path , Queue , Acc ) when ? is_amqqueue (Queue ) ->
1080- case FilterFun (Queue ) of
1081- true ->
1082- QueueName = khepri_queue_path_to_name (
1083- Path ),
1084- case delete_in_khepri (QueueName , false ) of
1085- ok ->
1086- Acc ;
1087- Deletions ->
1088- [{QueueName , Deletions } | Acc ]
1089- end ;
1090- false ->
1091- Acc
1092- end
1093- end , [], Qs ),
1094- {QueueNames , Deletions } = lists :unzip (Items ),
1095- {QueueNames , lists :flatten (Deletions )};
1077+ case rabbit_khepri :adv_get_many (PathPattern ) of
1078+ {ok , Props } ->
1079+ Qs = maps :fold (
1080+ fun (Path0 , #{data := Q , payload_version := Vsn }, Acc )
1081+ when ? is_amqqueue (Q ) ->
1082+ case FilterFun (Q ) of
1083+ true ->
1084+ Path = khepri_path :combine_with_conditions (
1085+ Path0 ,
1086+ [# if_payload_version {version = Vsn }]),
1087+ QName = amqqueue :get_name (Q ),
1088+ [{Path , QName } | Acc ];
1089+ false ->
1090+ Acc
1091+ end
1092+ end , [], Props ),
1093+ do_delete_transient_queues_in_khepri (Qs , FilterFun );
1094+ {error , _ } = Error ->
1095+ Error
1096+ end .
1097+
1098+ do_delete_transient_queues_in_khepri ([], _FilterFun ) ->
1099+ % % If there are no changes to make, avoid performing a transaction. When
1100+ % % Khepri is in a minority this avoids a long timeout waiting for the
1101+ % % transaction command to be processed. Otherwise it avoids appending a
1102+ % % somewhat large transaction command to Khepri's log.
1103+ {[], []};
1104+ do_delete_transient_queues_in_khepri (Qs , FilterFun ) ->
1105+ Res = rabbit_khepri :transaction (
1106+ fun () ->
1107+ rabbit_misc :fold_while_ok (
1108+ fun ({Path , QName }, Acc ) ->
1109+ % % Also see `delete_in_khepri/2'.
1110+ case khepri_tx_adv :delete (Path ) of
1111+ {ok , #{data := _ }} ->
1112+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1113+ QName , false ),
1114+ {ok , [{QName , Deletions } | Acc ]};
1115+ {ok , _ } ->
1116+ {ok , Acc };
1117+ {error , _ } = Error ->
1118+ Error
1119+ end
1120+ end , [], Qs )
1121+ end ),
1122+ case Res of
1123+ {ok , Items } ->
1124+ {QNames , Deletions } = lists :unzip (Items ),
1125+ {QNames , lists :flatten (Deletions )};
1126+ {error , {khepri , mismatching_node , _ }} ->
1127+ % % One of the queues changed while attempting to update all
1128+ % % queues. Retry the operation.
1129+ delete_transient_in_khepri (FilterFun );
10961130 {error , _ } = Error ->
10971131 Error
10981132 end .
@@ -1366,6 +1400,3 @@ khepri_queues_path() ->
13661400
13671401khepri_queue_path (# resource {virtual_host = VHost , name = Name }) ->
13681402 [? MODULE , queues , VHost , Name ].
1369-
1370- khepri_queue_path_to_name ([? MODULE , queues , VHost , Name ]) ->
1371- rabbit_misc :r (VHost , queue , Name ).
0 commit comments