diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index e9df2b1a522f..f00fb1ad6111 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -14,6 +14,7 @@ -export([ init/1, init/2, + close/1, checkout/4, cancel_checkout/3, enqueue/3, @@ -755,6 +756,13 @@ handle_ra_event(QName, Leader, close_cached_segments, handle_ra_event(_QName, _Leader, {machine, eol}, State) -> {eol, [{unblock, cluster_name(State)}]}. +-spec close(rabbit_fifo_client:state()) -> ok. +close(#state{cached_segments = undefined}) -> + ok; +close(#state{cached_segments = {_, _, Flru}}) -> + _ = ra_flru:evict_all(Flru), + ok. + %% @doc Attempts to enqueue a message using cast semantics. This provides no %% guarantees or retries if the message fails to achieve consensus or if the %% servers sent to happens not to be available. If the message is sent to a diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 4ddf31780538..a9d09bdb4f50 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -414,7 +414,9 @@ remove(QRef, #?STATE{ctxs = Ctxs0} = State) -> case maps:take(QRef, Ctxs0) of error -> State; - {_, Ctxs} -> + {#ctx{module = Mod, + state = S}, Ctxs} -> + ok = Mod:close(S), State#?STATE{ctxs = Ctxs} end. @@ -502,11 +504,10 @@ init() -> -spec close(state()) -> ok. close(#?STATE{ctxs = Contexts}) -> - maps:foreach( - fun (_, #ctx{module = Mod, - state = S}) -> - ok = Mod:close(S) - end, Contexts). + maps:foreach(fun (_, #ctx{module = Mod, + state = S}) -> + ok = Mod:close(S) + end, Contexts). -spec new(amqqueue:amqqueue(), state()) -> state(). 
new(Q, State) when ?is_amqqueue(Q) -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 4edef33a5bf5..c2758af70784 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -143,7 +143,6 @@ -define(RPC_TIMEOUT, 1000). -define(START_CLUSTER_TIMEOUT, 5000). -define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT --define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000). -define(TICK_INTERVAL, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). -define(MEMBER_CHANGE_TIMEOUT, 20_000). @@ -214,8 +213,8 @@ init(Q) when ?is_amqqueue(Q) -> {ok, rabbit_fifo_client:init(Servers, SoftLimit)}. -spec close(rabbit_fifo_client:state()) -> ok. -close(_State) -> - ok. +close(State) -> + rabbit_fifo_client:close(State). -spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) -> rabbit_fifo_client:state(). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index a9164fb99a4e..2087f0f11a6a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -197,6 +197,7 @@ all_tests() -> requeue_multiple_true, requeue_multiple_false, subscribe_from_each, + dont_leak_file_handles, leader_health_check ]. @@ -1641,6 +1642,54 @@ subscribe_from_each(Config) -> ok. 
+dont_leak_file_handles(Config) -> + + [Server0 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + [begin + publish_confirm(Ch, QQ) + end || _ <- Servers], + timer:sleep(100), + %% roll the wal to force consumer messages to be read from disk + [begin + ok = rpc:call(S, ra_log_wal, force_roll_over, [ra_log_wal]) + end || S <- Servers], + timer:sleep(256), + + C = rabbit_ct_client_helpers:open_channel(Config, Server0), + [_, NCh1] = rpc:call(Server0, rabbit_channel, list, []), + qos(C, 1, false), + subscribe(C, QQ, false), + [begin + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + amqp_channel:call(C, #'basic.ack'{delivery_tag = DeliveryTag}) + after 5000 -> + flush(1), + ct:fail("basic.deliver timeout") + end + end || _ <- Servers], + flush(1), + [{_, MonBy2}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]), + NumMonRefsBefore = length([M || M <- MonBy2, is_reference(M)]), + %% delete queue + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + [{_, MonBy3}] = rpc:call(Server0, erlang, process_info, [NCh1, [monitored_by]]), + NumMonRefsAfter = length([M || M <- MonBy3, is_reference(M)]), + %% this isn't an ideal way to assert this but every file handle creates + %% a monitor that (currently?) is a reference so we assert that we have + %% fewer reference monitors after + ?assert(NumMonRefsAfter < NumMonRefsBefore), + + rabbit_ct_client_helpers:close_channel(C), + ok. 
+ gh_12635(Config) -> % https://github.com/rabbitmq/rabbitmq-server/issues/12635 [Server0, _Server1, Server2] = @@ -4949,3 +4998,7 @@ ensure_qq_proc_dead(Config, Server, RaName) -> ensure_qq_proc_dead(Config, Server, RaName) end. +lsof_rpc() -> + Cmd = rabbit_misc:format( + "lsof -p ~ts", [os:getpid()]), + os:cmd(Cmd).