@@ -1027,7 +1027,8 @@ set_many_in_khepri(Qs) ->
10271027 Queue :: amqqueue :amqqueue (),
10281028 FilterFun :: fun ((Queue ) -> boolean ()),
10291029 QName :: rabbit_amqqueue :name (),
1030- Ret :: {[QName ], [Deletions :: rabbit_binding :deletions ()]}.
1030+ Ret :: {[QName ], [Deletions :: rabbit_binding :deletions ()]}
1031+ | rabbit_khepri :timeout_error ().
10311032% % @doc Deletes all transient queues that match `FilterFun'.
10321033% %
10331034% % @private
@@ -1088,26 +1089,59 @@ delete_transient_in_khepri(FilterFun) ->
10881089 % % process might call itself. Instead we can fetch all of the transient
10891090 % % queues with `get_many' and then filter and fold the results outside of
10901091 % % Khepri's Ra server process.
1091- case rabbit_khepri :get_many (PathPattern ) of
1092- {ok , Qs } ->
1093- Items = maps :fold (
1094- fun (Path , Queue , Acc ) when ? is_amqqueue (Queue ) ->
1095- case FilterFun (Queue ) of
1096- true ->
1097- QueueName = khepri_queue_path_to_name (
1098- Path ),
1099- case delete_in_khepri (QueueName , false ) of
1100- ok ->
1101- Acc ;
1102- Deletions ->
1103- [{QueueName , Deletions } | Acc ]
1104- end ;
1105- false ->
1106- Acc
1107- end
1108- end , [], Qs ),
1109- {QueueNames , Deletions } = lists :unzip (Items ),
1110- {QueueNames , lists :flatten (Deletions )};
1092+ case rabbit_khepri :adv_get_many (PathPattern ) of
1093+ {ok , Props } ->
1094+ Qs = maps :fold (
1095+ fun (Path0 , #{data := Q , payload_version := Vsn }, Acc )
1096+ when ? is_amqqueue (Q ) ->
1097+ case FilterFun (Q ) of
1098+ true ->
1099+ Path = khepri_path :combine_with_conditions (
1100+ Path0 ,
1101+ [# if_payload_version {version = Vsn }]),
1102+ QName = amqqueue :get_name (Q ),
1103+ [{Path , QName } | Acc ];
1104+ false ->
1105+ Acc
1106+ end
1107+ end , [], Props ),
1108+ do_delete_transient_queues_in_khepri (Qs , FilterFun );
1109+ {error , _ } = Error ->
1110+ Error
1111+ end .
1112+
1113+ do_delete_transient_queues_in_khepri ([], _FilterFun ) ->
1114+ % % If there are no changes to make, avoid performing a transaction. When
1115+ % % Khepri is in a minority this avoids a long timeout waiting for the
1116+ % % transaction command to be processed. Otherwise it avoids appending a
1117+ % % somewhat large transaction command to Khepri's log.
1118+ {[], []};
1119+ do_delete_transient_queues_in_khepri (Qs , FilterFun ) ->
1120+ Res = rabbit_khepri :transaction (
1121+ fun () ->
1122+ rabbit_misc :fold_while_ok (
1123+ fun ({Path , QName }, Acc ) ->
1124+ % % Also see `delete_in_khepri/2'.
1125+ case khepri_tx_adv :delete (Path ) of
1126+ {ok , #{data := _ }} ->
1127+ Deletions = rabbit_db_binding :delete_for_destination_in_khepri (
1128+ QName , false ),
1129+ {ok , [{QName , Deletions } | Acc ]};
1130+ {ok , _ } ->
1131+ {ok , Acc };
1132+ {error , _ } = Error ->
1133+ Error
1134+ end
1135+ end , [], Qs )
1136+ end ),
1137+ case Res of
1138+ {ok , Items } ->
1139+ {QNames , Deletions } = lists :unzip (Items ),
1140+ {QNames , lists :flatten (Deletions )};
1141+ {error , {khepri , mismatching_node , _ }} ->
1142+ % % One of the queues changed while attempting to update all
1143+ % % queues. Retry the operation.
1144+ delete_transient_in_khepri (FilterFun );
11111145 {error , _ } = Error ->
11121146 Error
11131147 end .
@@ -1382,6 +1416,3 @@ khepri_queues_path() ->
13821416
13831417khepri_queue_path (# resource {virtual_host = VHost , name = Name }) ->
13841418 [? MODULE , queues , VHost , Name ].
1385-
1386- khepri_queue_path_to_name ([? MODULE , queues , VHost , Name ]) ->
1387- rabbit_misc :r (VHost , queue , Name ).
0 commit comments