 
 -include_lib("rabbit_common/include/rabbit_msg_store.hrl").
 
-%% We flush to disk when the write buffer gets above the max size,
-%% or at an interval to make sure we don't keep the data in memory
-%% too long. Confirms are sent after the data is flushed to disk.
--define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB.
--define(SYNC_INTERVAL, 200). %% Milliseconds.
+%% We flush to disk at an interval to make sure we don't keep
+%% the data in memory too long. Confirms are sent after the
+%% data is flushed to disk.
+-define(SYNC_INTERVAL, 200). %% Milliseconds.
 
 -define(CLEAN_FILENAME, "clean.dot").
 -define(FILE_SUMMARY_FILENAME, "file_summary.ets").
 
--define(BINARY_MODE, [raw, binary]).
--define(READ_MODE, [read]).
--define(WRITE_MODE, [write]).
-
 -define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
 
 %% We keep track of flying messages for writes and removes. The idea is that
 %% when a remove comes in before we could process the write, we skip the
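With HANDLE_CACHE_BUFFER_SIZE gone, flushing is driven purely by the 200 ms timer rather than by write-buffer size. The sketch below only illustrates that pattern; it is not code from this commit, and the module name, the map-based state and the send_confirms/1 helper are invented for the example. Only ?SYNC_INTERVAL and the flush-before-confirm ordering come from the comment above.

-module(sync_interval_sketch).
-export([init/1, handle_info/2]).

-define(SYNC_INTERVAL, 200). %% Milliseconds, same value as above.

%% Start the recurring sync timer; Fd is an already opened raw file handle.
init(Fd) ->
    _ = erlang:send_after(?SYNC_INTERVAL, self(), sync),
    {ok, #{fd => Fd, pending_confirms => []}}.

%% On each tick the buffered data is pushed to disk first, and only then
%% are confirms sent for the messages written since the last sync.
handle_info(sync, State = #{fd := Fd, pending_confirms := MsgIds}) ->
    ok = file:sync(Fd),
    ok = send_confirms(MsgIds),
    _ = erlang:send_after(?SYNC_INTERVAL, self(), sync),
    {noreply, State#{pending_confirms := []}}.

send_confirms(MsgIds) ->
    _ = [io:format("confirm ~p~n", [MsgId]) || MsgId <- MsgIds],
    ok.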
@@ -1575,24 +1569,22 @@ count_msg_refs(Gen, Seed, State) ->
     case Gen(Seed) of
         finished ->
             ok;
+        %% @todo This is currently required by tests but can't happen otherwise?
         {_MsgId, 0, Next} ->
             count_msg_refs(Gen, Next, State);
-        {MsgId, Delta, Next} ->
-            ok = case index_lookup(MsgId, State) of
-                     not_found ->
-                         index_insert(#msg_location { msg_id = MsgId,
-                                                      file = undefined,
-                                                      ref_count = Delta },
-                                      State);
-                     #msg_location { ref_count = RefCount } = StoreEntry ->
-                         NewRefCount = RefCount + Delta,
-                         case NewRefCount of
-                             0 -> index_delete(MsgId, State);
-                             _ -> index_update(StoreEntry #msg_location {
-                                                 ref_count = NewRefCount },
-                                               State)
-                         end
-                 end,
+        %% This clause is kept for v1 compatibility purposes.
+        %% It can be removed once we no longer support converting from v1 data.
+        {MsgId, 1, Next} ->
+            %% @todo Remove index_module and avoid this element/2.
+            _ = ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
+                #msg_location{msg_id = MsgId, file = undefined, ref_count = 1}),
+            count_msg_refs(Gen, Next, State);
+        {MsgIds, Next} ->
+            lists:foreach(fun(MsgId) ->
+                %% @todo Remove index_module and avoid this element/2.
+                ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
+                    #msg_location{msg_id = MsgId, file = undefined, ref_count = 1})
+            end, MsgIds),
             count_msg_refs(Gen, Next, State)
     end.
 
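The rewritten clauses replace the index_lookup/index_insert/index_update sequence with a single ets:update_counter/4 upsert against the index table. The standalone sketch below shows the behaviour this relies on, using a throwaway table of {MsgId, RefCount} tuples rather than the real #msg_location{} records and index table: an existing counter is incremented in place, and a missing key causes the Default object to be inserted and then updated.

-module(refcount_sketch).
-export([demo/0]).

demo() ->
    T = ets:new(refcounts, [set, public]),
    %% First reference: the key is absent, so the default {MsgId, 0} is
    %% inserted and then incremented at position 2 (the RefCount field).
    1 = ets:update_counter(T, <<"msg-a">>, {2, 1}, {<<"msg-a">>, 0}),
    %% Second reference: the existing counter is incremented in place.
    2 = ets:update_counter(T, <<"msg-a">>, {2, 1}, {<<"msg-a">>, 0}),
    ets:tab2list(T). %% [{<<"msg-a">>, 2}]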
@@ -1621,15 +1613,17 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
     FileName = filenum_to_name(File),
     rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
                      [form_filename(Dir, FileName), length(Files)]),
-    {ok, Messages0, FileSize} =
+    %% The scan function already dealt with duplicate messages
+    %% within the file. We then get messages in reverse order.
+    {ok, Messages, FileSize} =
         scan_file_for_valid_messages(Dir, FileName),
-    %% The scan gives us messages end-of-file first so we reverse the list
-    %% in case a compaction had occurred before shutdown to not have to repeat it.
-    Messages = lists:reverse(Messages0),
+    %% Valid messages are in file order so the last message is
+    %% the last message from the list.
     {ValidMessages, ValidTotalSize} =
         lists:foldl(
           fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
-                  %% We only keep the first message in the file. Duplicates (due to compaction) get ignored.
+                  %% Fan-out may result in the same message data in multiple
+                  %% files so we have to guard against it.
                   case index_lookup(MsgId, State) of
                       #msg_location { file = undefined } = StoreEntry ->
                           ok = index_update(StoreEntry #msg_location {
@@ -1649,7 +1643,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
               [] -> case ValidMessages of
                         [] -> 0;
                         _  -> {_MsgId, TotalSize, Offset} =
-                                  hd(ValidMessages),
+                                  lists:last(ValidMessages),
                               Offset + TotalSize
                     end;
               [_|_] -> FileSize
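With ValidMessages kept in file order, the size taken for the final file is simply where its last valid message ends, i.e. its Offset plus TotalSize. A quick Erlang shell illustration, with made-up values for the last valid message:

1> {_MsgId, TotalSize, Offset} = {<<"msg-z">>, 512, 7680}.
{<<"msg-z">>,512,7680}
2> Offset + TotalSize.
8192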