@@ -143,6 +143,7 @@ all_tests_3() ->
143143 consume_credit_out_of_order_ack ,
144144 consume_credit_multiple_ack ,
145145 basic_cancel ,
146+ consumer_metrics_cleaned_on_connection_close ,
146147 receive_basic_cancel_on_queue_deletion ,
147148 keep_consuming_on_leader_restart ,
148149 max_length_bytes ,
@@ -1184,6 +1185,33 @@ basic_cancel(Config) ->
11841185 end ,
11851186 rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_testcase_queue , [Q ]).
11861187
1188+ consumer_metrics_cleaned_on_connection_close (Config ) ->
1189+ [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1190+
1191+ Q = ? config (queue_name , Config ),
1192+ ? assertEqual ({'queue.declare_ok' , Q , 0 , 0 },
1193+ declare (Config , Server , Q , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
1194+
1195+ Conn = rabbit_ct_client_helpers :open_connection (Config , Server ),
1196+ {ok , Ch } = amqp_connection :open_channel (Conn ),
1197+ qos (Ch , 10 , false ),
1198+ CTag = <<" consumer_metrics_cleaned_on_connection_close" >>,
1199+ subscribe (Ch , Q , false , 0 , CTag ),
1200+ rabbit_ct_helpers :await_condition (
1201+ fun () ->
1202+ 1 == length (filter_consumers (Config , Server , CTag ))
1203+ end , 30000 ),
1204+
1205+ ok = rabbit_ct_client_helpers :close_connection (Conn ),
1206+
1207+ rabbit_ct_helpers :await_condition (
1208+ fun () ->
1209+ 0 == length (filter_consumers (Config , Server , CTag ))
1210+ end , 30000 ),
1211+
1212+ rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_testcase_queue , [Q ]).
1213+
1214+
11871215receive_basic_cancel_on_queue_deletion (Config ) ->
11881216 [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
11891217
0 commit comments