Skip to content

Commit d50dd43

Browse files
committed
expose Stream Connection metrics through prometheus. supports both per connection (detailed) and aggregated total (metrics).
1 parent 8f1be1a commit d50dd43

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,20 @@
203203
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
204204
]},
205205

206+
%% the family name for these metric is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created.
207+
{stream_publisher_metrics, [
208+
{2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published},
209+
{2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed},
210+
{2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored}
211+
]},
212+
213+
%% the family name for these metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
214+
{stream_consumer_metrics, [
215+
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed},
216+
{2, undefined, stream_consumer_offset, counter, "Total offset", offset},
217+
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag}
218+
]},
219+
206220
{connection_metrics, [
207221
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
208222
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
@@ -417,6 +431,11 @@ label(M) when is_map(M) ->
417431
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
418432
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
419433
"exchange=\"", (escape_label_value(Name))/binary, "\"">>;
434+
435+
label({#resource{virtual_host = VHost, kind = queue, name = Name}, P, _}) when is_pid(P) ->
436+
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
437+
"queue=\"", (escape_label_value(Name))/binary, "\",",
438+
"channel=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\"">>;
420439
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
421440
<<"vhost=\"", (escape_label_value(VHost))/binary, "\",",
422441
"queue=\"", (escape_label_value(Name))/binary, "\"">>;
@@ -516,6 +535,26 @@ get_data(channel_metrics = Table, false, _, _) ->
516535
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
517536
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
518537
{global_prefetch_count, A7}]}];
538+
get_data(stream_publisher_metrics = Table, false, _, _) ->
539+
RealTable = rabbit_stream_publisher_created, %% real table name
540+
{Table, A1, A2, A3} = ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
541+
{T,
542+
sum(proplists:get_value(published, Props), A1),
543+
sum(proplists:get_value(confirmed, Props), A2),
544+
sum(proplists:get_value(errored, Props), A3)
545+
}
546+
end, empty(Table), RealTable),
547+
[{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}];
548+
get_data(stream_consumer_metrics = Table, false, _, _) ->
549+
RealTable = rabbit_stream_consumer_created, %% real table name
550+
{Table, A1, A2, A3} = ets:foldl(fun({_, Props}, {T, A1, A2, A3}) ->
551+
{T,
552+
sum(proplists:get_value(consumed, Props), A1),
553+
sum(proplists:get_value(offset, Props), A2),
554+
sum(proplists:get_value(offset_lag, Props), A3)
555+
}
556+
end, empty(Table), RealTable),
557+
[{Table, [{offset, A1}, {offset_lag, A2}, {consumed, A3}]}];
519558
get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
520559
Table = queue_metrics, %% Real table name
521560
{_, A1} = ets:foldl(fun
@@ -622,6 +661,10 @@ get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metri
622661
end, [], Table);
623662
get_data(queue_consumer_count, true, _, _) ->
624663
ets:tab2list(queue_metrics);
664+
get_data(stream_publisher_metrics, true, _, _) ->
665+
ets:tab2list(rabbit_stream_publisher_created);
666+
get_data(stream_consumer_metrics, true, _, _) ->
667+
ets:tab2list(rabbit_stream_consumer_created);
625668
get_data(vhost_status, _, _, _) ->
626669
[ { #{<<"vhost">> => VHost},
627670
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
@@ -698,7 +741,8 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->
698741

699742
empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
700743
{T, 0};
701-
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
744+
empty(T) when T == connection_coarse_metrics; T == stream_publisher_metrics;
745+
T == stream_consumer_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
702746
{T, 0, 0, 0};
703747
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
704748
{T, 0, 0, 0, 0};

0 commit comments

Comments
 (0)