Skip to content

Commit 5c80bea

Browse files
Merge pull request #2001 from rabbitmq/rabbitmq-server-2000
Move check for active readers to message store GC action function
2 parents 43dfd50 + a8deb75 commit 5c80bea

File tree

3 files changed

+80
-36
lines changed

3 files changed

+80
-36
lines changed

src/rabbit_msg_store.erl

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
client_ref/1, close_all_indicated/1,
2424
write/3, write_flow/3, read/2, contains/2, remove/2]).
2525

26-
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
26+
-export([set_maximum_since_use/2, combine_files/3,
2727
delete_file/2]). %% internal
2828

2929
-export([transform_dir/3, force_recovery/2]). %% upgrade
@@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File,
19701970
%% garbage collection / compaction / aggregation -- external
19711971
%%----------------------------------------------------------------------------
19721972

1973-
-spec has_readers(non_neg_integer(), gc_state()) -> boolean().
1974-
1975-
has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
1976-
[#file_summary { locked = true, readers = Count }] =
1977-
ets:lookup(FileSummaryEts, File),
1978-
Count /= 0.
1979-
19801973
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
1981-
deletion_thunk().
1974+
{ok, deletion_thunk()} | {defer, non_neg_integer()}.
19821975

19831976
combine_files(Source, Destination,
1984-
State = #gc_state { file_summary_ets = FileSummaryEts,
1985-
file_handles_ets = FileHandlesEts,
1986-
dir = Dir,
1987-
msg_store = Server }) ->
1988-
[#file_summary {
1977+
State = #gc_state { file_summary_ets = FileSummaryEts }) ->
1978+
[#file_summary{locked = true} = SourceSummary] =
1979+
ets:lookup(FileSummaryEts, Source),
1980+
1981+
[#file_summary{locked = true} = DestinationSummary] =
1982+
ets:lookup(FileSummaryEts, Destination),
1983+
1984+
case {SourceSummary, DestinationSummary} of
1985+
{#file_summary{readers = 0}, #file_summary{readers = 0}} ->
1986+
{ok, do_combine_files(SourceSummary, DestinationSummary,
1987+
Source, Destination, State)};
1988+
_ ->
1989+
rabbit_log:debug("Asked to combine files ~p and ~p but they have active readers. Deferring.",
1990+
[Source, Destination]),
1991+
DeferredFiles = [FileSummary#file_summary.file
1992+
|| FileSummary <- [SourceSummary, DestinationSummary],
1993+
FileSummary#file_summary.readers /= 0],
1994+
{defer, DeferredFiles}
1995+
end.
1996+
1997+
do_combine_files(SourceSummary, DestinationSummary,
1998+
Source, Destination,
1999+
State = #gc_state { file_summary_ets = FileSummaryEts,
2000+
file_handles_ets = FileHandlesEts,
2001+
dir = Dir,
2002+
msg_store = Server }) ->
2003+
#file_summary {
19892004
readers = 0,
19902005
left = Destination,
19912006
valid_total_size = SourceValid,
19922007
file_size = SourceFileSize,
1993-
locked = true }] = ets:lookup(FileSummaryEts, Source),
1994-
[#file_summary {
2008+
locked = true } = SourceSummary,
2009+
#file_summary {
19952010
readers = 0,
19962011
right = Source,
19972012
valid_total_size = DestinationValid,
19982013
file_size = DestinationFileSize,
1999-
locked = true }] = ets:lookup(FileSummaryEts, Destination),
2014+
locked = true } = DestinationSummary,
20002015

20012016
SourceName = filenum_to_name(Source),
20022017
DestinationName = filenum_to_name(Destination),
@@ -2053,22 +2068,30 @@ combine_files(Source, Destination,
20532068
{#file_summary.file_size, TotalValidData}]),
20542069

20552070
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
2071+
rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
2072+
[Source, Destination, Reclaimed]),
20562073
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
20572074
safe_file_delete_fun(Source, Dir, FileHandlesEts).
20582075

2059-
-spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
2076+
-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
20602077

20612078
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20622079
file_handles_ets = FileHandlesEts,
20632080
dir = Dir,
20642081
msg_store = Server }) ->
2065-
[#file_summary { valid_total_size = 0,
2066-
locked = true,
2067-
file_size = FileSize,
2068-
readers = 0 }] = ets:lookup(FileSummaryEts, File),
2069-
{[], 0} = load_and_vacuum_message_file(File, State),
2070-
gen_server2:cast(Server, {delete_file, File, FileSize}),
2071-
safe_file_delete_fun(File, Dir, FileHandlesEts).
2082+
case ets:lookup(FileSummaryEts, File) of
2083+
[#file_summary { valid_total_size = 0,
2084+
locked = true,
2085+
file_size = FileSize,
2086+
readers = 0 }] ->
2087+
{[], 0} = load_and_vacuum_message_file(File, State),
2088+
gen_server2:cast(Server, {delete_file, File, FileSize}),
2089+
{ok, safe_file_delete_fun(File, Dir, FileHandlesEts)};
2090+
[#file_summary{readers = Readers}] when Readers > 0 ->
2091+
rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
2092+
[File]),
2093+
{defer, [File]}
2094+
end.
20722095

20732096
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
20742097
%% Messages here will be end-of-file at start-of-list

src/rabbit_msg_store_gc.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,13 @@ attempt_action(Action, Files,
119119
State = #state { pending_no_readers = Pending,
120120
on_action = Thunks,
121121
msg_store_state = MsgStoreState }) ->
122-
case [File || File <- Files,
123-
rabbit_msg_store:has_readers(File, MsgStoreState)] of
124-
[] -> State #state {
125-
on_action = lists:filter(
126-
fun (Thunk) -> not Thunk() end,
127-
[do_action(Action, Files, MsgStoreState) |
128-
Thunks]) };
129-
[File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
130-
State #state { pending_no_readers = Pending1 }
122+
case do_action(Action, Files, MsgStoreState) of
123+
{ok, OkThunk} ->
124+
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
125+
[OkThunk | Thunks])};
126+
{defer, [File | _]} ->
127+
Pending1 = maps:put(File, {Action, Files}, Pending),
128+
State #state { pending_no_readers = Pending1 }
131129
end.
132130

133131
do_action(combine, [Source, Destination], MsgStoreState) ->

test/queue_parallel_SUITE.erl

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ groups() ->
6060
[
6161
{parallel_tests, [],
6262
[
63-
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
64-
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
63+
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
64+
trigger_message_store_compaction]},
65+
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
66+
trigger_message_store_compaction]},
6567
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
6668
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
6769
{quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
@@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) ->
327329
rabbit_ct_client_helpers:close_channel(Ch),
328330
ok.
329331

332+
trigger_message_store_compaction(Config) ->
333+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
334+
QName = ?config(queue_name, Config),
335+
declare_queue(Ch, Config, QName),
336+
337+
N = 12000,
338+
[publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)],
339+
wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]),
340+
341+
AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N),
342+
ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags),
343+
344+
[amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag,
345+
multiple = false}) || Tag <- ToAck],
346+
347+
%% give compaction a moment to start in and finish
348+
timer:sleep(5000),
349+
amqp_channel:cast(Ch, #'queue.purge'{queue = QName}),
350+
rabbit_ct_client_helpers:close_channel(Ch),
351+
ok.
352+
330353
subscribe_and_requeue_multiple_nack(Config) ->
331354
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
332355
QName = ?config(queue_name, Config),

0 commit comments

Comments
 (0)