2424
2525-import (rabbit_ct_broker_helpers ,
2626 [rabbitmqctl_list /3 ,
27+ rabbitmqctl /3 ,
2728 rpc /4 ,
2829 rpc /5 ,
2930 rpc_all /4 ,
@@ -125,6 +126,9 @@ cluster_size_1_tests() ->
125126 ,retained_message_conversion
126127 ,bind_exchange_to_exchange
127128 ,bind_exchange_to_exchange_single_message
129+ ,notify_consumer_classic_queue_deleted
130+ ,notify_consumer_quorum_queue_deleted
131+ ,notify_consumer_qos0_queue_deleted
128132 ].
129133
130134cluster_size_3_tests () ->
@@ -167,8 +171,8 @@ init_per_suite(Config) ->
167171end_per_suite (Config ) ->
168172 rabbit_ct_helpers :run_teardown_steps (Config ).
169173
170- init_per_group (mqtt , Config ) ->
171- rabbit_ct_helpers :set_config (Config , {websocket , false });
174+ init_per_group (mqtt , Config0 ) ->
175+ rabbit_ct_helpers :set_config (Config0 , {websocket , false });
172176init_per_group (Group , Config )
173177 when Group =:= v3 ;
174178 Group =:= v4 ;
@@ -205,6 +209,13 @@ init_per_testcase(T, Config)
205209 T =:= management_plugin_enable ->
206210 inets :start (),
207211 init_per_testcase0 (T , Config );
212+ init_per_testcase (T , Config )
213+ when T =:= clean_session_disconnect_client ;
214+ T =:= clean_session_node_restart ;
215+ T =:= clean_session_node_kill ;
216+ T =:= notify_consumer_qos0_queue_deleted ->
217+ ok = rpc (Config , rabbit_registry , register , [queue , <<" qos0" >>, rabbit_mqtt_qos0_queue ]),
218+ init_per_testcase0 (T , Config );
208219init_per_testcase (Testcase , Config ) ->
209220 init_per_testcase0 (Testcase , Config ).
210221
@@ -216,6 +227,13 @@ end_per_testcase(T, Config)
216227 T =:= management_plugin_enable ->
217228 ok = inets :stop (),
218229 end_per_testcase0 (T , Config );
230+ end_per_testcase (T , Config )
231+ when T =:= clean_session_disconnect_client ;
232+ T =:= clean_session_node_restart ;
233+ T =:= clean_session_node_kill ;
234+ T =:= notify_consumer_qos0_queue_deleted ->
235+ ok = rpc (Config , rabbit_registry , unregister , [queue , <<" qos0" >>]),
236+ end_per_testcase0 (T , Config );
219237end_per_testcase (Testcase , Config ) ->
220238 end_per_testcase0 (Testcase , Config ).
221239
@@ -307,23 +325,22 @@ will_without_disconnect(Config) ->
307325% % Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties.
308326% % see https://github.com/rabbitmq/rabbitmq-server/discussions/8252
309327decode_basic_properties (Config ) ->
310- App = rabbitmq_mqtt ,
311- Par = durable_queue_type ,
312- ok = rpc (Config , application , set_env , [App , Par , quorum ]),
328+ set_durable_queue_type (Config ),
313329 ClientId = Topic = Payload = atom_to_binary (? FUNCTION_NAME ),
314330 C1 = connect (ClientId , Config , non_clean_sess_opts ()),
315331 {ok , _ , [1 ]} = emqtt :subscribe (C1 , Topic , qos1 ),
316332 QuorumQueues = rpc (Config , rabbit_amqqueue , list_by_type , [rabbit_quorum_queue ]),
317333 ? assertEqual (1 , length (QuorumQueues )),
318- Ch = rabbit_ct_client_helpers :open_channel (Config ),
334+ { Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
319335 amqp_channel :call (Ch , # 'basic.publish' {exchange = <<" amq.topic" >>,
320336 routing_key = Topic },
321337 # amqp_msg {payload = Payload }),
322338 ok = expect_publishes (C1 , Topic , [Payload ]),
323339 ok = emqtt :disconnect (C1 ),
324340 C2 = connect (ClientId , Config , [{clean_start , true }]),
325341 ok = emqtt :disconnect (C2 ),
326- ok = rpc (Config , application , unset_env , [App , Par ]).
342+ unset_durable_queue_type (Config ),
343+ ok = rabbit_ct_client_helpers :close_connection_and_channel (Conn , Ch ).
327344
328345quorum_queue_rejects (Config ) ->
329346 {_Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
@@ -1906,6 +1923,35 @@ bind_exchange_to_exchange_single_message(Config) ->
19061923 amqp_channel :call (Ch , # 'queue.delete' {queue = Q })),
19071924 ok = emqtt :disconnect (C ).
19081925
1926+ notify_consumer_qos0_queue_deleted (Config ) ->
1927+ Topic = atom_to_binary (? FUNCTION_NAME ),
1928+ notify_consumer_queue_deleted (Config , Topic , <<" MQTT QoS 0" >>, [{retry_interval , 1 }], qos0 ).
1929+
1930+ notify_consumer_classic_queue_deleted (Config ) ->
1931+ Topic = atom_to_binary (? FUNCTION_NAME ),
1932+ notify_consumer_queue_deleted (Config , Topic , <<" classic" >>, non_clean_sess_opts (), qos0 ).
1933+
1934+ notify_consumer_quorum_queue_deleted (Config ) ->
1935+ set_durable_queue_type (Config ),
1936+ Topic = atom_to_binary (? FUNCTION_NAME ),
1937+ notify_consumer_queue_deleted (Config , Topic , <<" quorum" >>, non_clean_sess_opts (), qos1 ),
1938+ unset_durable_queue_type (Config ).
1939+
1940+ notify_consumer_queue_deleted (Config , Name = Topic , ExpectedType , ConnOpts , Qos ) ->
1941+ C = connect (Name , Config , ConnOpts ),
1942+ {ok , _ , _ } = emqtt :subscribe (C , Topic , Qos ),
1943+ {ok , #{reason_code_name := success }} = emqtt :publish (C , Name , <<" m1" >>, qos1 ),
1944+ {ok , #{reason_code_name := success }} = emqtt :publish (C , Name , <<" m2" >>, qos1 ),
1945+ ok = expect_publishes (C , Topic , [<<" m1" >>, <<" m2" >>]),
1946+
1947+ [[QName , Type ]] = rabbitmqctl_list (Config , 0 , [" list_queues" , " name" , " type" , " --no-table-headers" ]),
1948+ ? assertMatch (ExpectedType , Type ),
1949+
1950+ process_flag (trap_exit , true ),
1951+ {ok , _ } = rabbitmqctl (Config , 0 , [" delete_queue" , QName ]),
1952+
1953+ await_exit (C ).
1954+
19091955% % -------------------------------------------------------------------
19101956% % Internal helpers
19111957% % -------------------------------------------------------------------
@@ -1936,7 +1982,7 @@ await_confirms_unordered(From, Left) ->
19361982 end .
19371983
19381984await_consumer_count (ConsumerCount , ClientId , QoS , Config ) ->
1939- Ch = rabbit_ct_client_helpers :open_channel (Config ),
1985+ { _Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
19401986 QueueName = rabbit_mqtt_util :queue_name_bin (
19411987 rabbit_data_coercion :to_binary (ClientId ), QoS ),
19421988 eventually (
@@ -1981,3 +2027,9 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) ->
19812027 after ? TIMEOUT -> ct :fail (" missing DISCONNECT packet from server" )
19822028 end
19832029 end .
2030+
2031+ set_durable_queue_type (Config ) ->
2032+ ok = rpc (Config , application , set_env , [rabbitmq_mqtt , durable_queue_type , quorum ]).
2033+
2034+ unset_durable_queue_type (Config ) ->
2035+ ok = rpc (Config , application , unset_env , [rabbitmq_mqtt , durable_queue_type ]).
0 commit comments