@@ -197,6 +197,7 @@ all_tests() ->
197197 requeue_multiple_true ,
198198 requeue_multiple_false ,
199199 subscribe_from_each ,
200+ dont_leak_file_handles ,
200201 leader_health_check
201202 ].
202203
@@ -1629,6 +1630,54 @@ subscribe_from_each(Config) ->
16291630
16301631 ok .
16311632
1633+ dont_leak_file_handles (Config ) ->
1634+
1635+ [Server0 | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1636+
1637+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1638+ # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
1639+ QQ = ? config (queue_name , Config ),
1640+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1641+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1642+ [begin
1643+ publish_confirm (Ch , QQ )
1644+ end || _ <- Servers ],
1645+ timer :sleep (100 ),
1646+ % % roll the wal to force consumer messages to be read from disk
1647+ [begin
1648+ ok = rpc :call (S , ra_log_wal , force_roll_over , [ra_log_wal ])
1649+ end || S <- Servers ],
1650+ timer :sleep (256 ),
1651+
1652+ C = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1653+ [_ , NCh1 ] = rpc :call (Server0 , rabbit_channel , list , []),
1654+ qos (C , 1 , false ),
1655+ subscribe (C , QQ , false ),
1656+ [begin
1657+ receive
1658+ {# 'basic.deliver' {delivery_tag = DeliveryTag }, _ } ->
1659+ amqp_channel :call (C , # 'basic.ack' {delivery_tag = DeliveryTag })
1660+ after 5000 ->
1661+ flush (1 ),
1662+ ct :fail (" basic.deliver timeout" )
1663+ end
1664+ end || _ <- Servers ],
1665+ flush (1 ),
1666+ [{_ , MonBy2 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1667+ NumMonRefsBefore = length ([M || M <- MonBy2 , is_reference (M )]),
1668+ % % delete queue
1669+ ? assertMatch (# 'queue.delete_ok' {},
1670+ amqp_channel :call (Ch , # 'queue.delete' {queue = QQ })),
1671+ [{_ , MonBy3 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1672+ NumMonRefsAfter = length ([M || M <- MonBy3 , is_reference (M )]),
1673+ % % this isn't an ideal way to assert this but every file handle creates
1674+ % % a monitor that (currenlty?) is a reference so we assert that we have
1675+ % % fewer reference monitors after
1676+ ? assert (NumMonRefsAfter < NumMonRefsBefore ),
1677+
1678+ rabbit_ct_client_helpers :close_channel (C ),
1679+ ok .
1680+
16321681gh_12635 (Config ) ->
16331682 % https://github.com/rabbitmq/rabbitmq-server/issues/12635
16341683 [Server0 , _Server1 , Server2 ] =
@@ -4946,3 +4995,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
49464995 ensure_qq_proc_dead (Config , Server , RaName )
49474996 end .
49484997
4998+ lsof_rpc () ->
4999+ Cmd = rabbit_misc :format (
5000+ " lsof -p ~ts " , [os :getpid ()]),
5001+ os :cmd (Cmd ).
0 commit comments