Commit 85dab98
Merge pull request #12765 from rabbitmq/rabbitmq-server-10275
By @gomoripeti: Streams: two additional Prometheus metrics for connections
2 parents de90cfc + 4eb5b82 commit 85dab98
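
For orientation: the metric family added in the visible part of this diff is stream_consumer_max_offset_lag, registered as a gauge in the core metrics collector below. Going by the names asserted in the test suite (rabbitmq_ prefix for aggregated metrics, rabbitmq_detailed_ prefix plus vhost/queue labels for per-object metrics), a scrape would be expected to contain lines roughly like the following; the values and the stream name are illustrative samples only:

# HELP rabbitmq_stream_consumer_max_offset_lag Current maximum of offset lag of consumers
# TYPE rabbitmq_stream_consumer_max_offset_lag gauge
rabbitmq_stream_consumer_max_offset_lag 3

# GET /metrics/detailed?family=stream_consumer_metrics
rabbitmq_detailed_stream_consumer_max_offset_lag{vhost="/",queue="my-stream"} 3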

File tree: 7 files changed (+253 additions, -127 deletions)

deps/amqp10_common/src/serial_number.erl

Lines changed: 1 addition & 2 deletions

@@ -15,9 +15,8 @@
          diff/2,
          foldl/4]).
 
--ifdef(TEST).
+%% For tests.
 -export([usort/1]).
--endif.
 
 -type serial_number() :: sequence_no().
 -export_type([serial_number/0]).

deps/rabbitmq_ct_helpers/Makefile

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 PROJECT = rabbitmq_ct_helpers
 PROJECT_DESCRIPTION = Common Test helpers for RabbitMQ
 
-DEPS = rabbit_common proper inet_tcp_proxy meck
+DEPS = rabbit_common amqp10_common rabbitmq_stream_common proper inet_tcp_proxy meck
 LOCAL_DEPS = common_test eunit inets
 #TEST_DEPS = rabbit

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl (new file)

Lines changed: 137 additions & 0 deletions
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% There is no open source Erlang RabbitMQ Stream client.
%% Therefore, we have to build the Stream protocol commands manually.

-module(stream_test_utils).

-compile([export_all, nowarn_export_all]).

-include_lib("amqp10_common/include/amqp10_framing.hrl").

-define(RESPONSE_CODE_OK, 1).

connect(Config, Node) ->
    StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
    {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),

    C0 = rabbit_stream_core:init(0),
    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
    ok = gen_tcp:send(Sock, PeerPropertiesFrame),
    {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),

    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
    {{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
    Username = <<"guest">>,
    Password = <<"guest">>,
    Null = 0,
    PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
    {{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
    {{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),

    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
    {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
    {ok, Sock, C5}.

create_stream(Sock, C0, Stream) ->
    CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
    ok = gen_tcp:send(Sock, CreateStreamFrame),
    {{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
    {ok, C1}.

declare_publisher(Sock, C0, Stream, PublisherId) ->
    DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
    ok = gen_tcp:send(Sock, DeclarePublisherFrame),
    {{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
    {ok, C1}.

subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
    SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
    ok = gen_tcp:send(Sock, SubscribeFrame),
    {{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
    {ok, C1}.

publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
    SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
    Messages = [simple_entry(Seq, P)
                || {Seq, P} <- lists:zip(SeqIds, Payloads)],
    {ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
    {ok, C1}.

publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
    PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
    ok = gen_tcp:send(Sock, PublishFrame1),
    {{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
    {ok, SeqIds, C1}.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message contains a single data section.
simple_entry(Sequence, Body)
  when is_binary(Body) ->
    DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
    DataSectSize = byte_size(DataSect),
    <<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
simple_entry(Sequence, Body, AppProps)
  when is_binary(Body) ->
    AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
    DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
    Sects = <<AppPropsSect/binary, DataSect/binary>>,
    SectSize = byte_size(Sects),
    <<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.

%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
%% All data sections are delivered uncompressed in 1 batch.
sub_batch_entry_uncompressed(Sequence, Bodies) ->
    Batch = lists:foldl(fun(Body, Acc) ->
                                AppProps = #'v1_0.application_properties'{
                                              content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
                                Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
                                Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
                                Sect = <<Sect0/binary, Sect1/binary>>,
                                <<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
                        end, <<>>, Bodies),
    Size = byte_size(Batch),
    <<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.

%% Here, each AMQP 1.0 encoded message contains a single data section.
%% All data sections are delivered in 1 gzip compressed batch.
sub_batch_entry_compressed(Sequence, Bodies) ->
    Uncompressed = lists:foldl(fun(Body, Acc) ->
                                       Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
                                       <<Acc/binary, Bin/binary>>
                               end, <<>>, Bodies),
    Compressed = zlib:gzip(Uncompressed),
    CompressedLen = byte_size(Compressed),
    <<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
      CompressedLen:32, Compressed:CompressedLen/binary>>.

receive_stream_commands(Sock, C0) ->
    case rabbit_stream_core:next_command(C0) of
        empty ->
            case gen_tcp:recv(Sock, 0, 5000) of
                {ok, Data} ->
                    C1 = rabbit_stream_core:incoming_data(Data, C0),
                    case rabbit_stream_core:next_command(C1) of
                        empty ->
                            {ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
                            rabbit_stream_core:next_command(
                              rabbit_stream_core:incoming_data(Data2, C1));
                        Res ->
                            Res
                    end;
                {error, Err} ->
                    ct:fail("error receiving stream data ~w", [Err])
            end;
        Res ->
            Res
    end.
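
A minimal usage sketch of the new helper module. The function name and stream name below are hypothetical, and it assumes a common_test Config whose node 0 has the rabbitmq_stream plugin enabled; it is essentially what publish_via_stream_protocol/3 in the Prometheus test suite further down does:

example_stream_roundtrip(Config) ->
    {ok, Sock, C0} = stream_test_utils:connect(Config, 0),
    {ok, C1} = stream_test_utils:create_stream(Sock, C0, <<"example-stream">>),
    PublisherId = 1,
    {ok, C2} = stream_test_utils:declare_publisher(Sock, C1, <<"example-stream">>, PublisherId),
    %% publish two messages starting at publishing sequence 1
    {ok, C3} = stream_test_utils:publish(Sock, C2, PublisherId, 1, [<<"m1">>, <<"m2">>]),
    %% subscribe (offset spec 'first') and wait for the first delivered chunk
    {ok, C4} = stream_test_utils:subscribe(Sock, C3, <<"example-stream">>, _SubscriptionId = 5, _InitialCredit = 10),
    {{deliver, 5, _Chunk}, _C5} = stream_test_utils:receive_stream_commands(Sock, C4),
    ok.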

deps/rabbitmq_prometheus/Makefile

Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
 PROJECT_MOD := rabbit_prometheus_app
 DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
 BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
-TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
+TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream
 
 EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 32 additions & 0 deletions

@@ -200,6 +200,11 @@
         {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
     ]},
 
+    %% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
+    {stream_consumer_metrics, [
+        {2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"}
+    ]},
+
     {connection_metrics, [
         {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
         {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},

@@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) ->
     [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
               {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
               {global_prefetch_count, A7}]}];
+get_data(stream_consumer_metrics = MF, false, _) ->
+    Table = rabbit_stream_consumer_created, %% real table name
+    try ets:foldl(fun({_, Props}, OldMax) ->
+                          erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
+                  end, 0, Table) of
+        MaxOffsetLag ->
+            [{MF, MaxOffsetLag}]
+    catch error:badarg ->
+            %% rabbitmq_stream plugin is not enabled
+            []
+    end;
 get_data(queue_consumer_count = MF, false, VHostsFilter) ->
     Table = queue_metrics, %% Real table name
     {_, A1} = ets:foldl(fun

@@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
                 end, [], Table);
 get_data(queue_consumer_count, true, _) ->
     ets:tab2list(queue_metrics);
+get_data(stream_consumer_metrics, true, _) ->
+    Table = rabbit_stream_consumer_created, %% real table name
+    try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
+                          Value = proplists:get_value(offset_lag, Props, 0),
+                          maps:update_with(
+                            QueueName,
+                            fun(OldMax) -> erlang:max(Value, OldMax) end,
+                            Value,
+                            Map0)
+                  end, #{}, Table) of
+        Map1 ->
+            maps:to_list(Map1)
+    catch error:badarg ->
+            %% rabbitmq_stream plugin is not enabled
+            []
+    end;
 get_data(vhost_status, _, _) ->
     [ { #{<<"vhost">> => VHost},
         case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
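
The two new folds above assume rows of rabbit_stream_consumer_created with the shape {{QueueName, Pid, SubscriptionId}, Props}, where Props is a proplist that may carry offset_lag (treated as 0 when absent). A self-contained sketch of the aggregated computation, using hypothetical rows in a plain list instead of the ETS table:

max_offset_lag_sketch() ->
    %% hypothetical rows mimicking rabbit_stream_consumer_created entries
    Rows = [{{<<"stream-1">>, self(), 1}, [{offset_lag, 2}]},
            {{<<"stream-2">>, self(), 2}, [{offset_lag, 3}]},
            {{<<"stream-2">>, self(), 3}, []}],
    %% same fold as get_data(stream_consumer_metrics, false, _), but over a list
    lists:foldl(fun({_, Props}, OldMax) ->
                        erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
                end, 0, Rows).
%% => 3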

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 59 additions & 2 deletions

@@ -11,8 +11,9 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
+-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
 
--compile(export_all).
+-compile([export_all, nowarn_export_all]).
 
 all() ->
     [

@@ -70,7 +71,8 @@ groups() ->
        queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
        vhost_status_metric,
        exchange_bindings_metric,
-       exchange_names_metric
+       exchange_names_metric,
+       stream_pub_sub_metrics
       ]},
     {special_chars, [], [core_metrics_special_chars]},
     {authentication, [], [basic_auth]}

@@ -739,6 +741,37 @@ exchange_names_metric(Config) ->
       }, Names),
     ok.
 
+stream_pub_sub_metrics(Config) ->
+    Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
+    MsgPerBatch1 = 2,
+    publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
+    Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
+    MsgPerBatch2 = 3,
+    publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
+
+    %% aggregated metrics
+
+    %% wait for the stream to emit stats
+    %% (collect_statistics_interval set to 100ms in this test group)
+    ?awaitMatch(V when V == #{rabbitmq_stream_consumer_max_offset_lag => #{undefined => [3]}},
+                begin
+                    {_, Body1} = http_get_with_pal(Config, "/metrics", [], 200),
+                    maps:with([rabbitmq_stream_consumer_max_offset_lag],
+                              parse_response(Body1))
+                end,
+                100),
+
+    %% per-object metrics
+    {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
+                                   [], 200),
+    ParsedBody2 = parse_response(Body2),
+    #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,
+
+    ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
+                  {#{vhost => "/", queue => Stream2}, [3]}],
+                 lists:sort(maps:to_list(MaxOffsetLag))),
+    ok.
+
 core_metrics_special_chars(Config) ->
     {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200),
     ?assertMatch(#{rabbitmq_detailed_queue_messages :=

@@ -784,6 +817,30 @@ basic_auth(Config) ->
     rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>),
     rabbit_ct_broker_helpers:delete_user(Config, <<"management">>).
 
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
+    {ok, S, C0} = stream_test_utils:connect(Config, 0),
+    {ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
+    PublisherId = 98,
+    {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
+    Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
+    SequenceFrom1 = 1,
+    {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),
+
+    PublisherId2 = 99,
+    {ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2),
+    Payloads2 = lists:duplicate(MsgPerBatch, <<"m2">>),
+    SequenceFrom2 = SequenceFrom1 + MsgPerBatch,
+    {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2),
+
+    SubscriptionId = 97,
+    {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
+    %% delivery of first batch of messages
+    {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
+    ok.
+
 http_get(Config, ReqHeaders, CodeExp) ->
     Path = proplists:get_value(prometheus_path, Config, "/metrics"),
