1616
1717-export ([compact_file /2 , truncate_file /4 , delete_file /2 ]). % % internal
1818
19- -export ([scan_file_for_valid_messages /1 ]). % % salvage tool
19+ -export ([scan_file_for_valid_messages /1 , scan_file_for_valid_messages / 2 ]). % % salvage tool
2020
2121-export ([init /1 , handle_call /3 , handle_cast /2 , handle_info /2 , terminate /2 ,
2222 code_change /3 , prioritise_call /4 , prioritise_cast /3 ,
@@ -1472,15 +1472,17 @@ list_sorted_filenames(Dir, Ext) ->
14721472
14731473-define (SCAN_BLOCK_SIZE , 4194304 ). % % 4MB
14741474
1475- scan_file_for_valid_messages (Dir , FileName ) ->
1476- scan_file_for_valid_messages (form_filename (Dir , FileName )).
1477-
1475+ % % Exported as a salvage tool. Not as accurate as node recovery
1476+ % % because it doesn't have the queue index.
14781477scan_file_for_valid_messages (Path ) ->
1478+ scan_file_for_valid_messages (Path , fun (_ ) -> valid end ).
1479+
1480+ scan_file_for_valid_messages (Path , Fun ) ->
14791481 case file :open (Path , [read , binary , raw ]) of
14801482 {ok , Fd } ->
14811483 {ok , FileSize } = file :position (Fd , eof ),
14821484 {ok , _ } = file :position (Fd , bof ),
1483- Messages = scan (<<>>, Fd , 0 , FileSize , #{}, []),
1485+ Messages = scan (<<>>, Fd , Fun , 0 , FileSize , #{}, []),
14841486 ok = file :close (Fd ),
14851487 case Messages of
14861488 [] ->
@@ -1496,7 +1498,7 @@ scan_file_for_valid_messages(Path) ->
14961498 Reason }}
14971499 end .
14981500
1499- scan (Buffer , Fd , Offset , FileSize , MsgIdsFound , Acc ) ->
1501+ scan (Buffer , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
15001502 case file :read (Fd , ? SCAN_BLOCK_SIZE ) of
15011503 eof ->
15021504 Acc ;
@@ -1505,12 +1507,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
15051507 <<>> -> Data0 ;
15061508 _ -> <<Buffer /binary , Data0 /binary >>
15071509 end ,
1508- scan_data (Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1510+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15091511 end .
15101512
15111513% % Message might have been found.
15121514scan_data (<<Size :64 , MsgIdAndMsg :Size /binary , 255 , Rest /bits >> = Data ,
1513- Fd , Offset , FileSize , MsgIdsFound , Acc )
1515+ Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15141516 when Size >= 16 ->
15151517 <<MsgIdInt :128 , _ /bits >> = MsgIdAndMsg ,
15161518 case MsgIdsFound of
@@ -1519,26 +1521,34 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
15191521 % % simply be a coincidence. Try the next byte.
15201522 #{MsgIdInt := true } ->
15211523 <<_ , Rest2 /bits >> = Data ,
1522- scan_data (Rest2 , Fd , Offset + 1 , FileSize , MsgIdsFound , Acc );
1524+ scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc );
15231525 % % Data looks to be a message.
15241526 _ ->
15251527 % % Avoid sub-binary construction.
15261528 MsgId = <<MsgIdInt :128 >>,
15271529 TotalSize = Size + 9 ,
1528- scan_data (Rest , Fd , Offset + TotalSize , FileSize ,
1529- MsgIdsFound #{MsgIdInt => true },
1530- [{MsgId , TotalSize , Offset }|Acc ])
1530+ Tuple = {MsgId , TotalSize , Offset },
1531+ case Fun (Tuple ) of
1532+ % % Confirmed to be a message by the provided fun.
1533+ valid ->
1534+ scan_data (Rest , Fd , Fun , Offset + TotalSize , FileSize ,
1535+ MsgIdsFound #{MsgIdInt => true }, [Tuple |Acc ]);
1536+ % % Not a message, try the next byte.
1537+ invalid ->
1538+ <<_ , Rest2 /bits >> = Data ,
1539+ scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc )
1540+ end
15311541 end ;
15321542% % This might be the start of a message.
1533- scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1543+ scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15341544 when byte_size (Rest ) < Size + 1 , Size < FileSize - Offset ->
1535- scan (Data , Fd , Offset , FileSize , MsgIdsFound , Acc );
1536- scan_data (Data , Fd , Offset , FileSize , MsgIdsFound , Acc )
1545+ scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
1546+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15371547 when byte_size (Data ) < 8 ->
1538- scan (Data , Fd , Offset , FileSize , MsgIdsFound , Acc );
1548+ scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
15391549% % This is definitely not a message. Try the next byte.
1540- scan_data (<<_ , Rest /bits >>, Fd , Offset , FileSize , MsgIdsFound , Acc ) ->
1541- scan_data (Rest , Fd , Offset + 1 , FileSize , MsgIdsFound , Acc ).
1550+ scan_data (<<_ , Rest /bits >>, Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
1551+ scan_data (Rest , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc ).
15421552
15431553% %----------------------------------------------------------------------------
15441554% % Ets index
@@ -1742,39 +1752,37 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
17421752
17431753build_index_worker (Gatherer , # msstate { index_ets = IndexEts , dir = Dir },
17441754 File , Files ) ->
1745- FileName = filenum_to_name (File ),
1755+ Path = form_filename ( Dir , filenum_to_name (File ) ),
17461756 rabbit_log :debug (" Rebuilding message location index from ~ts (~B file(s) remaining)" ,
1747- [form_filename ( Dir , FileName ) , length (Files )]),
1757+ [Path , length (Files )]),
17481758 % % The scan function already dealt with duplicate messages
1749- % % within the file. We then get messages in reverse order.
1750- {ok , Messages , FileSize } =
1751- scan_file_for_valid_messages (Dir , FileName ),
1752- % % Valid messages are in file order so the last message is
1753- % % the last message from the list.
1754- {ValidMessages , ValidTotalSize } =
1755- lists :foldl (
1756- fun (Obj = {MsgId , TotalSize , Offset }, {VMAcc , VTSAcc }) ->
1757- % % Fan-out may result in the same message data in multiple
1758- % % files so we have to guard against it.
1759- case index_lookup (IndexEts , MsgId ) of
1760- # msg_location { file = undefined } = StoreEntry ->
1761- ok = index_update (IndexEts , StoreEntry # msg_location {
1762- file = File , offset = Offset ,
1763- total_size = TotalSize }),
1764- {[Obj | VMAcc ], VTSAcc + TotalSize };
1765- _ ->
1766- {VMAcc , VTSAcc }
1767- end
1768- end , {[], 0 }, Messages ),
1759+ % % within the file, and only returns valid messages (we do
1760+ % % the index lookup in the fun). But we get messages in reverse order.
1761+ {ok , Messages , FileSize } = scan_file_for_valid_messages (Path ,
1762+ fun ({MsgId , TotalSize , Offset }) ->
1763+ % % Fan-out may result in the same message data in multiple
1764+ % % files so we have to guard against it.
1765+ case index_lookup (IndexEts , MsgId ) of
1766+ # msg_location { file = undefined } = StoreEntry ->
1767+ ok = index_update (IndexEts , StoreEntry # msg_location {
1768+ file = File , offset = Offset ,
1769+ total_size = TotalSize }),
1770+ valid ;
1771+ _ ->
1772+ invalid
1773+ end
1774+ end ),
1775+ ValidTotalSize = lists :foldl (fun ({_ , TotalSize , _ }, Acc ) -> Acc + TotalSize end , 0 , Messages ),
17691776 FileSize1 =
17701777 case Files of
17711778 % % if it's the last file, we'll truncate to remove any
17721779 % % rubbish above the last valid message. This affects the
17731780 % % file size.
1774- [] -> case ValidMessages of
1781+ [] -> case Messages of
17751782 [] -> 0 ;
1776- _ -> {_MsgId , TotalSize , Offset } =
1777- lists :last (ValidMessages ),
1783+ % % Messages is in reverse order so the first in the list
1784+ % % is the last message in the file.
1785+ [{_ , TotalSize , Offset }|_ ] ->
17781786 Offset + TotalSize
17791787 end ;
17801788 [_ |_ ] -> FileSize
@@ -1933,7 +1941,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
19331941 % % Load the messages. It's possible to get 0 messages here;
19341942 % % that's OK. That means we have little to do as the file is
19351943 % % about to be deleted.
1936- { Messages , _ } = scan_and_vacuum_message_file (File , State ),
1944+ Messages = scan_and_vacuum_message_file (File , State ),
19371945 % % Blank holes. We must do this first otherwise the file is left
19381946 % % with data that may confuse the code (for example data that looks
19391947 % % like a message, isn't a message, but spans over a real message).
@@ -2076,9 +2084,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
20762084
20772085-spec delete_file (non_neg_integer (), gc_state ()) -> ok | defer .
20782086
2079- delete_file (File , State = # gc_state { file_summary_ets = FileSummaryEts ,
2080- file_handles_ets = FileHandlesEts ,
2081- dir = Dir }) ->
2087+ delete_file (File , # gc_state { file_summary_ets = FileSummaryEts ,
2088+ file_handles_ets = FileHandlesEts ,
2089+ dir = Dir }) ->
20822090 case ets :match_object (FileHandlesEts , {{'_' , File }, '_' }, 1 ) of
20832091 {[_ |_ ], _Cont } ->
20842092 rabbit_log :debug (" Asked to delete file ~p but it has active readers. Deferring." ,
@@ -2087,7 +2095,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20872095 _ ->
20882096 [# file_summary { valid_total_size = 0 ,
20892097 file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
2090- {[], 0 } = scan_and_vacuum_message_file (File , State ),
2098+ % % @todo What do?
2099+ % [] = scan_and_vacuum_message_file(File, State),
20912100 ok = file :delete (form_filename (Dir , filenum_to_name (File ))),
20922101 true = ets :delete (FileSummaryEts , File ),
20932102 rabbit_log :debug (" Deleted empty file number ~tp ; reclaimed ~tp bytes" , [File , FileSize ]),
@@ -2096,28 +2105,28 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20962105
20972106scan_and_vacuum_message_file (File , # gc_state { index_ets = IndexEts , dir = Dir }) ->
20982107 % % Messages here will be end-of-file at start-of-list
2099- { ok , Messages , _FileSize } =
2100- scan_file_for_valid_messages (Dir , filenum_to_name ( File )) ,
2101- % % foldl will reverse so will end up with msgs in ascending offset order
2102- lists : foldl (
2103- fun ({ MsgId , TotalSize , Offset }, Acc = { List , Size }) ->
2104- case index_lookup ( IndexEts , MsgId ) of
2105- # msg_location { file = File , total_size = TotalSize ,
2106- offset = Offset , ref_count = 0 } = Entry ->
2107- index_delete_object ( IndexEts , Entry ) ,
2108- Acc ;
2109- # msg_location { file = File , total_size = TotalSize ,
2110- offset = Offset } = Entry ->
2111- {[ Entry | List ], TotalSize + Size };
2112- % % Fan-out may remove the entry but also write a new
2113- % % entry in a different file when it needs to write
2114- % % a message and the existing reference is in a file
2115- % % that's about to be deleted. So we explicitly accept
2116- % % these cases and ignore this message.
2117- # msg_location { file = OtherFile , total_size = TotalSize }
2118- when File =/= OtherFile ->
2119- Acc ;
2120- not_found ->
2121- Acc
2122- end
2123- end , {[], 0 }, Messages ).
2108+ Path = form_filename ( Dir , filenum_to_name ( File )),
2109+ { ok , Messages , _FileSize } = scan_file_for_valid_messages (Path ,
2110+ fun ({ MsgId , TotalSize , Offset }) ->
2111+ case index_lookup ( IndexEts , MsgId ) of
2112+ # msg_location { file = File , total_size = TotalSize ,
2113+ offset = Offset , ref_count = 0 } = Entry ->
2114+ index_delete_object ( IndexEts , Entry ) ,
2115+ invalid ;
2116+ # msg_location { file = File , total_size = TotalSize ,
2117+ offset = Offset } ->
2118+ valid ;
2119+ % % Fan-out may remove the entry but also write a new
2120+ % % entry in a different file when it needs to write
2121+ % % a message and the existing reference is in a file
2122+ % % that's about to be deleted. So we explicitly accept
2123+ % % these cases and ignore this message.
2124+ # msg_location { file = OtherFile , total_size = TotalSize }
2125+ when File =/= OtherFile ->
2126+ invalid ;
2127+ not_found ->
2128+ invalid
2129+ end
2130+ end ),
2131+ % % @todo Do we really need to reverse messages?
2132+ lists : reverse ( Messages ).
0 commit comments