Skip to content

Commit b4cd0a1

Browse files
markusgustmichaelklishin
authored andcommitted
Expose max offset lag of stream consumers via Prometheus
Supports both per stream (detailed) and aggregated (metrics) values. (cherry picked from commit 00daaf4)
1 parent 83cd955 commit b4cd0a1

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@
200200
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
201201
]},
202202

203+
%% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
204+
{stream_consumer_metrics, [
205+
{2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"}
206+
]},
207+
203208
{connection_metrics, [
204209
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
205210
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
@@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) ->
578583
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
579584
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
580585
{global_prefetch_count, A7}]}];
586+
get_data(stream_consumer_metrics = MF, false, _) ->
587+
Table = rabbit_stream_consumer_created, %% real table name
588+
try ets:foldl(fun({_, Props}, OldMax) ->
589+
erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
590+
end, 0, Table) of
591+
MaxOffsetLag ->
592+
[{MF, MaxOffsetLag}]
593+
catch error:badarg ->
594+
%% rabbitmq_stream plugin is not enabled
595+
[]
596+
end;
581597
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
582598
Table = queue_metrics, %% Real table name
583599
{_, A1} = ets:foldl(fun
@@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
708724
end, [], Table);
709725
get_data(queue_consumer_count, true, _) ->
710726
ets:tab2list(queue_metrics);
727+
get_data(stream_consumer_metrics, true, _) ->
728+
Table = rabbit_stream_consumer_created, %% real table name
729+
try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
730+
Value = proplists:get_value(offset_lag, Props, 0),
731+
maps:update_with(
732+
QueueName,
733+
fun(OldMax) -> erlang:max(Value, OldMax) end,
734+
Value,
735+
Map0)
736+
end, #{}, Table) of
737+
Map1 ->
738+
maps:to_list(Map1)
739+
catch error:badarg ->
740+
%% rabbitmq_stream plugin is not enabled
741+
[]
742+
end;
711743
get_data(vhost_status, _, _) ->
712744
[ { #{<<"vhost">> => VHost},
713745
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of

0 commit comments

Comments
 (0)