Skip to content

Commit b398d40

Browse files
committed
Publish aggregated/per-object/detailed RAFT metrics
For aggregated metrics, we just pick specific metrics (currently num_segments and commit_latency) and only publish the maximum value, without labels (`max_` is added to the metric name). For example: ``` > curl -s localhost:15692/metrics/per-object | rg -e ^rabbitmq_raft_num_segments -e ^rabbitmq_raft_commit_latency rabbitmq_raft_commit_latency_seconds{module="rabbit_khepri",ra_system="coordination"} 0.0 rabbitmq_raft_commit_latency_seconds{queue="qq2",vhost="/"} 0.02 rabbitmq_raft_commit_latency_seconds{queue="qqq-1",vhost="/"} 0.01 rabbitmq_raft_commit_latency_seconds{queue="qqq-2",vhost="/"} 0.0 rabbitmq_raft_num_segments{module="rabbit_khepri",ra_system="coordination"} 1.0 rabbitmq_raft_num_segments{queue="qq2",vhost="/"} 132.0 rabbitmq_raft_num_segments{queue="qqq-2",vhost="/"} 245.0 > curl -s localhost:15692/metrics/ | rg ^rabbitmq_raft_max rabbitmq_raft_max_commit_latency_seconds 0.02 rabbitmq_raft_max_num_segments 245.0 ```
1 parent b882bfb commit b398d40

File tree

3 files changed

+159
-20
lines changed

3 files changed

+159
-20
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl

Lines changed: 106 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
-module(prometheus_rabbitmq_raft_metrics_collector).
88

99
-behaviour(prometheus_collector).
10-
-include_lib("prometheus/include/prometheus.hrl").
1110

