Skip to content

Commit 2affbe8

Browse files
markusgustgomoripeti
authored andcommitted
Expose Stream Connection metrics via Prometheus
Supports both per connection (detailed) and aggregated total (metrics) values.
1 parent c67c940 commit 2affbe8

File tree

1 file changed

+68
-1
lines changed

1 file changed

+68
-1
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,20 @@
203203
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
204204
]},
205205

206+
%% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created.
207+
{stream_publisher_metrics, [
208+
{2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published},
209+
{2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed},
210+
{2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored}
211+
]},
212+
213+
%% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
214+
{stream_consumer_metrics, [
215+
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed},
216+
{2, undefined, stream_consumer_offset, counter, "Total offset", offset},
217+
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag}
218+
]},
219+
206220
{connection_metrics, [
207221
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
208222
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
@@ -417,6 +431,11 @@ label(M) when is_map(M) ->
417431
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
418432
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
419433
"exchange=\"", (escape_label_value(Name))/binary, "\"">>;
434+
435+
label({#resource{virtual_host = VHost, kind = queue, name = Name}, P, _}) when is_pid(P) ->
436+
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
437+
"queue=\"", (escape_label_value(Name))/binary, "\",",
438+
"channel=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\"">>;
420439
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
421440
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
422441
"queue=\"", (escape_label_value(Name))/binary, "\"">>;
@@ -516,6 +535,36 @@ get_data(channel_metrics = Table, false, _, _) ->
516535
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
517536
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
518537
{global_prefetch_count, A7}]}];
538+
get_data(stream_publisher_metrics = Table, false, _, _) ->
539+
RealTable = rabbit_stream_publisher_created, %% real table name
540+
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
541+
{T,
542+
sum(proplists:get_value(published, Props), A1),
543+
sum(proplists:get_value(confirmed, Props), A2),
544+
sum(proplists:get_value(errored, Props), A3)
545+
}
546+
end, empty(Table), RealTable) of
547+
{Table, A1, A2, A3} ->
548+
[{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}]
549+
catch error:badarg ->
550+
%% rabbitmq_stream plugin is not enabled
551+
[]
552+
end;
553+
get_data(stream_consumer_metrics = Table, false, _, _) ->
554+
RealTable = rabbit_stream_consumer_created, %% real table name
555+
try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
556+
{T,
557+
sum(proplists:get_value(consumed, Props), A1),
558+
sum(proplists:get_value(offset, Props), A2),
559+
sum(proplists:get_value(offset_lag, Props), A3)
560+
}
561+
end, empty(Table), RealTable) of
562+
{Table, A1, A2, A3} ->
563+
[{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}]
564+
catch error:badarg ->
565+
%% rabbitmq_stream plugin is not enabled
566+
[]
567+
end;
519568
get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
520569
Table = queue_metrics, %% Real table name
521570
{_, A1} = ets:foldl(fun
@@ -622,6 +671,20 @@ get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metri
622671
end, [], Table);
623672
get_data(queue_consumer_count, true, _, _) ->
624673
ets:tab2list(queue_metrics);
674+
get_data(stream_publisher_metrics, true, _, _) ->
675+
try
676+
ets:tab2list(rabbit_stream_publisher_created)
677+
catch error:badarg ->
678+
%% rabbitmq_stream plugin is not enabled
679+
[]
680+
end;
681+
get_data(stream_consumer_metrics, true, _, _) ->
682+
try
683+
ets:tab2list(rabbit_stream_consumer_created)
684+
catch error:badarg ->
685+
%% rabbitmq_stream plugin is not enabled
686+
[]
687+
end;
625688
get_data(vhost_status, _, _, _) ->
626689
[ { #{<<"vhost">> => VHost},
627690
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
@@ -699,7 +762,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->
699762

700763
empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
701764
{T, 0};
702-
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
765+
empty(T) when T == connection_coarse_metrics;
766+
T == stream_publisher_metrics;
767+
T == stream_consumer_metrics;
768+
T == auth_attempt_metrics;
769+
T == auth_attempt_detailed_metrics ->
703770
{T, 0, 0, 0};
704771
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
705772
{T, 0, 0, 0, 0};

0 commit comments

Comments
 (0)