Skip to content

Commit 051c917

Browse files
markusgustgomoripeti
authored andcommitted
Expose Stream Connection metrics via Prometheus
Supports both per connection (detailed) and aggregated total (metrics) values.
1 parent 3cd832b commit 051c917

File tree

1 file changed

+50
-1
lines changed

1 file changed

+50
-1
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,20 @@
203203
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
204204
]},
205205

206+
%% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created.
207+
{stream_publisher_metrics, [
208+
{2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published},
209+
{2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed},
210+
{2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored}
211+
]},
212+
213+
%% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
214+
{stream_consumer_metrics, [
215+
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed},
216+
{2, undefined, stream_consumer_offset, counter, "Total offset", offset},
217+
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag}
218+
]},
219+
206220
{connection_metrics, [
207221
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
208222
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
@@ -417,6 +431,11 @@ label(M) when is_map(M) ->
417431
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
418432
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
419433
"exchange=\"", (escape_label_value(Name))/binary, "\"">>;
434+
435+
label({#resource{virtual_host = VHost, kind = queue, name = Name}, P, _}) when is_pid(P) ->
436+
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
437+
"queue=\"", (escape_label_value(Name))/binary, "\",",
438+
"channel=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\"">>;
420439
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
421440
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
422441
"queue=\"", (escape_label_value(Name))/binary, "\"">>;
@@ -516,6 +535,28 @@ get_data(channel_metrics = Table, false, _, _) ->
516535
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
517536
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
518537
{global_prefetch_count, A7}]}];
538+
get_data(stream_publisher_metrics = Table, false, _, _) ->
539+
RealTable = rabbit_stream_publisher_created, %% real table name
540+
{Table, A1, A2, A3} =
541+
ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
542+
{T,
543+
sum(proplists:get_value(published, Props), A1),
544+
sum(proplists:get_value(confirmed, Props), A2),
545+
sum(proplists:get_value(errored, Props), A3)
546+
}
547+
end, empty(Table), RealTable),
548+
[{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}];
549+
get_data(stream_consumer_metrics = Table, false, _, _) ->
550+
RealTable = rabbit_stream_consumer_created, %% real table name
551+
{Table, A1, A2, A3} =
552+
ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
553+
{T,
554+
sum(proplists:get_value(consumed, Props), A1),
555+
sum(proplists:get_value(offset, Props), A2),
556+
sum(proplists:get_value(offset_lag, Props), A3)
557+
}
558+
end, empty(Table), RealTable),
559+
[{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}];
519560
get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
520561
Table = queue_metrics, %% Real table name
521562
{_, A1} = ets:foldl(fun
@@ -622,6 +663,10 @@ get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metri
622663
end, [], Table);
623664
get_data(queue_consumer_count, true, _, _) ->
624665
ets:tab2list(queue_metrics);
666+
get_data(stream_publisher_metrics, true, _, _) ->
667+
ets:tab2list(rabbit_stream_publisher_created);
668+
get_data(stream_consumer_metrics, true, _, _) ->
669+
ets:tab2list(rabbit_stream_consumer_created);
625670
get_data(vhost_status, _, _, _) ->
626671
[ { #{<<"vhost">> => VHost},
627672
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
@@ -698,7 +743,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->
698743

699744
empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
700745
{T, 0};
701-
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
746+
empty(T) when T == connection_coarse_metrics;
747+
T == stream_publisher_metrics;
748+
T == stream_consumer_metrics;
749+
T == auth_attempt_metrics;
750+
T == auth_attempt_detailed_metrics ->
702751
{T, 0, 0, 0};
703752
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
704753
{T, 0, 0, 0, 0};

0 commit comments

Comments
 (0)