1211
-export([register/0,
1312
deregister_cleanup/1,
@@ -16,7 +15,8 @@
1615
-import(prometheus_model_helpers, [create_mf/4,
1716
counter_metric/2]).
1817

19-
-define(METRIC_NAME_PREFIX, "rabbitmq_raft_").
18+
-define(METRIC_NAME_PREFIX, <<"rabbitmq_raft_">>).
19+
-define(DETAILED_METRIC_NAME_PREFIX, <<"rabbitmq_raft_detailed_">>).
2020

2121
%%====================================================================
2222
%% Collector API
@@ -28,19 +28,115 @@ register() ->
2828
deregister_cleanup(_) ->
2929
ok.
3030

31+
collect_mf('per-object', Callback) ->
32+
collect_per_object_metrics(?METRIC_NAME_PREFIX, Callback);
33+
collect_mf('detailed', Callback) ->
34+
case get(prometheus_mf_filter) of
35+
undefined ->
36+
ok;
37+
MFNames ->
38+
case lists:member(raft_metrics, MFNames) of
39+
true ->
40+
collect_detailed_metrics(?DETAILED_METRIC_NAME_PREFIX, Callback);
41+
false ->
42+
ok
43+
end
44+
end;
3145
collect_mf(_Registry, Callback) ->
46+
case application:get_env(rabbitmq_prometheus, return_per_object_metrics, false) of
47+
false ->
48+
collect_aggregate_metrics(?METRIC_NAME_PREFIX, Callback);
49+
true ->
50+
collect_per_object_metrics(?METRIC_NAME_PREFIX, Callback)
51+
end.
52+
53+
%% INTERNAL
54+
55+
collect_aggregate_metrics(Prefix, Callback) ->
56+
collect_max_values(Prefix, Callback),
57+
collect_key_component_metrics(Prefix, Callback).
58+
59+
collect_per_object_metrics(Prefix, Callback) ->
60+
collect_key_component_metrics(Prefix, Callback),
61+
collect_key_per_object_metrics(Prefix, Callback).
62+
63+
collect_detailed_metrics(Prefix, Callback) ->
64+
VHostFilterFun = case get(prometheus_vhost_filter) of
65+
undefined ->
66+
fun(_) -> true end;
67+
VHosts ->
68+
fun(#{vhost := V}) ->
69+
lists:member(V, VHosts);
70+
(_) ->
71+
false
72+
end
73+
end,
74+
75+
collect_key_component_metrics(Prefix, Callback),
76+
collect_all_matching_metrics(Prefix, Callback, VHostFilterFun).
77+
78+
collect_key_per_object_metrics(Prefix, Callback) ->
79+
QQMetrics = [term,
80+
snapshot_index,
81+
last_applied,
82+
commit_index,
83+
last_written_index,
84+
commit_latency,
85+
num_segments],
86+
maps:foreach(
87+
fun(Name, #{type := Type, help := Help, values := Values}) ->
88+
Callback(
89+
create_mf(<<Prefix/binary, (prometheus_model_helpers:metric_name(Name))/binary>>,
90+
Help,
91+
Type,
92+
Values))
93+
end,
94+
seshat:format(ra, #{labels => as_binary, metrics => QQMetrics})).
95+
96+
collect_all_matching_metrics(Prefix, Callback, VHostFilterFun) ->
97+
maps:foreach(
98+
fun(Name, #{type := Type, help := Help, values := Values0}) ->
99+
Values = maps:filter(fun(#{vhost := V}, _) ->
100+
VHostFilterFun(V);
101+
(_, _) -> true
102+
end, Values0),
103+
Callback(
104+
create_mf(<<Prefix/binary, (prometheus_model_helpers:metric_name(Name))/binary>>,
105+
Help,
106+
Type,
107+
Values))
108+
end,
109+
seshat:format(ra, #{labels => as_binary, metrics => all, filter_fun => VHostFilterFun})).
110+
111+
collect_max_values(Prefix, Callback) ->
112+
%% max values for QQ metrics
113+
%% eg.
114+
%% rabbitmq_raft_num_segments{queue="q1",vhost="/"} 5.0
115+
%% rabbitmq_raft_num_segments{queue="q2",vhost="/"} 10.0
116+
%% becomes
117+
%% rabbitmq_raft_max_num_segments 10.0
118+
QQMetrics = [num_segments],
119+
maps:foreach(
120+
fun(Name, #{type := Type, help := Help, values := Values}) ->
121+
Max = lists:max(maps:values(Values)),
122+
Callback(
123+
create_mf(<<Prefix/binary, "max_", (prometheus_model_helpers:metric_name(Name))/binary>>,
124+
Help,
125+
Type,
126+
#{#{} => Max}))
127+
128+
end,
129+
seshat:format(ra, #{labels => as_binary, metrics => QQMetrics})).
130+
131+
collect_key_component_metrics(Prefix, Callback) ->
132+
WALMetrics = [wal_files, bytes_written, mem_tables],
133+
SegmentWriterMetrics = [entries, segments],
32134
maps:foreach(
33135
fun(Name, #{type := Type, help := Help, values := Values}) ->
34136
Callback(
35-
create_mf(?METRIC_NAME(Name),
137+
create_mf(<<Prefix/binary, (prometheus_model_helpers:metric_name(Name))/binary>>,
36138
Help,
37139
Type,
38140
Values))
39141
end,
40-
seshat:format(ra, [term,
41-
snapshot_index,
42-
last_applied,
43-
commit_index,
44-
last_written_index,
45-
commit_latency,
46-
num_segments])).
142+
seshat:format(ra, #{labels => as_binary, metrics => WALMetrics ++ SegmentWriterMetrics})).

deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ build_dispatcher() ->
1717
prometheus_rabbitmq_core_metrics_collector,
1818
prometheus_rabbitmq_global_metrics_collector,
1919
prometheus_rabbitmq_message_size_metrics_collector,
20+
prometheus_rabbitmq_raft_metrics_collector,
2021
prometheus_rabbitmq_alarm_metrics_collector,
2122
prometheus_rabbitmq_dynamic_collector,
2223
prometheus_process_collector],
@@ -26,8 +27,7 @@ build_dispatcher() ->
2627
prometheus_vm_memory_collector,
2728
prometheus_mnesia_collector,
2829
prometheus_vm_statistics_collector,
29-
prometheus_vm_msacc_collector,
30-
prometheus_rabbitmq_raft_metrics_collector
30+
prometheus_vm_msacc_collector
3131
],
3232
prometheus_registry:register_collectors(
3333
case application:get_env(rabbitmq_prometheus, return_per_object_metrics, fasle) of
@@ -38,7 +38,8 @@ build_dispatcher() ->
3838
prometheus_registry:register_collectors('per-object',
3939
CoreCollectors ++ PerObjectCollectors),
4040
prometheus_registry:register_collectors('detailed', [
41-
prometheus_rabbitmq_core_metrics_collector
41+
prometheus_rabbitmq_core_metrics_collector,
42+
prometheus_rabbitmq_raft_metrics_collector
4243
]),
4344
prometheus_registry:register_collectors('memory-breakdown', [
4445
prometheus_rabbitmq_core_metrics_collector

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ groups() ->
7272
vhost_status_metric,
7373
exchange_bindings_metric,
7474
exchange_names_metric,
75-
stream_pub_sub_metrics
75+
stream_pub_sub_metrics,
76+
raft_detailed_metrics_test
7677
]},
7778
{special_chars, [], [core_metrics_special_chars]},
7879
{authentication, [], [basic_auth]}
@@ -158,6 +159,12 @@ init_per_group(detailed_metrics, Config0) ->
158159
Q <- [ <<"queue-with-messages">>, <<"queue-with-consumer">> ]
159160
],
160161

162+
amqp_channel:call(DefaultCh,
163+
#'queue.declare'{queue = <<"a_quorum_queue">>,
164+
durable = true,
165+
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]
166+
}),
167+
161168
DefaultConsumer = sleeping_consumer(),
162169
#'basic.consume_ok'{consumer_tag = DefaultCTag} =
163170
amqp_channel:subscribe(DefaultCh, #'basic.consume'{queue = <<"default-queue-with-consumer">>}, DefaultConsumer),
@@ -400,7 +407,14 @@ aggregated_metrics_test(Config) ->
400407
?assertEqual(match, re:run(Body, "^rabbitmq_erlang_uptime_seconds ", [{capture, none}, multiline])),
401408
?assertEqual(match, re:run(Body, "^rabbitmq_io_read_time_seconds_total ", [{capture, none}, multiline])),
402409
%% Check the first TOTALS metric value
403-
?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])).
410+
?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])),
411+
?assertEqual(nomatch, re:run(Body, "^rabbitmq_raft_commit_latency_seconds", [{capture, none}, multiline])),
412+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_bytes_written.*ra_log_segment_writer", [{capture, none}, multiline])),
413+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_bytes_written.*ra_log_wal", [{capture, none}, multiline])),
414+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_entries{", [{capture, none}, multiline])),
415+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_mem_tables{", [{capture, none}, multiline])),
416+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_segments{", [{capture, none}, multiline])),
417+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_wal_files{", [{capture, none}, multiline])).
404418

405419
endpoint_per_object_metrics(Config) ->
406420
per_object_metrics_test(Config, "/metrics/per-object").
@@ -438,7 +452,8 @@ per_object_metrics_test(Config, Path) ->
438452
?assertEqual(match, re:run(Body, "^rabbitmq_io_read_time_seconds_total ", [{capture, none}, multiline])),
439453
?assertEqual(match, re:run(Body, "^rabbitmq_raft_commit_latency_seconds{", [{capture, none}, multiline])),
440454
%% Check the first TOTALS metric value
441-
?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])).
455+
?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])),
456+
?assertEqual(match, re:run(Body, "^rabbitmq_raft_num_segments{", [{capture, none}, multiline])).
442457

443458
memory_breakdown_metrics_test(Config) ->
444459
{_Headers, Body} = http_get_with_pal(Config, "/metrics/memory-breakdown", [], 200),
@@ -552,7 +567,8 @@ queue_consumer_count_all_vhosts_per_object_test(Config) ->
552567
#{queue => "vhost-2-queue-with-consumer",vhost => "vhost-2"} => [1],
553568
#{queue => "vhost-2-queue-with-messages",vhost => "vhost-2"} => [0],
554569
#{queue => "default-queue-with-consumer",vhost => "/"} => [1],
555-
#{queue => "default-queue-with-messages",vhost => "/"} => [0]},
570+
#{queue => "default-queue-with-messages",vhost => "/"} => [0],
571+
#{queue => "a_quorum_queue",vhost => "/"} => [0]},
556572

557573
rabbitmq_detailed_queue_info =>
558574
#{#{queue => "default-queue-with-consumer",
@@ -578,7 +594,10 @@ queue_consumer_count_all_vhosts_per_object_test(Config) ->
578594
#{queue => "vhost-2-queue-with-messages",
579595
vhost => "vhost-2",
580596
queue_type => "rabbit_classic_queue",
581-
membership => "leader"} => [1]}
597+
membership => "leader"} => [1],
598+
#{membership => "leader",
599+
queue => "a_quorum_queue",vhost => "/",
600+
queue_type => "rabbit_quorum_queue"} => [1]}
582601
},
583602

584603
%% No vhost given, all should be returned
@@ -596,7 +615,8 @@ queue_coarse_metrics_per_object_test(Config) ->
596615
Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11],
597616
#{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [11]},
598617
ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3],
599-
#{queue => "default-queue-with-messages", vhost => "/"} => [3]},
618+
#{queue => "default-queue-with-messages", vhost => "/"} => [3],
619+
#{queue => "a_quorum_queue",vhost => "/"} => [0]},
600620

