1616
1717-export ([compact_file /2 , truncate_file /4 , delete_file /2 ]). % % internal
1818
19- -export ([scan_file_for_valid_messages /1 ]). % % salvage tool
19+ -export ([scan_file_for_valid_messages /1 , scan_file_for_valid_messages / 2 ]). % % salvage tool
2020
2121-export ([init /1 , handle_call /3 , handle_cast /2 , handle_info /2 , terminate /2 ,
2222 code_change /3 , prioritise_call /4 , prioritise_cast /3 ,
@@ -1472,31 +1472,28 @@ list_sorted_filenames(Dir, Ext) ->
14721472
14731473-define (SCAN_BLOCK_SIZE , 4194304 ). % % 4MB
14741474
1475- scan_file_for_valid_messages (Dir , FileName ) ->
1476- scan_file_for_valid_messages (form_filename (Dir , FileName )).
1477-
1475+ % % Exported as a salvage tool. Not as accurate as node recovery
1476+ % % because it doesn't have the queue index.
14781477scan_file_for_valid_messages (Path ) ->
1478+ scan_file_for_valid_messages (Path , fun (Obj ) -> {valid , Obj } end ).
1479+
1480+ scan_file_for_valid_messages (Path , Fun ) ->
14791481 case file :open (Path , [read , binary , raw ]) of
14801482 {ok , Fd } ->
14811483 {ok , FileSize } = file :position (Fd , eof ),
14821484 {ok , _ } = file :position (Fd , bof ),
1483- Messages = scan (<<>>, Fd , 0 , FileSize , #{}, []),
1485+ Messages = scan (<<>>, Fd , Fun , 0 , FileSize , #{}, []),
14841486 ok = file :close (Fd ),
1485- case Messages of
1486- [] ->
1487- {ok , [], 0 };
1488- [{_ , TotalSize , Offset }|_ ] ->
1489- {ok , Messages , Offset + TotalSize }
1490- end ;
1487+ {ok , Messages };
14911488 {error , enoent } ->
1492- {ok , [], 0 };
1489+ {ok , []};
14931490 {error , Reason } ->
14941491 {error , {unable_to_scan_file ,
14951492 filename :basename (Path ),
14961493 Reason }}
14971494 end .
14981495
1499- scan (Buffer , Fd , Offset , FileSize , MsgIdsFound , Acc ) ->
1496+ scan (Buffer , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
15001497 case file :read (Fd , ? SCAN_BLOCK_SIZE ) of
15011498 eof ->
15021499 Acc ;
@@ -1505,12 +1502,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
15051502 <<>> -> Data0 ;
15061503 _ -> <<Buffer /binary , Data0 /binary >>
15071504 end ,
1508- scan_data (Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1505+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15091506 end .
15101507
15111508% % Message might have been found.
15121509scan_data (<<Size :64 , MsgIdAndMsg :Size /binary , 255 , Rest /bits >> = Data ,
1513- Fd , Offset , FileSize , MsgIdsFound , Acc )
1510+ Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15141511 when Size >= 16 ->
15151512 <<MsgIdInt :128 , _ /bits >> = MsgIdAndMsg ,
15161513 case MsgIdsFound of
@@ -1519,26 +1516,37 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
15191516 % % simply be a coincidence. Try the next byte.
15201517 #{MsgIdInt := true } ->
15211518 <<_ , Rest2 /bits >> = Data ,
1522- scan_data (Rest2 , Fd , Offset + 1 , FileSize , MsgIdsFound , Acc );
1519+ scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc );
15231520 % % Data looks to be a message.
15241521 _ ->
15251522 % % Avoid sub-binary construction.
15261523 MsgId = <<MsgIdInt :128 >>,
15271524 TotalSize = Size + 9 ,
1528- scan_data (Rest , Fd , Offset + TotalSize , FileSize ,
1529- MsgIdsFound #{MsgIdInt => true },
1530- [{MsgId , TotalSize , Offset }|Acc ])
1525+ case Fun ({MsgId , TotalSize , Offset }) of
1526+ % % Confirmed to be a message by the provided fun.
1527+ {valid , Entry } ->
1528+ scan_data (Rest , Fd , Fun , Offset + TotalSize , FileSize ,
1529+ MsgIdsFound #{MsgIdInt => true }, [Entry |Acc ]);
1530+ % % Confirmed to be a message but we don't need it anymore.
1531+ previously_valid ->
1532+ scan_data (Rest , Fd , Fun , Offset + TotalSize , FileSize ,
1533+ MsgIdsFound #{MsgIdInt => true }, Acc );
1534+ % % Not a message, try the next byte.
1535+ invalid ->
1536+ <<_ , Rest2 /bits >> = Data ,
1537+ scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc )
1538+ end
15311539 end ;
15321540% % This might be the start of a message.
1533- scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1541+ scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15341542 when byte_size (Rest ) < Size + 1 , Size < FileSize - Offset ->
1535- scan (Data , Fd , Offset , FileSize , MsgIdsFound , Acc );
1536- scan_data (Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1543+ scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
1544+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15371545 when byte_size (Data ) < 8 ->
1538- scan (Data , Fd , Offset , FileSize , MsgIdsFound , Acc );
1546+ scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
15391547% % This is definitely not a message. Try the next byte.
1540- scan_data (<<_ , Rest /bits >>, Fd , Offset , FileSize , MsgIdsFound , Acc ) ->
1541- scan_data (Rest , Fd , Offset + 1 , FileSize , MsgIdsFound , Acc ).
1548+ scan_data (<<_ , Rest /bits >>, Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
1549+ scan_data (Rest , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc ).
15421550
15431551% %----------------------------------------------------------------------------
15441552% % Ets index
@@ -1742,47 +1750,39 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
17421750
17431751build_index_worker (Gatherer , # msstate { index_ets = IndexEts , dir = Dir },
17441752 File , Files ) ->
1745- FileName = filenum_to_name (File ),
1753+ Path = form_filename ( Dir , filenum_to_name (File ) ),
17461754 rabbit_log :debug (" Rebuilding message location index from ~ts (~B file(s) remaining)" ,
1747- [form_filename ( Dir , FileName ) , length (Files )]),
1755+ [Path , length (Files )]),
17481756 % % The scan function already dealt with duplicate messages
1749- % % within the file. We then get messages in reverse order.
1750- {ok , Messages , FileSize } =
1751- scan_file_for_valid_messages (Dir , FileName ),
1752- % % Valid messages are in file order so the last message is
1753- % % the last message from the list.
1754- {ValidMessages , ValidTotalSize } =
1755- lists :foldl (
1756- fun (Obj = {MsgId , TotalSize , Offset }, {VMAcc , VTSAcc }) ->
1757- % % Fan-out may result in the same message data in multiple
1758- % % files so we have to guard against it.
1759- case index_lookup (IndexEts , MsgId ) of
1760- # msg_location { file = undefined } = StoreEntry ->
1761- ok = index_update (IndexEts , StoreEntry # msg_location {
1762- file = File , offset = Offset ,
1763- total_size = TotalSize }),
1764- {[Obj | VMAcc ], VTSAcc + TotalSize };
1765- _ ->
1766- {VMAcc , VTSAcc }
1767- end
1768- end , {[], 0 }, Messages ),
1769- FileSize1 =
1770- case Files of
1771- % % if it's the last file, we'll truncate to remove any
1772- % % rubbish above the last valid message. This affects the
1773- % % file size.
1774- [] -> case ValidMessages of
1775- [] -> 0 ;
1776- _ -> {_MsgId , TotalSize , Offset } =
1777- lists :last (ValidMessages ),
1778- Offset + TotalSize
1779- end ;
1780- [_ |_ ] -> FileSize
1781- end ,
1757+ % % within the file, and only returns valid messages (we do
1758+ % % the index lookup in the fun). But we get messages in reverse order.
1759+ {ok , Messages } = scan_file_for_valid_messages (Path ,
1760+ fun (Obj = {MsgId , TotalSize , Offset }) ->
1761+ % % Fan-out may result in the same message data in multiple
1762+ % % files so we have to guard against it.
1763+ case index_lookup (IndexEts , MsgId ) of
1764+ # msg_location { file = undefined } = StoreEntry ->
1765+ ok = index_update (IndexEts , StoreEntry # msg_location {
1766+ file = File , offset = Offset ,
1767+ total_size = TotalSize }),
1768+ {valid , Obj };
1769+ _ ->
1770+ invalid
1771+ end
1772+ end ),
1773+ ValidTotalSize = lists :foldl (fun ({_ , TotalSize , _ }, Acc ) -> Acc + TotalSize end , 0 , Messages ),
1774+ % % Any file may have rubbish at the end of it that we will want truncated.
1775+ % % Note that the last message in the file is the first in the list.
1776+ FileSize = case Messages of
1777+ [] ->
1778+ 0 ;
1779+ [{_ , TotalSize , Offset }|_ ] ->
1780+ Offset + TotalSize
1781+ end ,
17821782 ok = gatherer :in (Gatherer , # file_summary {
17831783 file = File ,
17841784 valid_total_size = ValidTotalSize ,
1785- file_size = FileSize1 ,
1785+ file_size = FileSize ,
17861786 locked = false }),
17871787 ok = gatherer :finish (Gatherer ).
17881788
@@ -1933,7 +1933,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
19331933 % % Load the messages. It's possible to get 0 messages here;
19341934 % % that's OK. That means we have little to do as the file is
19351935 % % about to be deleted.
1936- { Messages , _ } = scan_and_vacuum_message_file (File , State ),
1936+ Messages = scan_and_vacuum_message_file (File , State ),
19371937 % % Blank holes. We must do this first otherwise the file is left
19381938 % % with data that may confuse the code (for example data that looks
19391939 % % like a message, isn't a message, but spans over a real message).
@@ -2087,7 +2087,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20872087 _ ->
20882088 [# file_summary { valid_total_size = 0 ,
20892089 file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
2090- {[], 0 } = scan_and_vacuum_message_file (File , State ),
2090+ [] = scan_and_vacuum_message_file (File , State ),
20912091 ok = file :delete (form_filename (Dir , filenum_to_name (File ))),
20922092 true = ets :delete (FileSummaryEts , File ),
20932093 rabbit_log :debug (" Deleted empty file number ~tp ; reclaimed ~tp bytes" , [File , FileSize ]),
@@ -2096,28 +2096,31 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20962096
20972097scan_and_vacuum_message_file (File , # gc_state { index_ets = IndexEts , dir = Dir }) ->
20982098 % % Messages here will be end-of-file at start-of-list
2099- {ok , Messages , _FileSize } =
2100- scan_file_for_valid_messages (Dir , filenum_to_name (File )),
2101- % % foldl will reverse so will end up with msgs in ascending offset order
2102- lists :foldl (
2103- fun ({MsgId , TotalSize , Offset }, Acc = {List , Size }) ->
2104- case index_lookup (IndexEts , MsgId ) of
2105- # msg_location { file = File , total_size = TotalSize ,
2106- offset = Offset , ref_count = 0 } = Entry ->
2107- index_delete_object (IndexEts , Entry ),
2108- Acc ;
2109- # msg_location { file = File , total_size = TotalSize ,
2110- offset = Offset } = Entry ->
2111- {[ Entry | List ], TotalSize + Size };
2112- % % Fan-out may remove the entry but also write a new
2113- % % entry in a different file when it needs to write
2114- % % a message and the existing reference is in a file
2115- % % that's about to be deleted. So we explicitly accept
2116- % % these cases and ignore this message.
2117- # msg_location { file = OtherFile , total_size = TotalSize }
2118- when File =/= OtherFile ->
2119- Acc ;
2120- not_found ->
2121- Acc
2122- end
2123- end , {[], 0 }, Messages ).
2099+ Path = form_filename (Dir , filenum_to_name (File )),
2100+ {ok , Messages } = scan_file_for_valid_messages (Path ,
2101+ fun ({MsgId , TotalSize , Offset }) ->
2102+ case index_lookup (IndexEts , MsgId ) of
2103+ # msg_location { file = File , total_size = TotalSize ,
2104+ offset = Offset , ref_count = 0 } = Entry ->
2105+ index_delete_object (IndexEts , Entry ),
2106+ % % The message was valid, but since we have now deleted
2107+ % % it due to having no ref_count, it becomes invalid.
2108+ % % We still want to let the scan function skip though.
2109+ previously_valid ;
2110+ # msg_location { file = File , total_size = TotalSize ,
2111+ offset = Offset } = Entry ->
2112+ {valid , Entry };
2113+ % % Fan-out may remove the entry but also write a new
2114+ % % entry in a different file when it needs to write
2115+ % % a message and the existing reference is in a file
2116+ % % that's about to be deleted. So we explicitly accept
2117+ % % these cases and ignore this message.
2118+ # msg_location { file = OtherFile , total_size = TotalSize }
2119+ when File =/= OtherFile ->
2120+ invalid ;
2121+ not_found ->
2122+ invalid
2123+ end
2124+ end ),
2125+ % % @todo Do we really need to reverse messages?
2126+ lists :reverse (Messages ).
0 commit comments