@@ -301,14 +301,25 @@ register() ->
301301deregister_cleanup (_ ) -> ok .
302302
303303collect_mf ('detailed' , Callback ) ->
304- collect (true , ? DETAILED_METRIC_NAME_PREFIX , vhosts_filter_from_pdict (), enabled_mfs_from_pdict (? METRICS_RAW ), Callback ),
304+ IncludedMFs = enabled_mfs_from_pdict (? METRICS_RAW ),
305+ collect (true , ? DETAILED_METRIC_NAME_PREFIX , vhosts_filter_from_pdict (), IncludedMFs , Callback ),
305306 collect (true , ? CLUSTER_METRIC_NAME_PREFIX , vhosts_filter_from_pdict (), enabled_mfs_from_pdict (? METRICS_CLUSTER ), Callback ),
307+ % % the detailed endpoint should emit queue_info only if queue metrics were requested
308+ MFs = proplists :get_keys (IncludedMFs ),
309+ case lists :member (queue_coarse_metrics , MFs ) orelse
310+ lists :member (queue_consumer_count , MFs ) orelse
311+ lists :member (queue_metrics , MFs ) of
312+ true ->
313+ emit_queue_info (? DETAILED_METRIC_NAME_PREFIX , vhosts_filter_from_pdict (), Callback );
314+ false -> ok
315+ end ,
306316 % % identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
307317 emit_identity_info (<<" detailed" >>, Callback ),
308318 ok ;
309319collect_mf ('per-object' , Callback ) ->
310320 collect (true , ? METRIC_NAME_PREFIX , false , ? METRICS_RAW , Callback ),
311321 totals (Callback ),
322+ emit_queue_info (? METRIC_NAME_PREFIX , false , Callback ),
312323 emit_identity_info (<<" per-object" >>, Callback ),
313324 ok ;
314325collect_mf ('memory-breakdown' , Callback ) ->
@@ -406,6 +417,47 @@ identity_info(Endpoint) ->
406417 }]
407418 }.
408419
420+ has_leader_running_locally (Q ) when is_pid (Q ) ->
421+ node (Q ) =:= node () andalso is_process_alive (Q );
422+ has_leader_running_locally ({Name , Node }) when Node =:= node () ->
423+ is_process_alive (whereis (Name ));
424+ has_leader_running_locally (_ ) ->
425+ false .
426+
427+ emit_queue_info (Prefix , VHostsFilter , Callback ) ->
428+ Help = <<" A metric with a constant '1' value and labels that provide some queue details" >>,
429+ QInfos = lists :foldl (
430+ fun (Q , Acc ) ->
431+ # resource {virtual_host = VHost , name = Name } = amqqueue :get_name (Q ),
432+ case is_map (VHostsFilter ) andalso maps :get (VHost , VHostsFilter ) == false of
433+ true -> Acc ;
434+ false ->
435+ Type = amqqueue :get_type (Q ),
436+ TypeState = amqqueue :get_type_state (Q ),
437+ QInfo0 = [{vhost , VHost }, {queue , Name }, {queue_type , Type }],
438+ Members = maps :get (nodes , TypeState , []),
439+ case {rabbit_amqqueue :is_replicated (Q ),
440+ has_leader_running_locally (amqqueue :get_pid (Q )),
441+ lists :member (node (), Members )} of
442+ {false , true , _ } ->
443+ % % non-replicated queue running on this node
444+ [{QInfo0 , 1 }|Acc ];
445+ {true , true , _ } ->
446+ % % replicated queue with a leader on this node
447+ QInfo = [{membership , leader } | QInfo0 ],
448+ [{QInfo , 1 }|Acc ];
449+ {true , false , true } ->
450+ % % replicated queue with a non-leader member on this node
451+ QInfo = [{membership , follower } | QInfo0 ],
452+ [{QInfo , 1 }|Acc ];
453+ _ ->
454+ % % ignore queues with no local members
455+ Acc
456+ end
457+ end
458+ end , [], rabbit_amqqueue :list ()),
459+ Callback (prometheus_model_helpers :create_mf (<<Prefix /binary , " queue_info" >>, Help , gauge , QInfos )).
460+
409461add_metric_family ({Name , Type , Help , Metrics }, Callback ) ->
410462 MN = <<? METRIC_NAME_PREFIX /binary , (prometheus_model_helpers :metric_name (Name ))/binary >>,
411463 Callback (create_mf (MN , Help , Type , Metrics )).
@@ -890,4 +942,3 @@ vhosts_filter_from_pdict() ->
890942 Enabled = maps :from_list ([ {VHost , true } || VHost <- L ]),
891943 maps :merge (All , Enabled )
892944 end .
893-
0 commit comments