Skip to content

Commit 02b8234

Browse files
Merge pull request #2001 from rabbitmq/rabbitmq-server-2000
Move check for active readers to message store GC action function (cherry picked from commit 5c80bea) Conflicts: src/rabbit_msg_store.erl test/queue_parallel_SUITE.erl
1 parent 3371e14 commit 02b8234

File tree

4 files changed

+60
-36
lines changed

4 files changed

+60
-36
lines changed

src/rabbit_msg_store.erl

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
client_ref/1, close_all_indicated/1,
2424
write/3, write_flow/3, read/2, contains/2, remove/2]).
2525

26-
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
26+
-export([set_maximum_since_use/2, combine_files/3,
2727
delete_file/2]). %% internal
2828

2929
-export([transform_dir/3, force_recovery/2]). %% upgrade
@@ -192,10 +192,7 @@
192192
-spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.
193193

194194
-spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.
195-
-spec has_readers(non_neg_integer(), gc_state()) -> boolean().
196-
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
197-
deletion_thunk().
198-
-spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
195+
199196
-spec force_recovery(file:filename(), server()) -> 'ok'.
200197
-spec transform_dir(file:filename(), server(),
201198
fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'.
@@ -1965,28 +1962,48 @@ cleanup_after_file_deletion(File,
19651962
%% garbage collection / compaction / aggregation -- external
19661963
%%----------------------------------------------------------------------------
19671964

1968-
has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
1969-
[#file_summary { locked = true, readers = Count }] =
1970-
ets:lookup(FileSummaryEts, File),
1971-
Count /= 0.
1965+
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
1966+
{ok, deletion_thunk()} | {defer, non_neg_integer()}.
19721967

19731968
combine_files(Source, Destination,
1974-
State = #gc_state { file_summary_ets = FileSummaryEts,
1975-
file_handles_ets = FileHandlesEts,
1976-
dir = Dir,
1977-
msg_store = Server }) ->
1978-
[#file_summary {
1969+
State = #gc_state { file_summary_ets = FileSummaryEts }) ->
1970+
[#file_summary{locked = true} = SourceSummary] =
1971+
ets:lookup(FileSummaryEts, Source),
1972+
1973+
[#file_summary{locked = true} = DestinationSummary] =
1974+
ets:lookup(FileSummaryEts, Destination),
1975+
1976+
case {SourceSummary, DestinationSummary} of
1977+
{#file_summary{readers = 0}, #file_summary{readers = 0}} ->
1978+
{ok, do_combine_files(SourceSummary, DestinationSummary,
1979+
Source, Destination, State)};
1980+
_ ->
1981+
rabbit_log:debug("Asked to combine files ~p and ~p but they have active readers. Deferring.",
1982+
[Source, Destination]),
1983+
DeferredFiles = [FileSummary#file_summary.file
1984+
|| FileSummary <- [SourceSummary, DestinationSummary],
1985+
FileSummary#file_summary.readers /= 0],
1986+
{defer, DeferredFiles}
1987+
end.
1988+
1989+
do_combine_files(SourceSummary, DestinationSummary,
1990+
Source, Destination,
1991+
State = #gc_state { file_summary_ets = FileSummaryEts,
1992+
file_handles_ets = FileHandlesEts,
1993+
dir = Dir,
1994+
msg_store = Server }) ->
1995+
#file_summary {
19791996
readers = 0,
19801997
left = Destination,
19811998
valid_total_size = SourceValid,
19821999
file_size = SourceFileSize,
1983-
locked = true }] = ets:lookup(FileSummaryEts, Source),
1984-
[#file_summary {
2000+
locked = true } = SourceSummary,
2001+
#file_summary {
19852002
readers = 0,
19862003
right = Source,
19872004
valid_total_size = DestinationValid,
19882005
file_size = DestinationFileSize,
1989-
locked = true }] = ets:lookup(FileSummaryEts, Destination),
2006+
locked = true } = DestinationSummary,
19902007

19912008
SourceName = filenum_to_name(Source),
19922009
DestinationName = filenum_to_name(Destination),
@@ -2043,20 +2060,30 @@ combine_files(Source, Destination,
20432060
{#file_summary.file_size, TotalValidData}]),
20442061

20452062
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
2063+
rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
2064+
[Source, Destination, Reclaimed]),
20462065
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
20472066
safe_file_delete_fun(Source, Dir, FileHandlesEts).
20482067

2068+
-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
2069+
20492070
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20502071
file_handles_ets = FileHandlesEts,
20512072
dir = Dir,
20522073
msg_store = Server }) ->
2053-
[#file_summary { valid_total_size = 0,
2054-
locked = true,
2055-
file_size = FileSize,
2056-
readers = 0 }] = ets:lookup(FileSummaryEts, File),
2057-
{[], 0} = load_and_vacuum_message_file(File, State),
2058-
gen_server2:cast(Server, {delete_file, File, FileSize}),
2059-
safe_file_delete_fun(File, Dir, FileHandlesEts).
2074+
case ets:lookup(FileSummaryEts, File) of
2075+
[#file_summary { valid_total_size = 0,
2076+
locked = true,
2077+
file_size = FileSize,
2078+
readers = 0 }] ->
2079+
{[], 0} = load_and_vacuum_message_file(File, State),
2080+
gen_server2:cast(Server, {delete_file, File, FileSize}),
2081+
{ok, safe_file_delete_fun(File, Dir, FileHandlesEts)};
2082+
[#file_summary{readers = Readers}] when Readers > 0 ->
2083+
rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
2084+
[File]),
2085+
{defer, [File]}
2086+
end.
20602087

