|
11 | 11 | -include_lib("common_test/include/ct.hrl"). |
12 | 12 | -include_lib("eunit/include/eunit.hrl"). |
13 | 13 | -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl"). |
| 14 | +-include_lib("amqp10_common/include/amqp10_framing.hrl"). |
14 | 15 |
|
15 | 16 | -compile(export_all). |
16 | 17 |
|
@@ -70,7 +71,8 @@ groups() -> |
70 | 71 | queue_consumer_count_and_queue_metrics_mutually_exclusive_test, |
71 | 72 | vhost_status_metric, |
72 | 73 | exchange_bindings_metric, |
73 | | - exchange_names_metric |
| 74 | + exchange_names_metric, |
| 75 | + stream_pub_sub_metrics |
74 | 76 | ]}, |
75 | 77 | {special_chars, [], [core_metrics_special_chars]}, |
76 | 78 | {authentication, [], [basic_auth]} |
@@ -739,6 +741,104 @@ exchange_names_metric(Config) -> |
739 | 741 | }, Names), |
740 | 742 | ok. |
741 | 743 |
|
| 744 | +stream_pub_sub_metrics(Config) -> |
| 745 | + Stream = atom_to_binary(?FUNCTION_NAME), |
| 746 | + publish_via_stream_protocol(Stream, Config), |
| 747 | + |
| 748 | + %% wait for the stream to emit stats and the counters to become non-zero |
| 749 | + %% (collect_statistics_interval set to 100ms in this test group) |
| 750 | + rabbit_ct_helpers:await_condition( |
| 751 | + fun() -> |
| 752 | + {_, Body0} = http_get_with_pal(Config, "/metrics", [], 200), |
| 753 | + #{rabbitmq_stream_publisher_published_total := #{undefined := [P]}} |
| 754 | + = parse_response(Body0), |
| 755 | + P > 0 |
| 756 | + end, |
| 757 | + 100), |
| 758 | + |
| 759 | + %% aggregated metrics |
| 760 | + {_, Body1} = http_get_with_pal(Config, "/metrics", [], 200), |
| 761 | + ParsedBody1 = parse_response(Body1), |
| 762 | + ?assertEqual(#{rabbitmq_stream_publisher_published_total => #{undefined => [5]}, |
| 763 | + rabbitmq_stream_publisher_confirmed_total => #{undefined => [5]}, |
| 764 | + rabbitmq_stream_publisher_error_messages => #{undefined => [0]}, |
| 765 | + rabbitmq_stream_consumer_consumed_total => #{undefined => [2]}, |
| 766 | + rabbitmq_stream_consumer_offset => #{undefined => [0]}, |
| 767 | + rabbitmq_stream_consumer_offset_lag => #{undefined => [2]} |
| 768 | + }, |
| 769 | + maps:with([rabbitmq_stream_publisher_published_total, |
| 770 | + rabbitmq_stream_publisher_confirmed_total, |
| 771 | + rabbitmq_stream_publisher_error_messages, |
| 772 | + rabbitmq_stream_consumer_consumed_total, |
| 773 | + rabbitmq_stream_consumer_offset, |
| 774 | + rabbitmq_stream_consumer_offset_lag], |
| 775 | + ParsedBody1)), |
| 776 | + |
| 777 | + %% per-object metrics |
| 778 | + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_publisher_metrics", |
| 779 | + [], 200), |
| 780 | + ParsedBody2 = parse_response(Body2), |
| 781 | + |
| 782 | + #{rabbitmq_detailed_stream_publisher_published_total := Published} = ParsedBody2, |
| 783 | + ?assertMatch([{#{vhost := "/", |
| 784 | + queue := "stream_pub_sub_metrics", |
| 785 | + publisher_id := "98", |
| 786 | + connection := _}, [2]}, |
| 787 | + {#{vhost := "/", |
| 788 | + queue := "stream_pub_sub_metrics", |
| 789 | + publisher_id := "99", |
| 790 | + connection := _}, [3]}], |
| 791 | + lists:sort(maps:to_list(Published))), |
| 792 | + |
| 793 | + #{rabbitmq_detailed_stream_publisher_confirmed_total := PubConfirmed} = ParsedBody2, |
| 794 | + ?assertMatch([{#{vhost := "/", |
| 795 | + queue := "stream_pub_sub_metrics", |
| 796 | + publisher_id := "98", |
| 797 | + connection := _}, [2]}, |
| 798 | + {#{vhost := "/", |
| 799 | + queue := "stream_pub_sub_metrics", |
| 800 | + publisher_id := "99", |
| 801 | + connection := _}, [3]}], |
| 802 | + lists:sort(maps:to_list(PubConfirmed))), |
| 803 | + |
| 804 | + #{rabbitmq_detailed_stream_publisher_error_messages := PubError} = ParsedBody2, |
| 805 | + ?assertMatch([{#{vhost := "/", |
| 806 | + queue := "stream_pub_sub_metrics", |
| 807 | + publisher_id := "98", |
| 808 | + connection := _}, [0]}, |
| 809 | + {#{vhost := "/", |
| 810 | + queue := "stream_pub_sub_metrics", |
| 811 | + publisher_id := "99", |
| 812 | + connection := _}, [0]}], |
| 813 | + lists:sort(maps:to_list(PubError))), |
| 814 | + |
| 815 | + {_, Body3} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", |
| 816 | + [], 200), |
| 817 | + ParsedBody3 = parse_response(Body3), |
| 818 | + |
| 819 | + #{rabbitmq_detailed_stream_consumer_consumed_total := Consumed} = ParsedBody3, |
| 820 | + ?assertMatch([{#{vhost := "/", |
| 821 | + queue := "stream_pub_sub_metrics", |
| 822 | + subscription_id := "97", |
| 823 | + connection := _}, [2]}], |
| 824 | + lists:sort(maps:to_list(Consumed))), |
| 825 | + |
| 826 | + #{rabbitmq_detailed_stream_consumer_offset := Offset} = ParsedBody3, |
| 827 | + ?assertMatch([{#{vhost := "/", |
| 828 | + queue := "stream_pub_sub_metrics", |
| 829 | + subscription_id := "97", |
| 830 | + connection := _}, [0]}], |
| 831 | + lists:sort(maps:to_list(Offset))), |
| 832 | + |
| 833 | + #{rabbitmq_detailed_stream_consumer_offset_lag := OffsetLag} = ParsedBody3, |
| 834 | + ?assertMatch([{#{vhost := "/", |
| 835 | + queue := "stream_pub_sub_metrics", |
| 836 | + subscription_id := "97", |
| 837 | + connection := _}, [2]}], |
| 838 | + lists:sort(maps:to_list(OffsetLag))), |
| 839 | + |
| 840 | + ok. |
| 841 | + |
742 | 842 | core_metrics_special_chars(Config) -> |
743 | 843 | {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200), |
744 | 844 | ?assertMatch(#{rabbitmq_detailed_queue_messages := |
@@ -784,6 +884,28 @@ basic_auth(Config) -> |
784 | 884 | rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>), |
785 | 885 | rabbit_ct_broker_helpers:delete_user(Config, <<"management">>). |
786 | 886 |
|
| 887 | +%% ------------------------------------------------------------------- |
| 888 | +%% Helpers |
| 889 | +%% ------------------------------------------------------------------- |
| 890 | + |
| 891 | +publish_via_stream_protocol(Stream, Config) -> |
| 892 | + {ok, S, C0} = stream_test_utils:connect(Config, 0), |
| 893 | + {ok, C1} = stream_test_utils:create_stream(S, C0, Stream), |
| 894 | + PublisherId = 98, |
| 895 | + {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId), |
| 896 | + Payloads = [<<"m1">>, <<"m2">>], |
| 897 | + {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, _SequenceFrom0 = 1, Payloads), |
| 898 | + |
| 899 | + PublisherId2 = 99, |
| 900 | + {ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2), |
| 901 | + Payloads2 = [<<"m3">>, <<"m4">>, <<"m5">>], |
| 902 | + {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, _SequenceFrom1 = 3, Payloads2), |
| 903 | + |
| 904 | + SubscriptionId = 97, |
| 905 | + {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1), |
| 906 | + %% delivery of <<"m1">> and <<"m2">> |
| 907 | + {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6), |
| 908 | + ok. |
787 | 909 |
|
788 | 910 | http_get(Config, ReqHeaders, CodeExp) -> |
789 | 911 | Path = proplists:get_value(prometheus_path, Config, "/metrics"), |
|
0 commit comments