601621
{_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_coarse_metrics", [], 200),
602622
?assertEqual(Expected1,
@@ -704,7 +724,8 @@ queue_metrics_per_object_test(Config) ->
704724
Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11],
705725
#{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [1]},
706726
ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3],
707-
#{queue => "default-queue-with-messages", vhost => "/"} => [1]},
727+
#{queue => "default-queue-with-messages", vhost => "/"} => [1],
728+
#{queue => "a_quorum_queue",vhost => "/"} => [0]},
708729
{_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_metrics", [], 200),
709730
?assertEqual(Expected1,
710731
map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body1))),
@@ -835,6 +856,27 @@ core_metrics_special_chars(Config) ->
835856
maps:to_list(LabelValue3)),
836857
ok.
837858

859+
raft_detailed_metrics_test(Config) ->
860+
ComponentMetrics = #{#{module => "ra_log_wal", ra_system => "coordination"} => ["1.0"],
861+
#{module => "ra_log_wal", ra_system => "quorum_queues"} => ["1.0"]},
862+
QQMetrics = #{#{queue => "a_quorum_queue", vhost => "/"} => ["1.0"]},
863+
864+
{_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=raft_metrics&vhost=foo", [], 200),
865+
%% no queues in vhost foo, so no QQ metrics
866+
?assertEqual(ComponentMetrics,
867+
map_get(rabbitmq_raft_detailed_wal_files, parse_response(Body1))),
868+
?assertEqual(undefined,
869+
maps:get(rabbitmq_raft_detailed_term, parse_response(Body1), undefined)),
870+
871+
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=raft_metrics&vhost=/", [], 200),
872+
%% there's a queue in vhost /
873+
?assertEqual(ComponentMetrics,
874+
map_get(rabbitmq_raft_detailed_wal_files, parse_response(Body2))),
875+
?assertEqual(QQMetrics,
876+
map_get(rabbitmq_raft_detailed_term, parse_response(Body2))),
877+
878+
ok.
879+
838880
basic_auth(Config) ->
839881
http_get(Config, [{"accept-encoding", "deflate"}], 401),
840882
AuthHeader = rabbit_mgmt_test_util:auth_header("guest", "guest"),

0 commit comments

Comments
 (0)