Skip to content

Commit da43bca

Browse files
committed
QQ/Streams: Ensure open file handles are closed when a queue is deleted.
Whilst a session or channel has one open.
1 parent 8806e56 commit da43bca

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
-export([
1515
init/1,
1616
init/2,
17+
close/1,
1718
checkout/4,
1819
cancel_checkout/3,
1920
enqueue/3,
@@ -755,6 +756,13 @@ handle_ra_event(QName, Leader, close_cached_segments,
755756
handle_ra_event(_QName, _Leader, {machine, eol}, State) ->
756757
{eol, [{unblock, cluster_name(State)}]}.
757758

759+
-spec close(rabbit_fifo_client:state()) -> ok.
760+
close(#state{cached_segments = undefined}) ->
761+
ok;
762+
close(#state{cached_segments = {_, _, Flru}}) ->
763+
_ = ra_flru:evict_all(Flru),
764+
ok.
765+
758766
%% @doc Attempts to enqueue a message using cast semantics. This provides no
759767
%% guarantees or retries if the message fails to achieve consensus or if the
760768
%% servers sent to happens not to be available. If the message is sent to a

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,9 @@ remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
407407
case maps:take(QRef, Ctxs0) of
408408
error ->
409409
State;
410-
{_, Ctxs} ->
410+
{#ctx{module = Mod,
411+
state = S}, Ctxs} ->
412+
ok = Mod:close(S),
411413
State#?STATE{ctxs = Ctxs}
412414
end.
413415

@@ -495,11 +497,10 @@ init() ->
495497

496498
-spec close(state()) -> ok.
497499
close(#?STATE{ctxs = Contexts}) ->
498-
maps:foreach(
499-
fun (_, #ctx{module = Mod,
500-
state = S}) ->
501-
ok = Mod:close(S)
502-
end, Contexts).
500+
maps:foreach(fun (_, #ctx{module = Mod,
501+
state = S}) ->
502+
ok = Mod:close(S)
503+
end, Contexts).
503504

504505
-spec new(amqqueue:amqqueue(), state()) -> state().
505506
new(Q, State) when ?is_amqqueue(Q) ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@
159159
-define(RPC_TIMEOUT, 1000).
160160
-define(START_CLUSTER_TIMEOUT, 5000).
161161
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
162-
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
163162
-define(TICK_INTERVAL, 5000). %% the ra server tick time
164163
-define(DELETE_TIMEOUT, 5000).
165164
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
@@ -230,8 +229,8 @@ init(Q) when ?is_amqqueue(Q) ->
230229
{ok, rabbit_fifo_client:init(Servers, SoftLimit)}.
231230

232231
-spec close(rabbit_fifo_client:state()) -> ok.
233-
close(_State) ->
234-
ok.
232+
close(State) ->
233+
rabbit_fifo_client:close(State).
235234

236235
-spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) ->
237236
rabbit_fifo_client:state().

0 commit comments

Comments
 (0)