From 639e905aea7b3726b789afca73fd4d8cddb01245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 26 Sep 2024 16:43:55 +0200 Subject: [PATCH] CQ: Fix shared store scanner missing messages It was still possible, although rare, to have message store files lose message data, when the following conditions were met: * the message data contains byte values 255 (255 is used as an OK marker after a message) * the message is located after a 0-filled hole in the file * the length of the data is at least 4096 bytes and if we misread it (as detailed below) we encounter a 255 byte where we expect the OK marker The trick for the code to previously misread the length can be explained as follow: A message is stored in the following format: <> With MsgId always being 16 bytes in length. So Len is always at least 16, if the message data Msg is empty. But technically it never is. Now if we have a zero filled hole just before this message, we may end up with this: <<0, Len:64, MsgIdAndMsg:Len/unit:8, 255>> When we are scanning we are testing bytes to see if there is a message there or not. We look for a Len that gives us byte 255 after MsgIdAndMsg. Len of value 4096 looks like this in binary: <<0:48, 16, 0>> Problem is if we have leading zeroes, Len may look like this: <<0, 0:48, 16, 0>> If we take the first 64 bits we get a potential length of 16. We look at the byte after the next 16 bytes. If it is 255, we think this is a message and skip by this amount of bytes, and mistakenly miss the real message. Solving this by changing the file format would be simple enough, but we don't have the luxury to afford that. A different solution was found, which is to combine file scanning with checking that the message exists in the message store index (populated from queues at startup, and kept up to date over the life time of the store). Then we know for sure that the message above doesn't exist, because the MsgId won't be found in the index. If it is, then the file number and offset will not match, and the check will fail. There remains a small chance that we get it wrong during dirty recovery. Only a better file format would improve that. --- deps/rabbit/src/rabbit_msg_store.erl | 179 ++++++++++++----------- deps/rabbit/test/backing_queue_SUITE.erl | 23 ++- 2 files changed, 108 insertions(+), 94 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index b28506ab2ab8..efd8d53a0507 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -16,7 +16,7 @@ -export([compact_file/2, truncate_file/4, delete_file/2]). %% internal --export([scan_file_for_valid_messages/1]). %% salvage tool +-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/4, prioritise_cast/3, @@ -1472,31 +1472,28 @@ list_sorted_filenames(Dir, Ext) -> -define(SCAN_BLOCK_SIZE, 4194304). %% 4MB -scan_file_for_valid_messages(Dir, FileName) -> - scan_file_for_valid_messages(form_filename(Dir, FileName)). - +%% Exported as a salvage tool. Not as accurate as node recovery +%% because it doesn't have the queue index. scan_file_for_valid_messages(Path) -> + scan_file_for_valid_messages(Path, fun(Obj) -> {valid, Obj} end). + +scan_file_for_valid_messages(Path, Fun) -> case file:open(Path, [read, binary, raw]) of {ok, Fd} -> {ok, FileSize} = file:position(Fd, eof), {ok, _} = file:position(Fd, bof), - Messages = scan(<<>>, Fd, 0, FileSize, #{}, []), + Messages = scan(<<>>, Fd, Fun, 0, FileSize, #{}, []), ok = file:close(Fd), - case Messages of - [] -> - {ok, [], 0}; - [{_, TotalSize, Offset}|_] -> - {ok, Messages, Offset + TotalSize} - end; + {ok, Messages}; {error, enoent} -> - {ok, [], 0}; + {ok, []}; {error, Reason} -> {error, {unable_to_scan_file, filename:basename(Path), Reason}} end. -scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) -> +scan(Buffer, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> case file:read(Fd, ?SCAN_BLOCK_SIZE) of eof -> Acc; @@ -1505,12 +1502,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) -> <<>> -> Data0; _ -> <> end, - scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc) + scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) end. %% Message might have been found. scan_data(<> = Data, - Fd, Offset, FileSize, MsgIdsFound, Acc) + Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when Size >= 16 -> <> = MsgIdAndMsg, case MsgIdsFound of @@ -1519,26 +1516,37 @@ scan_data(<> = Data, %% simply be a coincidence. Try the next byte. #{MsgIdInt := true} -> <<_, Rest2/bits>> = Data, - scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc); + scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc); %% Data looks to be a message. _ -> %% Avoid sub-binary construction. MsgId = <>, TotalSize = Size + 9, - scan_data(Rest, Fd, Offset + TotalSize, FileSize, - MsgIdsFound#{MsgIdInt => true}, - [{MsgId, TotalSize, Offset}|Acc]) + case Fun({MsgId, TotalSize, Offset}) of + %% Confirmed to be a message by the provided fun. + {valid, Entry} -> + scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize, + MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]); + %% Confirmed to be a message but we don't need it anymore. + previously_valid -> + scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize, + MsgIdsFound#{MsgIdInt => true}, Acc); + %% Not a message, try the next byte. + invalid -> + <<_, Rest2/bits>> = Data, + scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc) + end end; %% This might be the start of a message. -scan_data(<> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc) +scan_data(<> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Rest) < Size + 1, Size < FileSize - Offset -> - scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc); -scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc) + scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); +scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Data) < 8 -> - scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc); + scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); %% This is definitely not a message. Try the next byte. -scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) -> - scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc). +scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> + scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc). %%---------------------------------------------------------------------------- %% Ets index @@ -1742,47 +1750,39 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir }, File, Files) -> - FileName = filenum_to_name(File), + Path = form_filename(Dir, filenum_to_name(File)), rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)", - [form_filename(Dir, FileName), length(Files)]), + [Path, length(Files)]), %% The scan function already dealt with duplicate messages - %% within the file. We then get messages in reverse order. - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, FileName), - %% Valid messages are in file order so the last message is - %% the last message from the list. - {ValidMessages, ValidTotalSize} = - lists:foldl( - fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - %% Fan-out may result in the same message data in multiple - %% files so we have to guard against it. - case index_lookup(IndexEts, MsgId) of - #msg_location { file = undefined } = StoreEntry -> - ok = index_update(IndexEts, StoreEntry #msg_location { - file = File, offset = Offset, - total_size = TotalSize }), - {[Obj | VMAcc], VTSAcc + TotalSize}; - _ -> - {VMAcc, VTSAcc} - end - end, {[], 0}, Messages), - FileSize1 = - case Files of - %% if it's the last file, we'll truncate to remove any - %% rubbish above the last valid message. This affects the - %% file size. - [] -> case ValidMessages of - [] -> 0; - _ -> {_MsgId, TotalSize, Offset} = - lists:last(ValidMessages), - Offset + TotalSize - end; - [_|_] -> FileSize - end, + %% within the file, and only returns valid messages (we do + %% the index lookup in the fun). But we get messages in reverse order. + {ok, Messages} = scan_file_for_valid_messages(Path, + fun (Obj = {MsgId, TotalSize, Offset}) -> + %% Fan-out may result in the same message data in multiple + %% files so we have to guard against it. + case index_lookup(IndexEts, MsgId) of + #msg_location { file = undefined } = StoreEntry -> + ok = index_update(IndexEts, StoreEntry #msg_location { + file = File, offset = Offset, + total_size = TotalSize }), + {valid, Obj}; + _ -> + invalid + end + end), + ValidTotalSize = lists:foldl(fun({_, TotalSize, _}, Acc) -> Acc + TotalSize end, 0, Messages), + %% Any file may have rubbish at the end of it that we will want truncated. + %% Note that the last message in the file is the first in the list. + FileSize = case Messages of + [] -> + 0; + [{_, TotalSize, Offset}|_] -> + Offset + TotalSize + end, ok = gatherer:in(Gatherer, #file_summary { file = File, valid_total_size = ValidTotalSize, - file_size = FileSize1, + file_size = FileSize, locked = false }), ok = gatherer:finish(Gatherer). @@ -1933,7 +1933,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts, %% Load the messages. It's possible to get 0 messages here; %% that's OK. That means we have little to do as the file is %% about to be deleted. - {Messages, _} = scan_and_vacuum_message_file(File, State), + Messages = scan_and_vacuum_message_file(File, State), %% Blank holes. We must do this first otherwise the file is left %% with data that may confuse the code (for example data that looks %% like a message, isn't a message, but spans over a real message). @@ -2087,7 +2087,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, _ -> [#file_summary{ valid_total_size = 0, file_size = FileSize }] = ets:lookup(FileSummaryEts, File), - {[], 0} = scan_and_vacuum_message_file(File, State), + [] = scan_and_vacuum_message_file(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), true = ets:delete(FileSummaryEts, File), rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]), @@ -2096,28 +2096,31 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) -> %% Messages here will be end-of-file at start-of-list - {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl( - fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) -> - case index_lookup(IndexEts, MsgId) of - #msg_location { file = File, total_size = TotalSize, - offset = Offset, ref_count = 0 } = Entry -> - index_delete_object(IndexEts, Entry), - Acc; - #msg_location { file = File, total_size = TotalSize, - offset = Offset } = Entry -> - {[ Entry | List ], TotalSize + Size}; - %% Fan-out may remove the entry but also write a new - %% entry in a different file when it needs to write - %% a message and the existing reference is in a file - %% that's about to be deleted. So we explicitly accept - %% these cases and ignore this message. - #msg_location { file = OtherFile, total_size = TotalSize } - when File =/= OtherFile -> - Acc; - not_found -> - Acc - end - end, {[], 0}, Messages). + Path = form_filename(Dir, filenum_to_name(File)), + {ok, Messages} = scan_file_for_valid_messages(Path, + fun ({MsgId, TotalSize, Offset}) -> + case index_lookup(IndexEts, MsgId) of + #msg_location { file = File, total_size = TotalSize, + offset = Offset, ref_count = 0 } = Entry -> + index_delete_object(IndexEts, Entry), + %% The message was valid, but since we have now deleted + %% it due to having no ref_count, it becomes invalid. + %% We still want to let the scan function skip though. + previously_valid; + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> + {valid, Entry}; + %% Fan-out may remove the entry but also write a new + %% entry in a different file when it needs to write + %% a message and the existing reference is in a file + %% that's about to be deleted. So we explicitly accept + %% these cases and ignore this message. + #msg_location { file = OtherFile, total_size = TotalSize } + when File =/= OtherFile -> + invalid; + not_found -> + invalid + end + end), + %% @todo Do we really need to reverse messages? + lists:reverse(Messages). diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 2b4ce444c991..845cdc17ef56 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -630,6 +630,22 @@ msg_store_file_scan1(Config) -> %% Messages with no content. ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]), ok = Scan([{msg, gen_id(), <<>>}]), + %% Tricky messages. + %% + %% These only get properly detected when the index is populated. + %% In this test case we simulate the index with a fun. + TrickyScan = fun (Blocks, Expected, Fun) -> + Path = gen_msg_file(Config, Blocks), + Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun), + case Result of + Expected -> ok; + _ -> {expected, Expected, got, Result} + end + end, + ok = TrickyScan( + [{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}], + {ok, [{<<"idididididididid">>, 4378, 1}]}, + fun(Obj = {<<"idididididididid">>, 4378, 1}) -> {valid, Obj}; (_) -> invalid end), %% All good!! passed. @@ -662,12 +678,7 @@ gen_msg_file(Config, Blocks) -> gen_result(Blocks) -> Messages = gen_result(Blocks, 0, []), - case Messages of - [] -> - {ok, [], 0}; - [{_, TotalSize, Offset}|_] -> - {ok, Messages, Offset + TotalSize} - end. + {ok, Messages}. gen_result([], _, Acc) -> Acc;