Skip to content

Commit 4b4e8a7

Browse files
committed
Move check for reader to action function for message store GC.
Message store GC postpones processing of file, which have readers. When performing an action, it asserts that there are no readers. Check for readers may race with readers update by a queue, crashing the message store. Make check and assert work with the same lookup to reduce failure rate. In case of races the queue process should handle exception instead. Addresses #2000 [#165755203]
1 parent a4033d8 commit 4b4e8a7

File tree

2 files changed

+53
-34
lines changed

2 files changed

+53
-34
lines changed

src/rabbit_msg_store.erl

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
client_ref/1, close_all_indicated/1,
2424
write/3, write_flow/3, read/2, contains/2, remove/2]).
2525

26-
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
26+
-export([set_maximum_since_use/2, combine_files/3,
2727
delete_file/2]). %% internal
2828

2929
-export([transform_dir/3, force_recovery/2]). %% upgrade
@@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File,
19701970
%% garbage collection / compaction / aggregation -- external
19711971
%%----------------------------------------------------------------------------
19721972

1973-
-spec has_readers(non_neg_integer(), gc_state()) -> boolean().
1974-
1975-
has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
1976-
[#file_summary { locked = true, readers = Count }] =
1977-
ets:lookup(FileSummaryEts, File),
1978-
Count /= 0.
1979-
19801973
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
1981-
deletion_thunk().
1974+
{ok, deletion_thunk()} | {defer, non_neg_integer()}.
19821975

19831976
combine_files(Source, Destination,
1984-
State = #gc_state { file_summary_ets = FileSummaryEts,
1985-
file_handles_ets = FileHandlesEts,
1986-
dir = Dir,
1987-
msg_store = Server }) ->
1988-
[#file_summary {
1977+
State = #gc_state { file_summary_ets = FileSummaryEts }) ->
1978+
[#file_summary{locked = true} = SourceSummary] =
1979+
ets:lookup(FileSummaryEts, Source),
1980+
1981+
[#file_summary{locked = true} = DestinationSummary] =
1982+
ets:lookup(FileSummaryEts, Destination),
1983+
1984+
case {SourceSummary, DestinationSummary} of
1985+
{#file_summary{readers = 0}, #file_summary{readers = 0}} ->
1986+
{ok, do_combine_files(SourceSummary, DestinationSummary,
1987+
Source, Destination, State)};
1988+
_ ->
1989+
rabbit_log:error("Asked to combine files ~p and ~p, but they have readers. Deferring.",
1990+
[Source, Destination]),
1991+
DeferredFiles = [FileSummary#file_summary.file
1992+
|| FileSummary <- [SourceSummary, DestinationSummary],
1993+
FileSummary#file_summary.readers /= 0],
1994+
{defer, DeferredFiles}
1995+
end.
1996+
1997+
do_combine_files(SourceSummary, DestinationSummary,
1998+
Source, Destination,
1999+
State = #gc_state { file_summary_ets = FileSummaryEts,
2000+
file_handles_ets = FileHandlesEts,
2001+
dir = Dir,
2002+
msg_store = Server }) ->
2003+
#file_summary {
19892004
readers = 0,
19902005
left = Destination,
19912006
valid_total_size = SourceValid,
19922007
file_size = SourceFileSize,
1993-
locked = true }] = ets:lookup(FileSummaryEts, Source),
1994-
[#file_summary {
2008+
locked = true } = SourceSummary,
2009+
#file_summary {
19952010
readers = 0,
19962011
right = Source,
19972012
valid_total_size = DestinationValid,
19982013
file_size = DestinationFileSize,
1999-
locked = true }] = ets:lookup(FileSummaryEts, Destination),
2014+
locked = true } = DestinationSummary,
20002015

20012016
SourceName = filenum_to_name(Source),
20022017
DestinationName = filenum_to_name(Destination),
@@ -2056,19 +2071,25 @@ combine_files(Source, Destination,
20562071
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
20572072
safe_file_delete_fun(Source, Dir, FileHandlesEts).
20582073

2059-
-spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
2074+
-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
20602075

20612076
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20622077
file_handles_ets = FileHandlesEts,
20632078
dir = Dir,
20642079
msg_store = Server }) ->
2065-
[#file_summary { valid_total_size = 0,
2066-
locked = true,
2067-
file_size = FileSize,
2068-
readers = 0 }] = ets:lookup(FileSummaryEts, File),
2069-
{[], 0} = load_and_vacuum_message_file(File, State),
2070-
gen_server2:cast(Server, {delete_file, File, FileSize}),
2071-
safe_file_delete_fun(File, Dir, FileHandlesEts).
2080+
case ets:lookup(FileSummaryEts, File) of
2081+
[#file_summary { valid_total_size = 0,
2082+
locked = true,
2083+
file_size = FileSize,
2084+
readers = 0 }] ->
2085+
{[], 0} = load_and_vacuum_message_file(File, State),
2086+
gen_server2:cast(Server, {delete_file, File, FileSize}),
2087+
{ok, safe_file_delete_fun(File, Dir, FileHandlesEts)};
2088+
[#file_summary{readers = Readers}] when Readers > 0 ->
2089+
rabbit_log:error("Asked to delete file ~p, but it has readers. Deferring.",
2090+
[File]),
2091+
{defer, [File]}
2092+
end.
20722093

20732094
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
20742095
%% Messages here will be end-of-file at start-of-list

src/rabbit_msg_store_gc.erl

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,13 @@ attempt_action(Action, Files,
119119
State = #state { pending_no_readers = Pending,
120120
on_action = Thunks,
121121
msg_store_state = MsgStoreState }) ->
122-
case [File || File <- Files,
123-
rabbit_msg_store:has_readers(File, MsgStoreState)] of
124-
[] -> State #state {
125-
on_action = lists:filter(
126-
fun (Thunk) -> not Thunk() end,
127-
[do_action(Action, Files, MsgStoreState) |
128-
Thunks]) };
129-
[File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
130-
State #state { pending_no_readers = Pending1 }
122+
case do_action(Action, Files, MsgStoreState) of
123+
{ok, OkThunk} ->
124+
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
125+
[OkThunk | Thunks])};
126+
{defer, [File | _]} ->
127+
Pending1 = maps:put(File, {Action, Files}, Pending),
128+
State #state { pending_no_readers = Pending1 }
131129
end.
132130

133131
do_action(combine, [Source, Destination], MsgStoreState) ->

0 commit comments

Comments
 (0)