1313-include_lib (" amqp_client/include/amqp_client.hrl" ).
1414
1515-import (rabbit_ct_broker_helpers ,
16- [rpc /4 ]).
16+ [rpc /4 , rpc / 5 ]).
1717
1818all () ->
1919 [
20- {group , tests }
20+ {group , cluster_size_1 },
21+ {group , cluster_size_2 }
2122 ].
2223
2324groups () ->
2425 [
25- {tests , [shuffle ],
26+ {cluster_size_1 , [],
2627 [message_size ,
27- over_max_message_size ]}
28+ over_max_message_size ]},
29+ {cluster_size_2 , [],
30+ [summary ]
31+ }
2832 ].
2933
3034% % -------------------------------------------------------------------
@@ -39,7 +43,19 @@ init_per_suite(Config) ->
3943end_per_suite (Config ) ->
4044 rabbit_ct_helpers :run_teardown_steps (Config ).
4145
42- init_per_group (_Group , Config ) ->
46+ init_per_group (cluster_size_2 , Config0 ) ->
47+ Config1 = rabbit_ct_helpers :set_config (Config0 , [{rmq_nodes_count , 2 }]),
48+ Config = rabbit_ct_helpers :run_steps (
49+ Config1 ,
50+ rabbit_ct_broker_helpers :setup_steps () ++
51+ rabbit_ct_client_helpers :setup_steps ()),
52+ case rabbit_ct_broker_helpers :enable_feature_flag (Config , 'rabbitmq_4.2.0' ) of
53+ ok ->
54+ Config ;
55+ {skip , _ } = Skip ->
56+ Skip
57+ end ;
58+ init_per_group (_Grooup , Config ) ->
4359 rabbit_ct_helpers :run_steps (
4460 Config ,
4561 rabbit_ct_broker_helpers :setup_steps () ++
@@ -65,32 +81,7 @@ message_size(Config) ->
6581 AmqplBefore = get_msg_size_metrics (amqp091 , Config ),
6682 AmqpBefore = get_msg_size_metrics (amqp10 , Config ),
6783
68- Binary2B = <<" 12" >>,
69- Binary200K = binary :copy (<<" x" >>, 200_000 ),
70- Payloads = [Binary2B , Binary200K , Binary2B ],
71-
72- {AmqplConn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
73- [amqp_channel :call (Ch ,
74- # 'basic.publish' {routing_key = <<" nowhere" >>},
75- # amqp_msg {payload = Payload })
76- || Payload <- Payloads ],
77-
78- OpnConf = connection_config (Config ),
79- {ok , Connection } = amqp10_client :open_connection (OpnConf ),
80- {ok , Session } = amqp10_client :begin_session_sync (Connection ),
81- Address = rabbitmq_amqp_address :exchange (<<" amq.fanout" >>),
82- {ok , Sender } = amqp10_client :attach_sender_link_sync (Session , <<" sender" >>, Address ),
83- receive {amqp10_event , {link , Sender , credited }} -> ok
84- after 30_000 -> ct :fail (credited_timeout )
85- end ,
86-
87- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag1" >>, Binary2B )),
88- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag2" >>, Binary200K )),
89- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag3" >>, Binary2B )),
90-
91- ok = wait_for_settlement (released , <<" tag1" >>),
92- ok = wait_for_settlement (released , <<" tag2" >>),
93- ok = wait_for_settlement (released , <<" tag3" >>),
84+ publish_messages (Config ),
9485
9586 AmqplAfter = get_msg_size_metrics (amqp091 , Config ),
9687 AmqpAfter = get_msg_size_metrics (amqp10 , Config ),
@@ -100,10 +91,7 @@ message_size(Config) ->
10091 ? assertEqual (ExpectedDiff ,
10192 rabbit_msg_size_metrics :diff_raw_buckets (AmqplAfter , AmqplBefore )),
10293 ? assertEqual (ExpectedDiff ,
103- rabbit_msg_size_metrics :diff_raw_buckets (AmqpAfter , AmqpBefore )),
104-
105- ok = amqp10_client :close_connection (Connection ),
106- ok = rabbit_ct_client_helpers :close_connection_and_channel (AmqplConn , Ch ).
94+ rabbit_msg_size_metrics :diff_raw_buckets (AmqpAfter , AmqpBefore )).
10795
10896over_max_message_size (Config ) ->
10997 DefaultMaxMessageSize = rpc (Config , persistent_term , get , [max_message_size ]),
@@ -134,6 +122,39 @@ over_max_message_size(Config) ->
134122 ok = rabbit_ct_client_helpers :close_connection (Conn ),
135123 ok = rpc (Config , persistent_term , put , [max_message_size , DefaultMaxMessageSize ]).
136124
125+ summary (Config ) ->
126+ ZeroSummary = [{{0 , 100 }, {0 , 0.0 }},
127+ {{101 , 1000 }, {0 , 0.0 }},
128+ {{1001 , 10000 }, {0 , 0.0 }},
129+ {{10001 , 100000 }, {0 , 0.0 }},
130+ {{100001 , 1000000 }, {0 , 0.0 }},
131+ {{1000001 , 10000000 }, {0 , 0.0 }},
132+ {{10000001 , 50000000 }, {0 , 0.0 }},
133+ {{50000001 , 100000000 }, {0 , 0.0 }},
134+ {{100000001 , infinity }, {0 , 0.0 }}],
135+
136+ ? assertEqual (ZeroSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
137+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
138+ ? assertEqual (ZeroSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
139+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
140+
141+ publish_messages (Config ),
142+
143+ ExpectedSummary = [{{0 , 100 }, {4 , 66.66666666666666 }},
144+ {{101 , 1000 }, {0 , 0.0 }},
145+ {{1001 , 10000 }, {0 , 0.0 }},
146+ {{10001 , 100000 }, {0 , 0.0 }},
147+ {{100001 , 1000000 }, {2 , 33.33333333333333 }},
148+ {{1000001 , 10000000 }, {0 , 0.0 }},
149+ {{10000001 , 50000000 }, {0 , 0.0 }},
150+ {{50000001 , 100000000 }, {0 , 0.0 }},
151+ {{100000001 , infinity }, {0 , 0.0 }}],
152+
153+ ? assertEqual (ExpectedSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
154+ ? assertEqual (ExpectedSummary , rpc (Config , 0 , rabbit_msg_size_metrics , cluster_summary , [])),
155+ ? assertEqual (ExpectedSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
156+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , local_summary , [])).
157+
137158get_msg_size_metrics (Protocol , Config ) ->
138159 rpc (Config , rabbit_msg_size_metrics , raw_buckets , [Protocol ]).
139160
@@ -145,6 +166,36 @@ connection_config(Config) ->
145166 container_id => <<" my container" >>,
146167 sasl => anon }.
147168
169+ publish_messages (Config ) ->
170+ Binary2B = <<" 12" >>,
171+ Binary200K = binary :copy (<<" x" >>, 200_000 ),
172+ Payloads = [Binary2B , Binary200K , Binary2B ],
173+
174+ {AmqplConn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
175+ [amqp_channel :call (Ch ,
176+ # 'basic.publish' {routing_key = <<" nowhere" >>},
177+ # amqp_msg {payload = Payload })
178+ || Payload <- Payloads ],
179+
180+ OpnConf = connection_config (Config ),
181+ {ok , Connection } = amqp10_client :open_connection (OpnConf ),
182+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
183+ Address = rabbitmq_amqp_address :exchange (<<" amq.fanout" >>),
184+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session , <<" sender" >>, Address ),
185+ receive {amqp10_event , {link , Sender , credited }} -> ok
186+ after 30_000 -> ct :fail (credited_timeout )
187+ end ,
188+
189+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag1" >>, Binary2B )),
190+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag2" >>, Binary200K )),
191+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag3" >>, Binary2B )),
192+
193+ ok = wait_for_settlement (released , <<" tag1" >>),
194+ ok = wait_for_settlement (released , <<" tag2" >>),
195+ ok = wait_for_settlement (released , <<" tag3" >>),
196+ ok = amqp10_client :close_connection (Connection ),
197+ ok = rabbit_ct_client_helpers :close_connection_and_channel (AmqplConn , Ch ).
198+
148199wait_for_settlement (State , Tag ) ->
149200 receive
150201 {amqp10_disposition , {State , Tag }} ->
0 commit comments