@@ -427,8 +427,12 @@ maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
427427 [Type , VhostSpec , QueueSpec ]),
428428 Running = rabbit_maintenance :filter_out_drained_nodes_consistent_read (rabbit_nodes :list_running ()),
429429 NumRunning = length (Running ),
430+ TypeModule = case Type of
431+ all -> all ;
432+ _ -> rabbit_queue_type :discover (Type )
433+ end ,
430434 ToRebalance = [Q || Q <- list (),
431- filter_per_type ( Type , Q ),
435+ filter_per_type_for_rebalance ( TypeModule , Q ),
432436 is_replicable (Q ),
433437 is_match (amqqueue :get_vhost (Q ), VhostSpec ) andalso
434438 is_match (get_resource_name (amqqueue :get_name (Q )), QueueSpec )],
@@ -448,14 +452,10 @@ maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
448452 {error , rebalance_in_progress }.
449453
450454% % Stream queues don't yet support rebalance
451- filter_per_type (all , Q ) ->
452- ? amqqueue_is_quorum (Q ) or ? amqqueue_is_stream (Q );
453- filter_per_type (quorum , Q ) ->
454- ? amqqueue_is_quorum (Q );
455- filter_per_type (stream , Q ) ->
456- ? amqqueue_is_stream (Q );
457- filter_per_type (classic , Q ) ->
458- ? amqqueue_is_classic (Q ).
455+ filter_per_type_for_rebalance (all , Q ) ->
456+ rabbit_queue_type :rebalance_module (Q ) /= undefined ;
457+ filter_per_type_for_rebalance (TypeModule , Q ) ->
458+ ? amqqueue_type_is (Q , TypeModule ).
459459
460460% % TODO: note that it can return {error, not_supported}.
461461% % this will result in a badmatch. However that's fine
0 commit comments