88
99-include_lib (" eunit/include/eunit.hrl" ).
1010-include_lib (" amqp_client/include/amqp_client.hrl" ).
11+ -include_lib (" rabbitmq_ct_helpers/include/rabbit_assert.hrl" ).
1112
1213-compile ([nowarn_export_all , export_all ]).
1314
1819
1920all () ->
2021 [
22+ {group , cluster_size_1 },
2123 {group , cluster_size_3 }
2224 ].
2325
2426groups () ->
2527 [
28+ {cluster_size_1 , [], [
29+ classic_queue_flow_control_enabled ,
30+ classic_queue_flow_control_disabled
31+ ]
32+ },
2633 {cluster_size_3 , [], [
2734 leader_locator_client_local ,
2835 leader_locator_balanced ,
@@ -42,10 +49,14 @@ end_per_suite(Config) ->
4249 rabbit_ct_helpers :run_teardown_steps (Config ).
4350
4451init_per_group (Group , Config ) ->
52+ Nodes = case Group of
53+ cluster_size_1 -> 1 ;
54+ cluster_size_3 -> 3
55+ end ,
4556 Config1 = rabbit_ct_helpers :set_config (Config ,
4657 [
4758 {rmq_nodename_suffix , Group },
48- {rmq_nodes_count , 3 },
59+ {rmq_nodes_count , Nodes },
4960 {rmq_nodes_clustered , true },
5061 {tcp_ports_base , {skip_n_nodes , 3 }}
5162 ]),
@@ -72,6 +83,67 @@ init_per_testcase(T, Config) ->
7283% % Testcases.
7384% % -------------------------------------------------------------------
7485
86+ classic_queue_flow_control_enabled (Config ) ->
87+ FlowEnabled = true ,
88+ VerifyFun =
89+ fun (QPid , ConnPid ) ->
90+ % % Only 2+2 messages reach the message queue of the classic queue.
91+ % % (before the credits of the connection and channel processes run out)
92+ ? awaitMatch (4 , proc_info (QPid , message_queue_len ), 1000 ),
93+ ? assertMatch ({0 , _ }, gen_server2_queue (QPid )),
94+
95+ % % The connection gets into flow state
96+ ? assertEqual ([{state , flow }], rabbit_reader :info (ConnPid , [state ])),
97+
98+ Dict = proc_info (ConnPid , dictionary ),
99+ ? assertMatch ([_ |_ ], proplists :get_value (credit_blocked , Dict )),
100+ ok
101+ end ,
102+ flow_control (Config , FlowEnabled , VerifyFun ).
103+
104+ classic_queue_flow_control_disabled (Config ) ->
105+ FlowEnabled = false ,
106+ VerifyFun =
107+ fun (QPid , ConnPid ) ->
108+ % % All published messages will end up in the message
109+ % % queue of the suspended classic queue process
110+ ? awaitMatch (100 , proc_info (QPid , message_queue_len ), 1000 ),
111+ ? assertMatch ({0 , _ }, gen_server2_queue (QPid )),
112+
113+ % % The connection dos not get into flow state
114+ ? assertEqual ([{state , running }], rabbit_reader :info (ConnPid , [state ])),
115+
116+ Dict = proc_info (ConnPid , dictionary ),
117+ ? assertMatch ([], proplists :get_value (credit_blocked , Dict , []))
118+ end ,
119+ flow_control (Config , FlowEnabled , VerifyFun ).
120+
121+ flow_control (Config , FlowEnabled , VerifyFun ) ->
122+ OrigCredit = set_default_credit (Config , {2 , 1 }),
123+ OrigFlow = set_flow_control (Config , FlowEnabled ),
124+
125+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
126+ QueueName = atom_to_binary (? FUNCTION_NAME ),
127+ declare (Ch , QueueName , [{<<" x-queue-type" >>, longstr , <<" classic" >>}]),
128+ QPid = get_queue_pid (Config , QueueName ),
129+ try
130+ sys :suspend (QPid ),
131+
132+ % % Publish 100 messages without publisher confirms
133+ publish_many (Ch , QueueName , 100 ),
134+
135+ [ConnPid ] = rabbit_ct_broker_helpers :rpc (Config , rabbit_networking , local_connections , []),
136+
137+ VerifyFun (QPid , ConnPid ),
138+ ok
139+ after
140+ sys :resume (QPid ),
141+ delete_queues (Ch , [QueueName ]),
142+ set_default_credit (Config , OrigCredit ),
143+ set_flow_control (Config , OrigFlow ),
144+ rabbit_ct_client_helpers :close_channel (Ch )
145+ end .
146+
75147leader_locator_client_local (Config ) ->
76148 Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
77149 Q = <<" q1" >>,
@@ -129,7 +201,55 @@ declare(Ch, Q, Args) ->
129201 auto_delete = false ,
130202 arguments = Args }).
131203
204+ delete_queues (Ch , Qs ) ->
205+ [? assertMatch (# 'queue.delete_ok' {},
206+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q }))
207+ || Q <- Qs ].
208+
132209delete_queues () ->
133210 [rabbit_amqqueue :delete (Q , false , false , <<" dummy" >>)
134211 || Q <- rabbit_amqqueue :list ()].
135212
213+
214+ publish (Ch , QName , Payload ) ->
215+ amqp_channel :cast (Ch ,
216+ # 'basic.publish' {exchange = <<>>,
217+ routing_key = QName },
218+ # amqp_msg {payload = Payload }).
219+
220+ publish_many (Ch , QName , Count ) ->
221+ [publish (Ch , QName , integer_to_binary (I ))
222+ || I <- lists :seq (1 , Count )].
223+
224+ proc_info (Pid , Info ) ->
225+ case rabbit_misc :process_info (Pid , Info ) of
226+ {Info , Value } ->
227+ Value ;
228+ Error ->
229+ {error , Error }
230+ end .
231+
232+ gen_server2_queue (Pid ) ->
233+ Status = sys :get_status (Pid ),
234+ {status , Pid ,_Mod ,
235+ [_Dict , _SysStatus , _Parent , _Dbg ,
236+ [{header , _ },
237+ {data , Data }|_ ]]} = Status ,
238+ proplists :get_value (" Queued messages" , Data ).
239+
240+ set_default_credit (Config , Value ) ->
241+ Key = credit_flow_default_credit ,
242+ OrigValue = rabbit_ct_broker_helpers :rpc (Config , persistent_term , get , [Key ]),
243+ ok = rabbit_ct_broker_helpers :rpc (Config , persistent_term , put , [Key , Value ]),
244+ OrigValue .
245+
246+ set_flow_control (Config , Value ) when is_boolean (Value ) ->
247+ Key = classic_queue_flow_control ,
248+ {ok , OrigValue } = rabbit_ct_broker_helpers :rpc (Config , application , get_env , [rabbit , Key ]),
249+ rabbit_ct_broker_helpers :rpc (Config , application , set_env , [rabbit , Key , Value ]),
250+ OrigValue .
251+
252+ get_queue_pid (Config , QueueName ) ->
253+ {ok , QRec } = rabbit_ct_broker_helpers :rpc (
254+ Config , 0 , rabbit_amqqueue , lookup , [QueueName , <<" /" >>]),
255+ amqqueue :get_pid (QRec ).
0 commit comments