Skip to content

Commit 289029f

Browse files
markusgustgomoripeti
authored andcommitted
Expose Stream Connection metrics via Prometheus
Supports both per connection (detailed) and aggregated total (metrics) values.
1 parent beebf76 commit 289029f

File tree

1 file changed

+68
-1
lines changed

1 file changed

+68
-1
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,20 @@
200200
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
201201
]},
202202

203+
%% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created.
204+
{stream_publisher_metrics, [
205+
{2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published},
206+
{2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed},
207+
{2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored}
208+
]},
209+
210+
%% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
211+
{stream_consumer_metrics, [
212+
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed},
213+
{2, undefined, stream_consumer_offset, counter, "Total offset", offset},
214+
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag}
215+
]},
216+
203217
{connection_metrics, [
204218
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
205219
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
@@ -471,6 +485,11 @@ label(M) when is_map(M) ->
471485
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
472486
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
473487
"exchange=\"", (escape_label_value(Name))/binary, "\"">>;
488+
489+
label({#resource{virtual_host = VHost, kind = queue, name = Name}, P, _}) when is_pid(P) ->
490+
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
491+
"queue=\"", (escape_label_value(Name))/binary, "\",",
492+
"channel=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\"">>;
474493
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
475494
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
476495
"queue=\"", (escape_label_value(Name))/binary, "\"">>;
@@ -578,6 +597,36 @@ get_data(channel_metrics = Table, false, _) ->
578597
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
579598
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
580599
{global_prefetch_count, A7}]}];
600+
get_data(stream_publisher_metrics = Table, false, _) ->
601+
RealTable = rabbit_stream_publisher_created, %% real table name
602+
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
603+
{T,
604+
sum(proplists:get_value(published, Props), A1),
605+
sum(proplists:get_value(confirmed, Props), A2),
606+
sum(proplists:get_value(errored, Props), A3)
607+
}
608+
end, empty(Table), RealTable) of
609+
{Table, A1, A2, A3} ->
610+
[{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}]
611+
catch error:badarg ->
612+
%% rabbitmq_stream plugin is not enabled
613+
[]
614+
end;
615+
get_data(stream_consumer_metrics = Table, false, _) ->
616+
RealTable = rabbit_stream_consumer_created, %% real table name
617+
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
618+
{T,
619+
sum(proplists:get_value(consumed, Props), A1),
620+
sum(proplists:get_value(offset, Props), A2),
621+
sum(proplists:get_value(offset_lag, Props), A3)
622+
}
623+
end, empty(Table), RealTable) of
624+
{Table, A1, A2, A3} ->
625+
[{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}]
626+
catch error:badarg ->
627+
%% rabbitmq_stream plugin is not enabled
628+
[]
629+
end;
581630
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
582631
Table = queue_metrics, %% Real table name
583632
{_, A1} = ets:foldl(fun
@@ -708,6 +757,20 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
708757
end, [], Table);
709758
get_data(queue_consumer_count, true, _) ->
710759
ets:tab2list(queue_metrics);
760+
get_data(stream_publisher_metrics, true, _) ->
761+
try
762+
ets:tab2list(rabbit_stream_publisher_created)
763+
catch error:badarg ->
764+
%% rabbitmq_stream plugin is not enabled
765+
[]
766+
end;
767+
get_data(stream_consumer_metrics, true, _) ->
768+
try
769+
ets:tab2list(rabbit_stream_consumer_created)
770+
catch error:badarg ->
771+
%% rabbitmq_stream plugin is not enabled
772+
[]
773+
end;
711774
get_data(vhost_status, _, _) ->
712775
[ { #{<<"vhost">> => VHost},
713776
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
@@ -817,7 +880,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->
817880

818881
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
819882
{T, 0};
820-
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
883+
empty(T) when T == connection_coarse_metrics;
884+
T == stream_publisher_metrics;
885+
T == stream_consumer_metrics;
886+
T == auth_attempt_metrics;
887+
T == auth_attempt_detailed_metrics ->
821888
{T, 0, 0, 0};
822889
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
823890
{T, 0, 0, 0, 0};

0 commit comments

Comments
 (0)