159159 {2 , undefined , queue_messages_paged_out_bytes , gauge , " Size in bytes of messages paged out to disk" , message_bytes_paged_out },
160160 {2 , undefined , queue_head_message_timestamp , gauge , " Timestamp of the first message in the queue, if any" , head_message_timestamp },
161161 {2 , undefined , queue_disk_reads_total , counter , " Total number of times queue read messages from disk" , disk_reads },
162- {2 , undefined , queue_disk_writes_total , counter , " Total number of times queue wrote messages to disk" , disk_writes }
162+ {2 , undefined , queue_disk_writes_total , counter , " Total number of times queue wrote messages to disk" , disk_writes },
163+ {2 , undefined , stream_segments , counter , " Total number of stream segment files" , segments }
163164 ]},
164165
165166% %% Metrics that contain reference to a channel. Some of them also have
@@ -537,7 +538,7 @@ get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
537538 end , empty (MF ), Table ),
538539 [{Table , [{consumers , A1 }]}];
539540get_data (queue_metrics = Table , false , VHostsFilter , QueuesFilter ) ->
540- {Table , A1 , A2 , A3 , A4 , A5 , A6 , A7 , A8 , A9 , A10 , A11 , A12 , A13 , A14 , A15 , A16 } =
541+ {Table , A1 , A2 , A3 , A4 , A5 , A6 , A7 , A8 , A9 , A10 , A11 , A12 , A13 , A14 , A15 , A16 , A17 } =
541542 ets :foldl (fun
542543 ({# resource {kind = queue , virtual_host = VHost }, _ , _ }, Acc ) when is_map (VHostsFilter ), map_get (VHost , VHostsFilter ) == false ->
543544 Acc ;
@@ -558,7 +559,7 @@ get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
558559 {messages_bytes_persistent , A9 }, {message_bytes , A10 },
559560 {message_bytes_ready , A11 }, {message_bytes_unacknowledged , A12 },
560561 {messages_paged_out , A13 }, {message_bytes_paged_out , A14 },
561- {disk_reads , A15 }, {disk_writes , A16 }]}];
562+ {disk_reads , A15 }, {disk_writes , A16 }, { segments , A17 } ]}];
562563get_data (Table , false , VHostsFilter , QueuesFilter ) when Table == channel_exchange_metrics ;
563564 Table == queue_coarse_metrics ;
564565 Table == channel_queue_metrics ;
@@ -667,7 +668,7 @@ get_data(Table, _, _, _) ->
667668
668669
669670sum_queue_metrics (Props , {T , A1 , A2 , A3 , A4 , A5 , A6 , A7 , A8 , A9 , A10 , A11 ,
670- A12 , A13 , A14 , A15 , A16 }) ->
671+ A12 , A13 , A14 , A15 , A16 , A17 }) ->
671672 {T ,
672673 sum (proplists :get_value (consumers , Props ), A1 ),
673674 sum (proplists :get_value (consumer_utilisation , Props ), A2 ),
@@ -684,7 +685,8 @@ sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11,
684685 sum (proplists :get_value (messages_paged_out , Props ), A13 ),
685686 sum (proplists :get_value (message_bytes_paged_out , Props ), A14 ),
686687 sum (proplists :get_value (disk_reads , Props ), A15 ),
687- sum (proplists :get_value (disk_writes , Props ), A16 )
688+ sum (proplists :get_value (disk_writes , Props ), A16 ),
689+ sum (proplists :get_value (segments , Props ), A17 )
688690 }.
689691
690692division (0 , 0 ) ->
@@ -706,7 +708,7 @@ empty(T) when T == ra_metrics ->
706708empty (T ) when T == channel_queue_metrics ; T == channel_metrics ->
707709 {T , 0 , 0 , 0 , 0 , 0 , 0 , 0 };
708710empty (queue_metrics = T ) ->
709- {T , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 }.
711+ {T , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 }.
710712
711713sum (undefined , B ) ->
712714 B ;
0 commit comments