7777 current_file ,
7878 % % current file handle since the last fsync?
7979 current_file_handle ,
80- % % file handle cache
80+ % % current write file offset
8181 current_file_offset ,
82+ % % messages that were potentially removed from the current write file
83+ current_file_removes = [],
8284 % % TRef for our interval timer
8385 sync_timer_ref ,
8486 % % files that had removes
@@ -1150,7 +1152,8 @@ write_message(MsgId, Msg, CRef,
11501152 end , CRef , State1 )
11511153 end .
11521154
1153- remove_message (MsgId , CRef , State = # msstate { index_ets = IndexEts }) ->
1155+ remove_message (MsgId , CRef , State = # msstate {
1156+ index_ets = IndexEts , current_file = CurrentFile }) ->
11541157 case should_mask_action (CRef , MsgId , State ) of
11551158 {true , _Location } ->
11561159 State ;
@@ -1162,22 +1165,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
11621165 % % ets:lookup(FileSummaryEts, File),
11631166 State ;
11641167 {_Mask , # msg_location { ref_count = RefCount , file = File ,
1165- total_size = TotalSize }}
1168+ total_size = TotalSize } = Entry }
11661169 when RefCount > 0 ->
11671170 % % only update field, otherwise bad interaction with
11681171 % % concurrent GC
1169- Dec = fun () -> index_update_ref_counter (IndexEts , MsgId , - 1 ) end ,
11701172 case RefCount of
1171- % % don 't remove from cur_file_cache_ets here because
1173+ % % Don 't remove from cur_file_cache_ets here because
11721174 % % there may be further writes in the mailbox for the
1173- % % same msg.
1174- 1 -> ok = Dec (),
1175- delete_file_if_empty (
1176- File , gc_candidate (File ,
1177- adjust_valid_total_size (
1178- File , - TotalSize , State )));
1179- _ -> ok = Dec (),
1180- gc_candidate (File , State )
1175+ % % same msg. We will remove 0 ref_counts when rolling
1176+ % % over to the next write file.
1177+ 1 when File =:= CurrentFile ->
1178+ index_update_ref_counter (IndexEts , MsgId , - 1 ),
1179+ State1 = State # msstate {current_file_removes =
1180+ [Entry # msg_location {ref_count = 0 }|Removes ]},
1181+ delete_file_if_empty (
1182+ File , gc_candidate (File ,
1183+ adjust_valid_total_size (
1184+ File , - TotalSize , State1 )));
1185+ 1 ->
1186+ index_delete (IndexEts , MsgId ),
1187+ delete_file_if_empty (
1188+ File , gc_candidate (File ,
1189+ adjust_valid_total_size (
1190+ File , - TotalSize , State )));
1191+ _ ->
1192+ index_update_ref_counter (IndexEts , MsgId , - 1 ),
1193+ gc_candidate (File , State )
11811194 end
11821195 end .
11831196
@@ -1239,7 +1252,9 @@ flush_or_roll_to_new_file(
12391252 cur_file_cache_ets = CurFileCacheEts ,
12401253 file_size_limit = FileSizeLimit })
12411254 when Offset >= FileSizeLimit ->
1242- State1 = internal_sync (State ),
1255+ % % Cleanup the index of messages that were removed before rolling over.
1256+ State0 = cleanup_index_on_roll_over (State ),
1257+ State1 = internal_sync (State0 ),
12431258 ok = writer_close (CurHdl ),
12441259 NextFile = CurFile + 1 ,
12451260 {ok , NextHdl } = writer_open (Dir , NextFile ),
@@ -1267,6 +1282,8 @@ write_large_message(MsgId, MsgBodyBin,
12671282 index_ets = IndexEts ,
12681283 file_summary_ets = FileSummaryEts ,
12691284 cur_file_cache_ets = CurFileCacheEts }) ->
1285+ % % Cleanup the index of messages that were removed before rolling over.
1286+ State1 = cleanup_index_on_roll_over (State0 ),
12701287 {LargeMsgFile , LargeMsgHdl } = case CurOffset of
12711288 % % We haven't written in the file yet. Use it.
12721289 0 ->
@@ -1286,13 +1303,13 @@ write_large_message(MsgId, MsgBodyBin,
12861303 ok = index_insert (IndexEts ,
12871304 # msg_location { msg_id = MsgId , ref_count = 1 , file = LargeMsgFile ,
12881305 offset = 0 , total_size = TotalSize }),
1289- State1 = case CurFile of
1306+ State2 = case CurFile of
12901307 % % We didn't open a new file. We must update the existing value.
12911308 LargeMsgFile ->
12921309 [_ ,_ ] = ets :update_counter (FileSummaryEts , LargeMsgFile ,
12931310 [{# file_summary .valid_total_size , TotalSize },
12941311 {# file_summary .file_size , TotalSize }]),
1295- State0 ;
1312+ State1 ;
12961313 % % We opened a new file. We can insert it all at once.
12971314 % % We must also check whether we need to delete the previous
12981315 % % current file, because if there is no valid data this is
@@ -1303,7 +1320,7 @@ write_large_message(MsgId, MsgBodyBin,
13031320 valid_total_size = TotalSize ,
13041321 file_size = TotalSize ,
13051322 locked = false }),
1306- delete_file_if_empty (CurFile , State0 # msstate { current_file_handle = LargeMsgHdl ,
1323+ delete_file_if_empty (CurFile , State1 # msstate { current_file_handle = LargeMsgHdl ,
13071324 current_file = LargeMsgFile ,
13081325 current_file_offset = TotalSize })
13091326 end ,
@@ -1318,11 +1335,21 @@ write_large_message(MsgId, MsgBodyBin,
13181335 % % Delete messages from the cache that were written to disk.
13191336 true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
13201337 % % Process confirms (this won't flush; we already did) and continue.
1321- State = internal_sync (State1 ),
1338+ State = internal_sync (State2 ),
13221339 State # msstate { current_file_handle = NextHdl ,
13231340 current_file = NextFile ,
13241341 current_file_offset = 0 }.
13251342
1343+ cleanup_index_on_roll_over (State = # msstate { index_ets = IndexEts ,
1344+ current_file = CurrentFile , current_file_removes = Removes }) ->
1345+ lists :foreach (fun (Entry ) ->
1346+ % % We delete objects that have ref_count=0. If a message
1347+ % % got its ref_count increased, it will not be deleted.
1348+ % % We thus avoid extra index lookups to check for ref_count.
1349+ index_delete_object (IndexEts , Entry )
1350+ end , Removes ),
1351+ State = # msstate {current_file_removes = []}.
1352+
13261353contains_message (MsgId , From , State = # msstate { index_ets = IndexEts }) ->
13271354 MsgLocation = index_lookup_positive_ref_count (IndexEts , MsgId ),
13281355 gen_server2 :reply (From , MsgLocation =/= not_found ),
@@ -2134,7 +2161,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
21342161 _ ->
21352162 [# file_summary { valid_total_size = 0 ,
21362163 file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
2137- [] = scan_and_vacuum_message_file (File , State ),
21382164 ok = file :delete (form_filename (Dir , filenum_to_name (File ))),
21392165 true = ets :delete (FileSummaryEts , File ),
21402166 rabbit_log :debug (" Deleted empty file number ~tp ; reclaimed ~tp bytes" , [File , FileSize ]),
0 commit comments