160160 {2 , undefined , queue_messages_paged_out_bytes , gauge , " Size in bytes of messages paged out to disk" , message_bytes_paged_out },
161161 {2 , undefined , queue_head_message_timestamp , gauge , " Timestamp of the first message in the queue, if any" , head_message_timestamp },
162162 {2 , undefined , queue_disk_reads_total , counter , " Total number of times queue read messages from disk" , disk_reads },
163- {2 , undefined , queue_disk_writes_total , counter , " Total number of times queue wrote messages to disk" , disk_writes }
163+ {2 , undefined , queue_disk_writes_total , counter , " Total number of times queue wrote messages to disk" , disk_writes },
164+ {2 , undefined , stream_segments , counter , " Total number of stream segment files" , segments }
164165 ]},
165166
166167% %% Metrics that contain reference to a channel. Some of them also have
@@ -538,7 +539,7 @@ get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
538539 end , empty (MF ), Table ),
539540 [{Table , [{consumers , A1 }]}];
540541get_data (queue_metrics = Table , false , VHostsFilter , QueuesFilter ) ->
541- {Table , A1 , A2 , A3 , A4 , A5 , A6 , A7 , A8 , A9 , A10 , A11 , A12 , A13 , A14 , A15 , A16 } =
542+ {Table , A1 , A2 , A3 , A4 , A5 , A6 , A7 , A8 , A9 , A10 , A11 , A12 , A13 , A14 , A15 , A16 , A17 } =
542543 ets :foldl (fun
543544 ({# resource {kind = queue , virtual_host = VHost }, _ , _ }, Acc ) when is_map (VHostsFilter ), map_get (VHost , VHostsFilter ) == false ->
544545 Acc ;
@@ -559,7 +560,7 @@ get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
559560 {messages_bytes_persistent , A9 }, {message_bytes , A10 },
560561 {message_bytes_ready , A11 }, {message_bytes_unacknowledged , A12 },
561562 {messages_paged_out , A13 }, {message_bytes_paged_out , A14 },
562- {disk_reads , A15 }, {disk_writes , A16 }]}];
563+ {disk_reads , A15 }, {disk_writes , A16 }, { segments , A17 } ]}];
563564get_data (Table , false , VHostsFilter , QueuesFilter ) when Table == channel_exchange_metrics ;
564565 Table == queue_coarse_metrics ;
565566 Table == channel_queue_metrics ;
@@ -668,7 +669,7 @@ get_data(Table, _, _, _) ->
668669
669670
670671sum_queue_metrics (Props , {T , A1 , A2 , A3 , A4 , A5 , A6 , A7 , A8 , A9 , A10 , A11 ,
671- A12 , A13 , A14 , A15 , A16 }) ->
672+ A12 , A13 , A14 , A15 , A16 , A17 }) ->
672673 {T ,
673674 sum (proplists :get_value (consumers , Props ), A1 ),
674675 sum (proplists :get_value (consumer_utilisation , Props ), A2 ),
@@ -685,7 +686,8 @@ sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11,
685686 sum (proplists :get_value (messages_paged_out , Props ), A13 ),
686687 sum (proplists :get_value (message_bytes_paged_out , Props ), A14 ),
687688 sum (proplists :get_value (disk_reads , Props ), A15 ),
688- sum (proplists :get_value (disk_writes , Props ), A16 )
689+ sum (proplists :get_value (disk_writes , Props ), A16 ),
690+ sum (proplists :get_value (segments , Props ), A17 )
689691 }.
690692
691693division (0 , 0 ) ->
@@ -707,7 +709,7 @@ empty(T) when T == ra_metrics ->
707709empty (T ) when T == channel_queue_metrics ; T == channel_metrics ->
708710 {T , 0 , 0 , 0 , 0 , 0 , 0 , 0 };
709711empty (queue_metrics = T ) ->
710- {T , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 }.
712+ {T , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 }.
711713
712714sum (undefined , B ) ->
713715 B ;
0 commit comments