@@ -109,6 +109,7 @@ subgroups() ->
109109 flow_quorum_queue ,
110110 flow_stream ,
111111 rabbit_mqtt_qos0_queue ,
112+ rabbit_mqtt_qos0_queue_kill_node ,
112113 cli_list_queues ,
113114 maintenance ,
114115 delete_create_queue ,
@@ -1019,6 +1020,40 @@ rabbit_mqtt_qos0_queue(Config) ->
10191020 ok = emqtt :disconnect (Sub ),
10201021 ok = emqtt :disconnect (Pub ).
10211022
1023+ rabbit_mqtt_qos0_queue_kill_node (Config ) ->
1024+ Topic = atom_to_binary (? FUNCTION_NAME ),
1025+ Pub = connect (<<" publisher" >>, Config , 2 , []),
1026+
1027+ SubscriberId = <<" subscriber" >>,
1028+ Sub0 = connect (SubscriberId , Config , 0 , []),
1029+ {ok , _ , [0 ]} = emqtt :subscribe (Sub0 , Topic , qos0 ),
1030+ ok = emqtt :publish (Pub , Topic , <<" m0" >>, qos0 ),
1031+ ok = expect_publishes (Sub0 , Topic , [<<" m0" >>]),
1032+
1033+ process_flag (trap_exit , true ),
1034+ ok = rabbit_ct_broker_helpers :kill_node (Config , 0 ),
1035+ % % Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1036+ % % table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1037+ timer :sleep (500 ),
1038+ Sub1 = connect (SubscriberId , Config , 1 , []),
1039+ {ok , _ , [0 ]} = emqtt :subscribe (Sub1 , Topic , qos0 ),
1040+ ok = emqtt :publish (Pub , Topic , <<" m1" >>, qos0 ),
1041+ ok = expect_publishes (Sub1 , Topic , [<<" m1" >>]),
1042+
1043+ % % Start node 0 to have a majority for Khepri.
1044+ ok = rabbit_ct_broker_helpers :start_node (Config , 0 ),
1045+ ok = rabbit_ct_broker_helpers :kill_node (Config , 1 ),
1046+ % % This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1047+ % % but this time Mnesia may or may not contain the queue record in rabbit_queue.
1048+ Sub2 = connect (SubscriberId , Config , 2 , []),
1049+ {ok , _ , [0 ]} = emqtt :subscribe (Sub2 , Topic , qos0 ),
1050+ ok = emqtt :publish (Pub , Topic , <<" m2" >>, qos0 ),
1051+ ok = expect_publishes (Sub2 , Topic , [<<" m2" >>]),
1052+
1053+ ok = emqtt :disconnect (Sub2 ),
1054+ ok = emqtt :disconnect (Pub ),
1055+ ok = rabbit_ct_broker_helpers :start_node (Config , 1 ).
1056+
10221057% % Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
10231058management_plugin_connection (Config ) ->
10241059 KeepaliveSecs = 99 ,
0 commit comments