20612088
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
20622089
%% Messages here will be end-of-file at start-of-list

src/rabbit_msg_store_gc.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,13 @@ attempt_action(Action, Files,
116116
State = #state { pending_no_readers = Pending,
117117
on_action = Thunks,
118118
msg_store_state = MsgStoreState }) ->
119-
case [File || File <- Files,
120-
rabbit_msg_store:has_readers(File, MsgStoreState)] of
121-
[] -> State #state {
122-
on_action = lists:filter(
123-
fun (Thunk) -> not Thunk() end,
124-
[do_action(Action, Files, MsgStoreState) |
125-
Thunks]) };
126-
[File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
127-
State #state { pending_no_readers = Pending1 }
119+
case do_action(Action, Files, MsgStoreState) of
120+
{ok, OkThunk} ->
121+
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
122+
[OkThunk | Thunks])};
123+
{defer, [File | _]} ->
124+
Pending1 = maps:put(File, {Action, Files}, Pending),
125+
State #state { pending_no_readers = Pending1 }
128126
end.
129127

130128
do_action(combine, [Source, Destination], MsgStoreState) ->

test/confirms_rejects_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ init_per_group(Group, Config) ->
3636
rabbit_ct_broker_helpers:setup_steps() ++
3737
rabbit_ct_client_helpers:setup_steps()).
3838

39-
end_per_group(Group, Config) ->
39+
end_per_group(_Group, Config) ->
4040
rabbit_ct_helpers:run_steps(Config,
4141
rabbit_ct_client_helpers:teardown_steps() ++
4242
rabbit_ct_broker_helpers:teardown_steps()).
@@ -255,4 +255,4 @@ clean_acks_mailbox() ->
255255
{'basic.nack', _, _, _} -> clean_acks_mailbox()
256256
after
257257
1000 -> done
258-
end.
258+
end.

test/unit_inbroker_non_parallel_SUITE.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,6 @@ log_file_fails_to_initialise_during_startup(Config) ->
315315

316316
log_file_fails_to_initialise_during_startup1(_Config) ->
317317
[LogFile|_] = rabbit:log_locations(),
318-
Suffix = ".0",
319318

320319
%% start application with logging to directory with no
321320
%% write permissions

0 commit comments

Comments
 (0)