|
203 | 203 | {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"} |
204 | 204 | ]}, |
205 | 205 |
|
| 206 | + %% the family name for these metrics is stream_publisher_metrics but the real table used for data is rabbit_stream_publisher_created. |
| 207 | + {stream_publisher_metrics, [ |
| 208 | + {2, undefined, stream_publisher_published_total, counter, "Total number of messages published", published}, |
| 209 | + {2, undefined, stream_publisher_confirmed_total, counter, "Total number of messages confirmed", confirmed}, |
| 210 | + {2, undefined, stream_publisher_error_messages, counter, "Total number of messages errored", errored} |
| 211 | + ]}, |
| 212 | + |
| 213 | + %% the family name for these metrics is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created. |
| 214 | + {stream_consumer_metrics, [ |
| 215 | + {2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed", consumed}, |
| 216 | + {2, undefined, stream_consumer_offset, counter, "Total offset", offset}, |
| 217 | + {2, undefined, stream_consumer_offset_lag, counter, "Total offset lag", offset_lag} |
| 218 | + ]}, |
| 219 | + |
206 | 220 | {connection_metrics, [ |
207 | 221 | {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt}, |
208 | 222 | {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt}, |
@@ -417,6 +431,11 @@ label(M) when is_map(M) -> |
417 | 431 | label(#resource{virtual_host = VHost, kind = exchange, name = Name}) -> |
418 | 432 | <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", |
419 | 433 | "exchange=\"", (escape_label_value(Name))/binary, "\"">>; |
| 434 | + |
| 435 | +label({#resource{virtual_host = VHost, kind = queue, name = Name}, P, _}) when is_pid(P) -> |
| 436 | + <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", |
| 437 | + "queue=\"", (escape_label_value(Name))/binary, "\",", |
| 438 | + "channel=\"", (iolist_to_binary(pid_to_list(P)))/binary, "\"">>; |
420 | 439 | label(#resource{virtual_host = VHost, kind = queue, name = Name}) -> |
421 | 440 | <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", |
422 | 441 | "queue=\"", (escape_label_value(Name))/binary, "\"">>; |
@@ -516,6 +535,36 @@ get_data(channel_metrics = Table, false, _) -> |
516 | 535 | [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3}, |
517 | 536 | {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, |
518 | 537 | {global_prefetch_count, A7}]}]; |
| 538 | +get_data(stream_publisher_metrics = Table, false, _) -> |
| 539 | + RealTable = rabbit_stream_publisher_created, %% real table name |
| 540 | + try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) -> |
| 541 | + {T, |
| 542 | + sum(proplists:get_value(published, Props), A1), |
| 543 | + sum(proplists:get_value(confirmed, Props), A2), |
| 544 | + sum(proplists:get_value(errored, Props), A3) |
| 545 | + } |
| 546 | + end, empty(Table), RealTable) of |
| 547 | + {Table, A1, A2, A3} -> |
| 548 | + [{Table, [{published, A1}, {confirmed, A2}, {errored, A3}]}] |
| 549 | + catch error:badarg -> |
| 550 | + %% rabbitmq_stream plugin is not enabled |
| 551 | + [] |
| 552 | + end; |
| 553 | +get_data(stream_consumer_metrics = Table, false, _) -> |
| 554 | + RealTable = rabbit_stream_consumer_created, %% real table name |
| 555 | + try ets:foldl(fun({_, Props}, {T, A1, A2, A3}) -> |
| 556 | + {T, |
| 557 | + sum(proplists:get_value(consumed, Props), A1), |
| 558 | + sum(proplists:get_value(offset, Props), A2), |
| 559 | + sum(proplists:get_value(offset_lag, Props), A3) |
| 560 | + } |
| 561 | + end, empty(Table), RealTable) of |
| 562 | + {Table, A1, A2, A3} -> |
| 563 | + [{Table, [{consumed, A1}, {offset, A2}, {offset_lag, A3}]}] |
| 564 | + catch error:badarg -> |
| 565 | + %% rabbitmq_stream plugin is not enabled |
| 566 | + [] |
| 567 | + end; |
519 | 568 | get_data(queue_consumer_count = MF, false, VHostsFilter) -> |
520 | 569 | Table = queue_metrics, %% Real table name |
521 | 570 | {_, A1} = ets:foldl(fun |
@@ -596,6 +645,20 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics |
596 | 645 | end, [], Table); |
597 | 646 | get_data(queue_consumer_count, true, _) -> |
598 | 647 | ets:tab2list(queue_metrics); |
| 648 | +get_data(stream_publisher_metrics, true, _) -> |
| 649 | + try |
| 650 | + ets:tab2list(rabbit_stream_publisher_created) |
| 651 | + catch error:badarg -> |
| 652 | + %% rabbitmq_stream plugin is not enabled |
| 653 | + [] |
| 654 | + end; |
| 655 | +get_data(stream_consumer_metrics, true, _) -> |
| 656 | + try |
| 657 | + ets:tab2list(rabbit_stream_consumer_created) |
| 658 | + catch error:badarg -> |
| 659 | + %% rabbitmq_stream plugin is not enabled |
| 660 | + [] |
| 661 | + end; |
599 | 662 | get_data(vhost_status, _, _) -> |
600 | 663 | [ { #{<<"vhost">> => VHost}, |
601 | 664 | case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of |
@@ -673,7 +736,11 @@ accumulate_count_and_sum(Value, {Count, Sum}) -> |
673 | 736 |
|
674 | 737 | empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> |
675 | 738 | {T, 0}; |
676 | | -empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> |
| 739 | +empty(T) when T == connection_coarse_metrics; |
| 740 | + T == stream_publisher_metrics; |
| 741 | + T == stream_consumer_metrics; |
| 742 | + T == auth_attempt_metrics; |
| 743 | + T == auth_attempt_detailed_metrics -> |
677 | 744 | {T, 0, 0, 0}; |
678 | 745 | empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> |
679 | 746 | {T, 0, 0, 0, 0}; |
|
0 commit comments