1616
1717-export ([compact_file /2 , truncate_file /4 , delete_file /2 ]). % % internal
1818
19- -export ([scan_file_for_valid_messages /1 ]). % % salvage tool
19+ -export ([scan_file_for_valid_messages /1 , scan_file_for_valid_messages / 2 ]). % % salvage tool
2020
2121-export ([init /1 , handle_call /3 , handle_cast /2 , handle_info /2 , terminate /2 ,
2222 code_change /3 , prioritise_call /4 , prioritise_cast /3 ,
@@ -1472,15 +1472,17 @@ list_sorted_filenames(Dir, Ext) ->
14721472
14731473-define (SCAN_BLOCK_SIZE , 4194304 ). % % 4MB
14741474
1475- scan_file_for_valid_messages (Dir , FileName ) ->
1476- scan_file_for_valid_messages (form_filename (Dir , FileName )).
1477-
1475+ % % Exported as a salvage tool. Not as accurate as node recovery
1476+ % % because it doesn't have the queue index.
14781477scan_file_for_valid_messages (Path ) ->
1478+ scan_file_for_valid_messages (Path , fun (Obj ) -> {valid , Obj } end ).
1479+
1480+ scan_file_for_valid_messages (Path , Fun ) ->
14791481 case file :open (Path , [read , binary , raw ]) of
14801482 {ok , Fd } ->
14811483 {ok , FileSize } = file :position (Fd , eof ),
14821484 {ok , _ } = file :position (Fd , bof ),
1483- Messages = scan (<<>>, Fd , 0 , FileSize , #{}, []),
1485+ Messages = scan (<<>>, Fd , Fun , 0 , FileSize , #{}, []),
14841486 ok = file :close (Fd ),
14851487 case Messages of
14861488 [] ->
@@ -1496,7 +1498,7 @@ scan_file_for_valid_messages(Path) ->
14961498 Reason }}
14971499 end .
14981500
1499- scan (Buffer , Fd , Offset , FileSize , MsgIdsFound , Acc ) ->
1501+ scan (Buffer , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
15001502 case file :read (Fd , ? SCAN_BLOCK_SIZE ) of
15011503 eof ->
15021504 Acc ;
@@ -1505,12 +1507,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
15051507 <<>> -> Data0 ;
15061508 _ -> <<Buffer /binary , Data0 /binary >>
15071509 end ,
1508- scan_data (Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1510+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15091511 end .
15101512
15111513% % Message might have been found.
15121514scan_data (<<Size :64 , MsgIdAndMsg :Size /binary , 255 , Rest /bits >> = Data ,
1513- Fd , Offset , FileSize , MsgIdsFound , Acc )
1515+ Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15141516 when Size >= 16 ->
15151517 <<MsgIdInt :128 , _ /bits >> = MsgIdAndMsg ,
15161518 case MsgIdsFound of
@@ -1519,26 +1521,33 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
15191521 % % simply be a coincidence. Try the next byte.
15201522 #{MsgIdInt := true } ->
15211523 <<_ , Rest2 /bits >> = Data ,
1522- scan_data (Rest2 , Fd , Offset + 1 , FileSize , MsgIdsFound , Acc );
1524+ scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc );
15231525 % % Data looks to be a message.
15241526 _ ->
15251527 % % Avoid sub-binary construction.
15261528 MsgId = <<MsgIdInt :128 >>,
15271529 TotalSize = Size + 9 ,
1528- scan_data (Rest , Fd , Offset + TotalSize , FileSize ,
1529- MsgIdsFound #{MsgIdInt => true },
1530- [{MsgId , TotalSize , Offset }|Acc ])
1530+ case Fun ({MsgId , TotalSize , Offset }) of
1531+ % % Confirmed to be a message by the provided fun.
1532+ {valid , Entry } ->
1533+ scan_data (Rest , Fd , Fun , Offset + TotalSize , FileSize ,
1534+ MsgIdsFound #{MsgIdInt => true }, [Entry |Acc ]);
1535+ % % Not a message, try the next byte.
1536+ invalid ->
1537+ <<_ , Rest2 /bits >> = Data ,
1538+ scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc )
1539+ end
15311540 end ;
15321541% % This might be the start of a message.
1533- scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1542+ scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15341543 when byte_size (Rest ) < Size + 1 , Size < FileSize - Offset ->
1535- scan (Data , Fd , Offset , FileSize , MsgIdsFound , Acc );
1536- scan_data (Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1544+ scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
1545+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15371546 when byte_size (Data ) < 8 ->
1538- scan (Data , Fd , Offset , FileSize , MsgIdsFound , Acc );
1547+ scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
15391548% % This is definitely not a message. Try the next byte.
1540- scan_data (<<_ , Rest /bits >>, Fd , Offset , FileSize , MsgIdsFound , Acc ) ->
1541- scan_data (Rest , Fd , Offset + 1 , FileSize , MsgIdsFound , Acc ).
1549+ scan_data (<<_ , Rest /bits >>, Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
1550+ scan_data (Rest , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc ).
15421551
15431552% %----------------------------------------------------------------------------
15441553% % Ets index
@@ -1742,39 +1751,37 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
17421751
17431752build_index_worker (Gatherer , # msstate { index_ets = IndexEts , dir = Dir },
17441753 File , Files ) ->
1745- FileName = filenum_to_name (File ),
1754+ Path = form_filename ( Dir , filenum_to_name (File ) ),
17461755 rabbit_log :debug (" Rebuilding message location index from ~ts (~B file(s) remaining)" ,
1747- [form_filename ( Dir , FileName ) , length (Files )]),
1756+ [Path , length (Files )]),
17481757 % % The scan function already dealt with duplicate messages
1749- % % within the file. We then get messages in reverse order.
1750- {ok , Messages , FileSize } =
1751- scan_file_for_valid_messages (Dir , FileName ),
1752- % % Valid messages are in file order so the last message is
1753- % % the last message from the list.
1754- {ValidMessages , ValidTotalSize } =
1755- lists :foldl (
1756- fun (Obj = {MsgId , TotalSize , Offset }, {VMAcc , VTSAcc }) ->
1757- % % Fan-out may result in the same message data in multiple
1758- % % files so we have to guard against it.
1759- case index_lookup (IndexEts , MsgId ) of
1760- # msg_location { file = undefined } = StoreEntry ->
1761- ok = index_update (IndexEts , StoreEntry # msg_location {
1762- file = File , offset = Offset ,
1763- total_size = TotalSize }),
1764- {[Obj | VMAcc ], VTSAcc + TotalSize };
1765- _ ->
1766- {VMAcc , VTSAcc }
1767- end
1768- end , {[], 0 }, Messages ),
1758+ % % within the file, and only returns valid messages (we do
1759+ % % the index lookup in the fun). But we get messages in reverse order.
1760+ {ok , Messages , FileSize } = scan_file_for_valid_messages (Path ,
1761+ fun (Obj = {MsgId , TotalSize , Offset }) ->
1762+ % % Fan-out may result in the same message data in multiple
1763+ % % files so we have to guard against it.
1764+ case index_lookup (IndexEts , MsgId ) of
1765+ # msg_location { file = undefined } = StoreEntry ->
1766+ ok = index_update (IndexEts , StoreEntry # msg_location {
1767+ file = File , offset = Offset ,
1768+ total_size = TotalSize }),
1769+ {valid , Obj };
1770+ _ ->
1771+ invalid
1772+ end
1773+ end ),
1774+ ValidTotalSize = lists :foldl (fun ({_ , TotalSize , _ }, Acc ) -> Acc + TotalSize end , 0 , Messages ),
17691775 FileSize1 =
17701776 case Files of
17711777 % % if it's the last file, we'll truncate to remove any
17721778 % % rubbish above the last valid message. This affects the
17731779 % % file size.
1774- [] -> case ValidMessages of
1780+ [] -> case Messages of
17751781 [] -> 0 ;
1776- _ -> {_MsgId , TotalSize , Offset } =
1777- lists :last (ValidMessages ),
1782+ % % Messages is in reverse order so the first in the list
1783+ % % is the last message in the file.
1784+ [{_ , TotalSize , Offset }|_ ] ->
17781785 Offset + TotalSize
17791786 end ;
17801787 [_ |_ ] -> FileSize
@@ -1933,7 +1940,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
19331940 % % Load the messages. It's possible to get 0 messages here;
19341941 % % that's OK. That means we have little to do as the file is
19351942 % % about to be deleted.
1936- { Messages , _ } = scan_and_vacuum_message_file (File , State ),
1943+ Messages = scan_and_vacuum_message_file (File , State ),
19371944 % % Blank holes. We must do this first otherwise the file is left
19381945 % % with data that may confuse the code (for example data that looks
19391946 % % like a message, isn't a message, but spans over a real message).
@@ -2076,9 +2083,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
20762083
20772084-spec delete_file (non_neg_integer (), gc_state ()) -> ok | defer .
20782085
2079- delete_file (File , State = # gc_state { file_summary_ets = FileSummaryEts ,
2080- file_handles_ets = FileHandlesEts ,
2081- dir = Dir }) ->
2086+ delete_file (File , # gc_state { file_summary_ets = FileSummaryEts ,
2087+ file_handles_ets = FileHandlesEts ,
2088+ dir = Dir }) ->
20822089 case ets :match_object (FileHandlesEts , {{'_' , File }, '_' }, 1 ) of
20832090 {[_ |_ ], _Cont } ->
20842091 rabbit_log :debug (" Asked to delete file ~p but it has active readers. Deferring." ,
@@ -2087,7 +2094,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20872094 _ ->
20882095 [# file_summary { valid_total_size = 0 ,
20892096 file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
2090- {[], 0 } = scan_and_vacuum_message_file (File , State ),
2097+ % % @todo What do?
2098+ % [] = scan_and_vacuum_message_file(File, State),
20912099 ok = file :delete (form_filename (Dir , filenum_to_name (File ))),
20922100 true = ets :delete (FileSummaryEts , File ),
20932101 rabbit_log :debug (" Deleted empty file number ~tp ; reclaimed ~tp bytes" , [File , FileSize ]),
@@ -2096,28 +2104,30 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20962104
%% scan_and_vacuum_message_file(File, GcState)
%%
%% This span is a diff rendering: lines marked `-` are the implementation
%% being removed, lines marked `+` are the one replacing it.
%%
%% Scans the store file numbered File (located under Dir) and reconciles
%% every message candidate reported by the scanner against the index
%% table IndexEts:
%%   * index entry located at exactly this File/Offset/TotalSize with
%%     ref_count = 0: the entry is deleted from the index and the message
%%     is dropped from the result;
%%   * index entry located at exactly this File/Offset/TotalSize with any
%%     other ref_count: the #msg_location{} entry is kept;
%%   * index entry pointing at a different file, or no entry at all
%%     (not_found): the candidate is ignored.
%%
%% Return value:
%%   * removed version: {Entries, SummedTotalSize}, built with a foldl
%%     over the scan result;
%%   * replacement version: only the list of kept #msg_location{} entries,
%%     passed through lists:reverse/1. Per the comment below, the scan
%%     result has end-of-file first, so the reversal presumably yields
%%     ascending offset order -- confirm against scan_data.
20972105scan_and_vacuum_message_file (File , # gc_state { index_ets = IndexEts , dir = Dir }) ->
20982106 % % Messages here will be end-of-file at start-of-list
2099- {ok , Messages , _FileSize } =
2100- scan_file_for_valid_messages (Dir , filenum_to_name (File )),
2101- % % foldl will reverse so will end up with msgs in ascending offset order
2102- lists :foldl (
2103- fun ({MsgId , TotalSize , Offset }, Acc = {List , Size }) ->
2104- case index_lookup (IndexEts , MsgId ) of
2105- # msg_location { file = File , total_size = TotalSize ,
2106- offset = Offset , ref_count = 0 } = Entry ->
2107- index_delete_object (IndexEts , Entry ),
2108- Acc ;
2109- # msg_location { file = File , total_size = TotalSize ,
2110- offset = Offset } = Entry ->
2111- {[ Entry | List ], TotalSize + Size };
2112- % % Fan-out may remove the entry but also write a new
2113- % % entry in a different file when it needs to write
2114- % % a message and the existing reference is in a file
2115- % % that's about to be deleted. So we explicitly accept
2116- % % these cases and ignore this message.
2117- # msg_location { file = OtherFile , total_size = TotalSize }
2118- when File =/= OtherFile ->
2119- Acc ;
2120- not_found ->
2121- Acc
2122- end
2123- end , {[], 0 }, Messages ).
2107+ Path = form_filename (Dir , filenum_to_name (File )),
%% The fun below is the per-candidate callback handed to the scanner; it
%% answers {valid, Entry} to keep a candidate or `invalid` to reject it.
%% NOTE(review): in scan_data, an `invalid` answer makes the scanner retry
%% from the next byte instead of skipping TotalSize bytes, and the id is
%% not added to MsgIdsFound. Here `invalid` is also returned for
%% candidates that passed the framing check but are no longer wanted
%% (ref_count = 0, relocated to another file, not_found), so the scanner
%% will re-examine the bytes inside those messages. Confirm this is
%% intended, or whether a distinct "skip the whole message" answer is
%% needed.
2108+ {ok , Messages , _FileSize } = scan_file_for_valid_messages (Path ,
2109+ fun ({MsgId , TotalSize , Offset }) ->
2110+ case index_lookup (IndexEts , MsgId ) of
2111+ # msg_location { file = File , total_size = TotalSize ,
2112+ offset = Offset , ref_count = 0 } = Entry ->
2113+ index_delete_object (IndexEts , Entry ),
2114+ % % The message was valid, but since we have now deleted
2115+ % % it due to having no ref_count, it becomes invalid.
2116+ invalid ;
2117+ # msg_location { file = File , total_size = TotalSize ,
2118+ offset = Offset } = Entry ->
2119+ {valid , Entry };
2120+ % % Fan-out may remove the entry but also write a new
2121+ % % entry in a different file when it needs to write
2122+ % % a message and the existing reference is in a file
2123+ % % that's about to be deleted. So we explicitly accept
2124+ % % these cases and ignore this message.
2125+ # msg_location { file = OtherFile , total_size = TotalSize }
2126+ when File =/= OtherFile ->
2127+ invalid ;
2128+ not_found ->
2129+ invalid
2130+ end
2131+ end ),
2132+ % % @todo Do we really need to reverse messages?
2133+ lists :reverse (Messages ).
0 commit comments