@@ -56,38 +56,35 @@ all() ->
5656 [{group , mqtt },
5757 {group , web_mqtt }].
5858
59+ % % The code being tested under v3 and v4 is almost identical.
60+ % % To save time in CI, we therefore run only a very small subset of tests in v3.
5961groups () ->
6062 [
6163 {mqtt , [],
62- [{v3 , [],
63- [{cluster_size_1 , [], cluster_size_1_tests ()},
64- {cluster_size_3 , [], cluster_size_3_tests ()},
65- {mnesia_store , [], mnesia_store_tests ()}]},
66- {v4 , [],
67- [{cluster_size_1 , [], cluster_size_1_tests ()},
68- {cluster_size_3 , [], cluster_size_3_tests ()},
69- {mnesia_store , [], mnesia_store_tests ()}]},
70- {v5 , [],
71- [{cluster_size_1 , [], cluster_size_1_tests ()},
72- {cluster_size_3 , [], cluster_size_3_tests ()},
73- {mnesia_store , [], mnesia_store_tests ()}]}
64+ [{cluster_size_1 , [],
65+ [{v3 , [], cluster_size_1_tests_v3 ()},
66+ {v4 , [], cluster_size_1_tests ()},
67+ {v5 , [], cluster_size_1_tests ()}]},
68+ {cluster_size_3 , [],
69+ [{v4 , [], cluster_size_3_tests ()},
70+ {v5 , [], cluster_size_3_tests ()}]}
7471 ]},
7572 {web_mqtt , [],
76- [{v3 , [],
77- [{cluster_size_1 , [], cluster_size_1_tests ()},
78- {cluster_size_3 , [], cluster_size_3_tests ()},
79- {mnesia_store , [], mnesia_store_tests ()}]},
80- {v4 , [],
81- [{cluster_size_1 , [], cluster_size_1_tests ()},
82- {cluster_size_3 , [], cluster_size_3_tests ()},
83- {mnesia_store , [], mnesia_store_tests ()}]},
84- {v5 , [],
85- [{cluster_size_1 , [], cluster_size_1_tests ()},
86- {cluster_size_3 , [], cluster_size_3_tests ()},
87- {mnesia_store , [], mnesia_store_tests ()}]}
73+ [{cluster_size_1 , [],
74+ [{v3 , [], cluster_size_1_tests_v3 ()},
75+ {v4 , [], cluster_size_1_tests ()},
76+ {v5 , [], cluster_size_1_tests ()}]},
77+ {cluster_size_3 , [],
78+ [{v4 , [], cluster_size_3_tests ()},
79+ {v5 , [], cluster_size_3_tests ()}]}
8880 ]}
8981 ].
9082
83+ cluster_size_1_tests_v3 () ->
84+ [global_counters ,
85+ events
86+ ].
87+
9188cluster_size_1_tests () ->
9289 [
9390 global_counters % % must be the 1st test case
@@ -148,13 +145,9 @@ cluster_size_3_tests() ->
148145 session_reconnect ,
149146 session_takeover ,
150147 duplicate_client_id ,
151- maintenance
152- ].
153-
154- mnesia_store_tests () ->
155- [
156148 publish_to_all_queue_types_qos0 ,
157- publish_to_all_queue_types_qos1
149+ publish_to_all_queue_types_qos1 ,
150+ maintenance
158151 ].
159152
160153suite () ->
@@ -166,7 +159,12 @@ suite() ->
166159
167160init_per_suite (Config ) ->
168161 rabbit_ct_helpers :log_environment (),
169- rabbit_ct_helpers :run_setup_steps (Config ).
162+ Config1 = rabbit_ct_helpers :merge_app_env (
163+ Config , {rabbit , [
164+ {quorum_tick_interval , 1000 },
165+ {stream_tick_interval , 1000 }
166+ ]}),
167+ rabbit_ct_helpers :run_setup_steps (Config1 ).
170168
171169end_per_suite (Config ) ->
172170 rabbit_ct_helpers :run_teardown_steps (Config ).
@@ -176,39 +174,31 @@ init_per_group(mqtt, Config) ->
176174init_per_group (web_mqtt , Config ) ->
177175 rabbit_ct_helpers :set_config (Config , {websocket , true });
178176
179- init_per_group (Group , Config )
177+ init_per_group (Group , Config0 )
180178 when Group =:= v3 ;
181179 Group =:= v4 ;
182180 Group =:= v5 ->
183- rabbit_ct_helpers :set_config (Config , {mqtt_version , Group });
181+ Config = rabbit_ct_helpers :set_config (Config0 , {mqtt_version , Group }),
182+ util :maybe_skip_v5 (Config );
184183
185184init_per_group (Group , Config0 ) ->
186185 Nodes = case Group of
187186 cluster_size_1 -> 1 ;
188- cluster_size_3 -> 3 ;
189- mnesia_store -> 3
187+ cluster_size_3 -> 3
190188 end ,
191189 Suffix = rabbit_ct_helpers :testcase_absname (Config0 , " " , " -" ),
192- Config1 = case Group of
193- mnesia_store ->
194- rabbit_ct_helpers :set_config (Config0 , {metadata_store , mnesia });
195- _ ->
196- Config0
197- end ,
198- Config2 = rabbit_ct_helpers :set_config (
199- Config1 ,
200- [{rmq_nodes_count , Nodes },
201- {rmq_nodename_suffix , Suffix }]),
202- Config = rabbit_ct_helpers :run_steps (
203- Config2 ,
204- rabbit_ct_broker_helpers :setup_steps () ++
205- rabbit_ct_client_helpers :setup_steps ()),
206- util :maybe_skip_v5 (Config ).
190+ Config = rabbit_ct_helpers :set_config (
191+ Config0 ,
192+ [{rmq_nodes_count , Nodes },
193+ {rmq_nodename_suffix , Suffix }]),
194+ rabbit_ct_helpers :run_steps (
195+ Config ,
196+ rabbit_ct_broker_helpers :setup_steps () ++
197+ rabbit_ct_client_helpers :setup_steps ()).
207198
208199end_per_group (G , Config )
209200 when G =:= cluster_size_1 ;
210- G =:= cluster_size_3 ;
211- G =:= mnesia_store ->
201+ G =:= cluster_size_3 ->
212202 rabbit_ct_helpers :run_steps (
213203 Config ,
214204 rabbit_ct_client_helpers :teardown_steps () ++
@@ -410,19 +400,21 @@ publish_to_all_queue_types(Config, QoS) ->
410400 declare_queue (Ch , SQ , [{<<" x-queue-type" >>, longstr , <<" stream" >>}]),
411401 bind (Ch , SQ , Topic ),
412402
413- NumMsgs = 2000 ,
414- C = connect (? FUNCTION_NAME , Config , [{retry_interval , 2 }]),
415- lists :foreach (fun (N ) ->
416- case emqtt :publish (C , Topic , integer_to_binary (N ), QoS ) of
417- ok ->
418- ok ;
419- {ok , _ } ->
420- ok ;
421- Other ->
422- ct :fail (" Failed to publish: ~p " , [Other ])
423- end
424- end , lists :seq (1 , NumMsgs )),
425-
403+ NumMsgs = 1000 ,
404+ C = connect (? FUNCTION_NAME , Config , [{max_inflight , 200 },
405+ {retry_interval , 2 }]),
406+ Self = self (),
407+ lists :foreach (
408+ fun (N ) ->
409+ % % Publish async all messages at once to trigger flow control
410+ ok = emqtt :publish_async (C , Topic , integer_to_binary (N ), QoS ,
411+ {fun (N0 , {ok , #{reason_code_name := success }}) ->
412+ Self ! {self (), N0 };
413+ (N0 , ok ) ->
414+ Self ! {self (), N0 }
415+ end , [N ]})
416+ end , lists :seq (1 , NumMsgs )),
417+ ok = await_confirms_ordered (C , 1 , NumMsgs ),
426418 eventually (? _assert (
427419 begin
428420 L = rabbitmqctl_list (Config , 0 , [" list_queues" , " messages" , " --no-table-headers" ]),
@@ -439,7 +431,7 @@ publish_to_all_queue_types(Config, QoS) ->
439431 N < NumMsgs * 2
440432 end
441433 end , L )
442- end ), 2000 , 10 ),
434+ end ), 1000 , 20 ),
443435
444436 delete_queue (Ch , [CQ , QQ , SQ ]),
445437 ok = emqtt :disconnect (C ),
@@ -1126,13 +1118,26 @@ amqp_to_mqtt_qos0(Config) ->
11261118% % Test that the server wraps around the packet identifier.
11271119many_qos1_messages (Config ) ->
11281120 Topic = ClientId = atom_to_binary (? FUNCTION_NAME ),
1129- C = connect (ClientId , Config , 0 , [{retry_interval , 600 }]),
1130- {ok , _ , [1 ]} = emqtt :subscribe (C , {Topic , qos1 }),
11311121 NumMsgs = 16#ffff + 100 ,
1122+ C = connect (ClientId , Config , 0 , [{retry_interval , 600 },
1123+ {max_inflight , NumMsgs div 8 }]),
1124+ {ok , _ , [1 ]} = emqtt :subscribe (C , {Topic , qos1 }),
11321125 Payloads = lists :map (fun integer_to_binary /1 , lists :seq (1 , NumMsgs )),
1126+ Self = self (),
1127+ Target = lists :last (Payloads ),
11331128 lists :foreach (fun (P ) ->
1134- {ok , _ } = emqtt :publish (C , Topic , P , qos1 )
1129+ Cb = {fun (T , _ ) when T == Target ->
1130+ Self ! proceed ;
1131+ (_ , _ ) ->
1132+ ok
1133+ end , [P ]},
1134+ ok = emqtt :publish_async (C , Topic , P , qos1 , Cb )
11351135 end , Payloads ),
1136+ receive
1137+ proceed -> ok
1138+ after 30000 ->
1139+ ct :fail (" message to proceed never received" )
1140+ end ,
11361141 ok = expect_publishes (C , Topic , Payloads ),
11371142 ok = emqtt :disconnect (C ).
11381143
@@ -1453,7 +1458,7 @@ block(Config) ->
14531458block_only_publisher (Config ) ->
14541459 Topic = atom_to_binary (? FUNCTION_NAME ),
14551460
1456- Opts = [{ack_timeout , 2 }],
1461+ Opts = [{ack_timeout , 1 }],
14571462 Con = connect (<<" background-connection" >>, Config , Opts ),
14581463 Sub = connect (<<" subscriber-connection" >>, Config , Opts ),
14591464 Pub = connect (<<" publisher-connection" >>, Config , Opts ),
0 commit comments