|
11 | 11 | -include_lib("common_test/include/ct.hrl"). |
12 | 12 | -include_lib("eunit/include/eunit.hrl"). |
13 | 13 | -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl"). |
| 14 | +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). |
14 | 15 |
|
15 | | --compile(export_all). |
| 16 | +-compile([export_all, nowarn_export_all]). |
16 | 17 |
|
17 | 18 | all() -> |
18 | 19 | [ |
@@ -68,7 +69,8 @@ groups() -> |
68 | 69 | queue_consumer_count_and_queue_metrics_mutually_exclusive_test, |
69 | 70 | vhost_status_metric, |
70 | 71 | exchange_bindings_metric, |
71 | | - exchange_names_metric |
| 72 | + exchange_names_metric, |
| 73 | + stream_pub_sub_metrics |
72 | 74 | ]}, |
73 | 75 | {special_chars, [], [core_metrics_special_chars]}, |
74 | 76 | {authentication, [], [basic_auth]} |
@@ -708,6 +710,37 @@ exchange_names_metric(Config) -> |
708 | 710 | }, Names), |
709 | 711 | ok. |
710 | 712 |
|
| 713 | +stream_pub_sub_metrics(Config) -> |
| 714 | + Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", |
| 715 | + MsgPerBatch1 = 2, |
| 716 | + publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), |
| 717 | + Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2", |
| 718 | + MsgPerBatch2 = 3, |
| 719 | + publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), |
| 720 | + |
| 721 | + %% aggregated metrics |
| 722 | + |
| 723 | + %% wait for the stream to emit stats |
| 724 | + %% (collect_statistics_interval set to 100ms in this test group) |
| 725 | + ?awaitMatch(V when V == #{rabbitmq_stream_consumer_max_offset_lag => #{undefined => [3]}}, |
| 726 | + begin |
| 727 | + {_, Body1} = http_get_with_pal(Config, "/metrics", [], 200), |
| 728 | + maps:with([rabbitmq_stream_consumer_max_offset_lag], |
| 729 | + parse_response(Body1)) |
| 730 | + end, |
| 731 | + 100), |
| 732 | + |
| 733 | + %% per-object metrics |
| 734 | + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", |
| 735 | + [], 200), |
| 736 | + ParsedBody2 = parse_response(Body2), |
| 737 | + #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2, |
| 738 | + |
| 739 | + ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]}, |
| 740 | + {#{vhost => "/", queue => Stream2}, [3]}], |
| 741 | + lists:sort(maps:to_list(MaxOffsetLag))), |
| 742 | + ok. |
| 743 | + |
711 | 744 | core_metrics_special_chars(Config) -> |
712 | 745 | {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200), |
713 | 746 | ?assertMatch(#{rabbitmq_detailed_queue_messages := |
@@ -753,6 +786,30 @@ basic_auth(Config) -> |
753 | 786 | rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>), |
754 | 787 | rabbit_ct_broker_helpers:delete_user(Config, <<"management">>). |
755 | 788 |
|
| 789 | +%% ------------------------------------------------------------------- |
| 790 | +%% Helpers |
| 791 | +%% ------------------------------------------------------------------- |
| 792 | + |
| 793 | +publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> |
| 794 | + {ok, S, C0} = stream_test_utils:connect(Config, 0), |
| 795 | + {ok, C1} = stream_test_utils:create_stream(S, C0, Stream), |
| 796 | + PublisherId = 98, |
| 797 | + {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId), |
| 798 | + Payloads = lists:duplicate(MsgPerBatch, <<"m1">>), |
| 799 | + SequenceFrom1 = 1, |
| 800 | + {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads), |
| 801 | + |
| 802 | + PublisherId2 = 99, |
| 803 | + {ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2), |
| 804 | + Payloads2 = lists:duplicate(MsgPerBatch, <<"m2">>), |
| 805 | + SequenceFrom2 = SequenceFrom1 + MsgPerBatch, |
| 806 | + {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2), |
| 807 | + |
| 808 | + SubscriptionId = 97, |
| 809 | + {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1), |
| 810 | + %% delivery of first batch of messages |
| 811 | + {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6), |
| 812 | + ok. |
756 | 813 |
|
757 | 814 | http_get(Config, ReqHeaders, CodeExp) -> |
758 | 815 | Path = proplists:get_value(prometheus_path, Config, "/metrics"), |
|
0 commit comments