@@ -197,6 +197,7 @@ all_tests() ->
197197 requeue_multiple_true ,
198198 requeue_multiple_false ,
199199 subscribe_from_each ,
200+ dont_leak_file_handles ,
200201 leader_health_check
201202 ].
202203
@@ -1647,6 +1648,54 @@ subscribe_from_each(Config) ->
16471648
16481649 ok .
16491650
1651+ dont_leak_file_handles (Config ) ->
1652+
1653+ [Server0 | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1654+
1655+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1656+ # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
1657+ QQ = ? config (queue_name , Config ),
1658+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1659+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1660+ [begin
1661+ publish_confirm (Ch , QQ )
1662+ end || _ <- Servers ],
1663+ timer :sleep (100 ),
1664+ % % roll the wal to force consumer messages to be read from disk
1665+ [begin
1666+ ok = rpc :call (S , ra_log_wal , force_roll_over , [ra_log_wal ])
1667+ end || S <- Servers ],
1668+ timer :sleep (256 ),
1669+
1670+ C = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1671+ [_ , NCh1 ] = rpc :call (Server0 , rabbit_channel , list , []),
1672+ qos (C , 1 , false ),
1673+ subscribe (C , QQ , false ),
1674+ [begin
1675+ receive
1676+ {# 'basic.deliver' {delivery_tag = DeliveryTag }, _ } ->
1677+ amqp_channel :call (C , # 'basic.ack' {delivery_tag = DeliveryTag })
1678+ after 5000 ->
1679+ flush (1 ),
1680+ ct :fail (" basic.deliver timeout" )
1681+ end
1682+ end || _ <- Servers ],
1683+ flush (1 ),
1684+ [{_ , MonBy2 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1685+ NumMonRefsBefore = length ([M || M <- MonBy2 , is_reference (M )]),
1686+ % % delete queue
1687+ ? assertMatch (# 'queue.delete_ok' {},
1688+ amqp_channel :call (Ch , # 'queue.delete' {queue = QQ })),
1689+ [{_ , MonBy3 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1690+ NumMonRefsAfter = length ([M || M <- MonBy3 , is_reference (M )]),
1691+ % % this isn't an ideal way to assert this but every file handle creates
1692+ % % a monitor that (currenlty?) is a reference so we assert that we have
1693+ % % fewer reference monitors after
1694+ ? assert (NumMonRefsAfter < NumMonRefsBefore ),
1695+
1696+ rabbit_ct_client_helpers :close_channel (C ),
1697+ ok .
1698+
16501699gh_12635 (Config ) ->
16511700 % https://github.com/rabbitmq/rabbitmq-server/issues/12635
16521701 [Server0 , _Server1 , Server2 ] =
@@ -4964,3 +5013,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
49645013 ensure_qq_proc_dead (Config , Server , RaName )
49655014 end .
49665015
5016+ lsof_rpc () ->
5017+ Cmd = rabbit_misc :format (
5018+ " lsof -p ~ts " , [os :getpid ()]),
5019+ os :cmd (Cmd ).
0 commit comments