1313-include_lib (" amqp_client/include/amqp_client.hrl" ).
1414
1515-import (rabbit_ct_broker_helpers ,
16- [rpc /4 ]).
16+ [rpc /4 , rpc / 5 ]).
1717
1818all () ->
1919 [
20- {group , tests }
20+ {group , cluster_size_1 },
21+ {group , cluster_size_3 }
2122 ].
2223
2324groups () ->
2425 [
25- {tests , [shuffle ],
26+ {cluster_size_1 , [],
2627 [message_size ,
27- over_max_message_size ]}
28+ over_max_message_size ]},
29+ {cluster_size_3 , [],
30+ [summary ]
31+ }
2832 ].
2933
3034% % -------------------------------------------------------------------
@@ -34,16 +38,32 @@ groups() ->
3438init_per_suite (Config ) ->
3539 {ok , _ } = application :ensure_all_started (amqp10_client ),
3640 rabbit_ct_helpers :log_environment (),
37- rabbit_ct_helpers : run_setup_steps ( Config ) .
41+ Config .
3842
3943end_per_suite (Config ) ->
4044 rabbit_ct_helpers :run_teardown_steps (Config ).
4145
42- init_per_group (_Group , Config ) ->
43- rabbit_ct_helpers :run_steps (
44- Config ,
45- rabbit_ct_broker_helpers :setup_steps () ++
46- rabbit_ct_client_helpers :setup_steps ()).
46+ init_per_group (Group , Config ) ->
47+ Nodes = case Group of
48+ cluster_size_1 -> 1 ;
49+ cluster_size_3 -> 3
50+ end ,
51+ Suffix = rabbit_ct_helpers :testcase_absname (Config , " " , " -" ),
52+ Config1 = rabbit_ct_helpers :set_config (
53+ Config , [{rmq_nodes_count , Nodes },
54+ {rmq_nodename_suffix , Suffix },
55+ {metadata_store , mnesia }]),
56+
57+ Config2 = rabbit_ct_helpers :run_setup_steps (
58+ Config1 ,
59+ rabbit_ct_broker_helpers :setup_steps () ++
60+ rabbit_ct_client_helpers :setup_steps ()),
61+ case rabbit_ct_broker_helpers :enable_feature_flag (Config2 , 'rabbitmq_4.2.0' ) of
62+ ok ->
63+ Config2 ;
64+ {skip , _ } = Skip ->
65+ Skip
66+ end .
4767
4868end_per_group (_Group , Config ) ->
4969 rabbit_ct_helpers :run_steps (
@@ -65,32 +85,7 @@ message_size(Config) ->
6585 AmqplBefore = get_msg_size_metrics (amqp091 , Config ),
6686 AmqpBefore = get_msg_size_metrics (amqp10 , Config ),
6787
68- Binary2B = <<" 12" >>,
69- Binary200K = binary :copy (<<" x" >>, 200_000 ),
70- Payloads = [Binary2B , Binary200K , Binary2B ],
71-
72- {AmqplConn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
73- [amqp_channel :call (Ch ,
74- # 'basic.publish' {routing_key = <<" nowhere" >>},
75- # amqp_msg {payload = Payload })
76- || Payload <- Payloads ],
77-
78- OpnConf = connection_config (Config ),
79- {ok , Connection } = amqp10_client :open_connection (OpnConf ),
80- {ok , Session } = amqp10_client :begin_session_sync (Connection ),
81- Address = rabbitmq_amqp_address :exchange (<<" amq.fanout" >>),
82- {ok , Sender } = amqp10_client :attach_sender_link_sync (Session , <<" sender" >>, Address ),
83- receive {amqp10_event , {link , Sender , credited }} -> ok
84- after 30_000 -> ct :fail (credited_timeout )
85- end ,
86-
87- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag1" >>, Binary2B )),
88- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag2" >>, Binary200K )),
89- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag3" >>, Binary2B )),
90-
91- ok = wait_for_settlement (released , <<" tag1" >>),
92- ok = wait_for_settlement (released , <<" tag2" >>),
93- ok = wait_for_settlement (released , <<" tag3" >>),
88+ publish_messages (Config ),
9489
9590 AmqplAfter = get_msg_size_metrics (amqp091 , Config ),
9691 AmqpAfter = get_msg_size_metrics (amqp10 , Config ),
@@ -100,10 +95,7 @@ message_size(Config) ->
10095 ? assertEqual (ExpectedDiff ,
10196 rabbit_msg_size_metrics :diff_raw_buckets (AmqplAfter , AmqplBefore )),
10297 ? assertEqual (ExpectedDiff ,
103- rabbit_msg_size_metrics :diff_raw_buckets (AmqpAfter , AmqpBefore )),
104-
105- ok = amqp10_client :close_connection (Connection ),
106- ok = rabbit_ct_client_helpers :close_connection_and_channel (AmqplConn , Ch ).
98+ rabbit_msg_size_metrics :diff_raw_buckets (AmqpAfter , AmqpBefore )).
10799
108100over_max_message_size (Config ) ->
109101 DefaultMaxMessageSize = rpc (Config , persistent_term , get , [max_message_size ]),
@@ -134,6 +126,39 @@ over_max_message_size(Config) ->
134126 ok = rabbit_ct_client_helpers :close_connection (Conn ),
135127 ok = rpc (Config , persistent_term , put , [max_message_size , DefaultMaxMessageSize ]).
136128
129+ summary (Config ) ->
130+ ZeroSummary = [{{0 , 100 }, {0 , 0.0 }},
131+ {{101 , 1000 }, {0 , 0.0 }},
132+ {{1001 , 10000 }, {0 , 0.0 }},
133+ {{10001 , 100000 }, {0 , 0.0 }},
134+ {{100001 , 1000000 }, {0 , 0.0 }},
135+ {{1000001 , 10000000 }, {0 , 0.0 }},
136+ {{10000001 , 50000000 }, {0 , 0.0 }},
137+ {{50000001 , 100000000 }, {0 , 0.0 }},
138+ {{100000001 , infinity }, {0 , 0.0 }}],
139+
140+ ? assertEqual (ZeroSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
141+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
142+ ? assertEqual (ZeroSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
143+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
144+
145+ publish_messages (Config ),
146+
147+ ExpectedSummary = [{{0 , 100 }, {4 , 66.66666666666666 }},
148+ {{101 , 1000 }, {0 , 0.0 }},
149+ {{1001 , 10000 }, {0 , 0.0 }},
150+ {{10001 , 100000 }, {0 , 0.0 }},
151+ {{100001 , 1000000 }, {2 , 33.33333333333333 }},
152+ {{1000001 , 10000000 }, {0 , 0.0 }},
153+ {{10000001 , 50000000 }, {0 , 0.0 }},
154+ {{50000001 , 100000000 }, {0 , 0.0 }},
155+ {{100000001 , infinity }, {0 , 0.0 }}],
156+
157+ ? assertEqual (ExpectedSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
158+ ? assertEqual (ExpectedSummary , rpc (Config , 0 , rabbit_msg_size_metrics , cluster_summary , [])),
159+ ? assertEqual (ExpectedSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
160+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , local_summary , [])).
161+
137162get_msg_size_metrics (Protocol , Config ) ->
138163 rpc (Config , rabbit_msg_size_metrics , raw_buckets , [Protocol ]).
139164
@@ -145,6 +170,36 @@ connection_config(Config) ->
145170 container_id => <<" my container" >>,
146171 sasl => anon }.
147172
173+ publish_messages (Config ) ->
174+ Binary2B = <<" 12" >>,
175+ Binary200K = binary :copy (<<" x" >>, 200_000 ),
176+ Payloads = [Binary2B , Binary200K , Binary2B ],
177+
178+ {AmqplConn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
179+ [amqp_channel :call (Ch ,
180+ # 'basic.publish' {routing_key = <<" nowhere" >>},
181+ # amqp_msg {payload = Payload })
182+ || Payload <- Payloads ],
183+
184+ OpnConf = connection_config (Config ),
185+ {ok , Connection } = amqp10_client :open_connection (OpnConf ),
186+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
187+ Address = rabbitmq_amqp_address :exchange (<<" amq.fanout" >>),
188+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session , <<" sender" >>, Address ),
189+ receive {amqp10_event , {link , Sender , credited }} -> ok
190+ after 30_000 -> ct :fail (credited_timeout )
191+ end ,
192+
193+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag1" >>, Binary2B )),
194+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag2" >>, Binary200K )),
195+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag3" >>, Binary2B )),
196+
197+ ok = wait_for_settlement (released , <<" tag1" >>),
198+ ok = wait_for_settlement (released , <<" tag2" >>),
199+ ok = wait_for_settlement (released , <<" tag3" >>),
200+ ok = amqp10_client :close_connection (Connection ),
201+ ok = rabbit_ct_client_helpers :close_connection_and_channel (AmqplConn , Ch ).
202+
148203wait_for_settlement (State , Tag ) ->
149204 receive
150205 {amqp10_disposition , {State , Tag }} ->
0 commit comments