Skip to content

Commit 52e63c7

Browse files
markusgustgomoripeti
authored andcommitted
Expose Stream Connection metrics via Prometheus
Supports both per connection (detailed) and aggregated total (metrics) values.
1 parent 9db4317 commit 52e63c7

File tree

1 file changed

+78
-1
lines changed

1 file changed

+78
-1
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,20 @@
200200
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
201201
]},
202202

203+
%% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created.
204+
{stream_publisher_metrics, [
205+
{2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published},
206+
{2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed},
207+
{2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored}
208+
]},
209+
210+
%% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
211+
{stream_consumer_metrics, [
212+
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed},
213+
{2, undefined, stream_consumer_offset, counter, "Total offset", offset},
214+
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag}
215+
]},
216+
203217
{connection_metrics, [
204218
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
205219
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
@@ -471,6 +485,19 @@ label(M) when is_map(M) ->
471485
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
472486
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
473487
"exchange=\"", (escape_label_value(Name))/binary, "\"">>;
488+
489+
label({stream_publisher, {#resource{virtual_host = VHost, kind = queue, name = Name}, P, PubId}}) ->
490+
%% stream_consumer_metrics {queue_id, connection_pid, subscription_id}
491+
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
492+
"queue=\"", (escape_label_value(Name))/binary, "\",",
493+
"connection=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\",",
494+
"publisher_id=\"", (integer_to_binary(PubId))/binary, "\"">>;
495+
label({stream_consumer, {#resource{virtual_host = VHost, kind = queue, name = Name}, P, SubId}}) ->
496+
%% stream_consumer_metrics {queue_id, connection_pid, subscription_id}
497+
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
498+
"queue=\"", (escape_label_value(Name))/binary, "\",",
499+
"connection=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\",",
500+
"subscription_id=\"", (integer_to_binary(SubId))/binary, "\"">>;
474501
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
475502
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
476503
"queue=\"", (escape_label_value(Name))/binary, "\"">>;
@@ -578,6 +605,36 @@ get_data(channel_metrics = Table, false, _) ->
578605
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
579606
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
580607
{global_prefetch_count, A7}]}];
608+
get_data(stream_publisher_metrics = Table, false, _) ->
609+
RealTable = rabbit_stream_publisher_created, %% real table name
610+
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
611+
{T,
612+
sum(proplists:get_value(published, Props), A1),
613+
sum(proplists:get_value(confirmed, Props), A2),
614+
sum(proplists:get_value(errored, Props), A3)
615+
}
616+
end, empty(Table), RealTable) of
617+
{Table, A1, A2, A3} ->
618+
[{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}]
619+
catch error:badarg ->
620+
%% rabbitmq_stream plugin is not enabled
621+
[]
622+
end;
623+
get_data(stream_consumer_metrics = Table, false, _) ->
624+
RealTable = rabbit_stream_consumer_created, %% real table name
625+
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
626+
{T,
627+
sum(proplists:get_value(consumed, Props), A1),
628+
sum(proplists:get_value(offset, Props), A2),
629+
sum(proplists:get_value(offset_lag, Props), A3)
630+
}
631+
end, empty(Table), RealTable) of
632+
{Table, A1, A2, A3} ->
633+
[{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}]
634+
catch error:badarg ->
635+
%% rabbitmq_stream plugin is not enabled
636+
[]
637+
end;
581638
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
582639
Table = queue_metrics, %% Real table name
583640
{_, A1} = ets:foldl(fun
@@ -708,6 +765,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
708765
end, [], Table);
709766
get_data(queue_consumer_count, true, _) ->
710767
ets:tab2list(queue_metrics);
768+
get_data(stream_publisher_metrics, true, _) ->
769+
try
770+
ets:select(rabbit_stream_publisher_created,
771+
[{{'$1', '$2'}, [], [{{ {{stream_publisher, '$1'}}, '$2' }}]}])
772+
catch error:badarg ->
773+
%% rabbitmq_stream plugin is not enabled
774+
[]
775+
end;
776+
get_data(stream_consumer_metrics, true, _) ->
777+
try
778+
ets:select(rabbit_stream_consumer_created,
779+
[{{'$1', '$2'}, [], [{{ {{stream_consumer, '$1'}}, '$2' }}]}])
780+
catch error:badarg ->
781+
%% rabbitmq_stream plugin is not enabled
782+
[]
783+
end;
711784
get_data(vhost_status, _, _) ->
712785
[ { #{<<"vhost">> => VHost},
713786
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
@@ -817,7 +890,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->
817890

818891
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
819892
{T, 0};
820-
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
893+
empty(T) when T == connection_coarse_metrics;
894+
T == stream_publisher_metrics;
895+
T == stream_consumer_metrics;
896+
T == auth_attempt_metrics;
897+
T == auth_attempt_detailed_metrics ->
821898
{T, 0, 0, 0};
822899
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
823900
{T, 0, 0, 0, 0};

0 commit comments

Comments
 (0)