@@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
899899 % % or the non-current files. If the message *is* in the
900900 % % current file then the cache entry will be removed by
901901 % % the normal logic for that in write_message/4 and
902- % % maybe_roll_to_new_file /2.
902+ % % flush_or_roll_to_new_file /3.
903903 case index_lookup (MsgId , State ) of
904904 # msg_location { file = File }
905905 when File == State # msstate .current_file ->
@@ -1208,26 +1208,123 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
12081208gc_candidate (File , State = # msstate { gc_candidates = Candidates }) ->
12091209 State # msstate { gc_candidates = Candidates #{ File => true }}.
12101210
1211- write_message (MsgId , Msg ,
1212- State0 = # msstate { current_file_handle = CurHdl ,
1213- current_file = CurFile ,
1214- current_file_offset = CurOffset ,
1215- file_summary_ets = FileSummaryEts }) ->
1216- {MaybeFlush , TotalSize } = writer_append (CurHdl , MsgId , Msg ),
1217- State = case MaybeFlush of
1218- flush -> internal_sync (State0 );
1219- ok -> State0
1220- end ,
1211+ % % This value must be sufficiently smaller than ?SCAN_BLOCK_SIZE
1212+ % % to ensure we only ever need 2 reads when scanning files.
1213+ % % Hence the choice of 4MB here and 4MiB there, the difference
1214+ % % in size being more than enough to ensure that property.
1215+ -define (LARGE_MESSAGE_THRESHOLD , 4000000 ). % % 4MB.
1216+
1217+ write_message (MsgId , MsgBody , State ) ->
1218+ MsgBodyBin = term_to_binary (MsgBody ),
1219+ % % Large messages get written to their own files.
1220+ if
1221+ byte_size (MsgBodyBin ) >= ? LARGE_MESSAGE_THRESHOLD ->
1222+ write_large_message (MsgId , MsgBodyBin , State );
1223+ true ->
1224+ write_small_message (MsgId , MsgBodyBin , State )
1225+ end .
1226+
1227+ write_small_message (MsgId , MsgBodyBin ,
1228+ State = # msstate { current_file_handle = CurHdl ,
1229+ current_file = CurFile ,
1230+ current_file_offset = CurOffset ,
1231+ file_summary_ets = FileSummaryEts }) ->
1232+ {MaybeFlush , TotalSize } = writer_append (CurHdl , MsgId , MsgBodyBin ),
12211233 ok = index_insert (
12221234 # msg_location { msg_id = MsgId , ref_count = 1 , file = CurFile ,
12231235 offset = CurOffset , total_size = TotalSize }, State ),
12241236 [_ ,_ ] = ets :update_counter (FileSummaryEts , CurFile ,
12251237 [{# file_summary .valid_total_size , TotalSize },
12261238 {# file_summary .file_size , TotalSize }]),
1227- maybe_roll_to_new_file (CurOffset + TotalSize ,
1239+ flush_or_roll_to_new_file (CurOffset + TotalSize , MaybeFlush ,
12281240 State # msstate {
12291241 current_file_offset = CurOffset + TotalSize }).
12301242
1243+ flush_or_roll_to_new_file (
1244+ Offset , _MaybeFlush ,
1245+ State = # msstate { dir = Dir ,
1246+ current_file_handle = CurHdl ,
1247+ current_file = CurFile ,
1248+ file_summary_ets = FileSummaryEts ,
1249+ cur_file_cache_ets = CurFileCacheEts ,
1250+ file_size_limit = FileSizeLimit })
1251+ when Offset >= FileSizeLimit ->
1252+ State1 = internal_sync (State ),
1253+ ok = writer_close (CurHdl ),
1254+ NextFile = CurFile + 1 ,
1255+ {ok , NextHdl } = writer_open (Dir , NextFile ),
1256+ true = ets :insert_new (FileSummaryEts , # file_summary {
1257+ file = NextFile ,
1258+ valid_total_size = 0 ,
1259+ file_size = 0 ,
1260+ locked = false }),
1261+ % % Delete messages from the cache that were written to disk.
1262+ true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
1263+ State1 # msstate { current_file_handle = NextHdl ,
1264+ current_file = NextFile ,
1265+ current_file_offset = 0 };
1266+ % % If we need to flush, do so here.
1267+ flush_or_roll_to_new_file (_ , flush , State ) ->
1268+ internal_sync (State );
1269+ flush_or_roll_to_new_file (_ , _ , State ) ->
1270+ State .
1271+
1272+ write_large_message (MsgId , MsgBodyBin ,
1273+ State0 = # msstate { dir = Dir ,
1274+ current_file_handle = CurHdl ,
1275+ current_file = CurFile ,
1276+ current_file_offset = CurOffset ,
1277+ file_summary_ets = FileSummaryEts ,
1278+ cur_file_cache_ets = CurFileCacheEts }) ->
1279+ {LargeMsgFile , LargeMsgHdl } = case CurOffset of
1280+ % % We haven't written in the file yet. Use it.
1281+ 0 ->
1282+ {CurFile , CurHdl };
1283+ % % Flush the current file and close it. Open a new file.
1284+ _ ->
1285+ ok = writer_flush (CurHdl ),
1286+ ok = writer_close (CurHdl ),
1287+ LargeMsgFile0 = CurFile + 1 ,
1288+ {ok , LargeMsgHdl0 } = writer_open (Dir , LargeMsgFile0 ),
1289+ {LargeMsgFile0 , LargeMsgHdl0 }
1290+ end ,
1291+ % % Write the message directly and close the file.
1292+ TotalSize = writer_direct_write (LargeMsgHdl , MsgId , MsgBodyBin ),
1293+ ok = writer_close (LargeMsgHdl ),
1294+ % % Update ets with the new information.
1295+ ok = index_insert (
1296+ # msg_location { msg_id = MsgId , ref_count = 1 , file = LargeMsgFile ,
1297+ offset = 0 , total_size = TotalSize }, State0 ),
1298+ _ = case CurFile of
1299+ % % We didn't open a new file. We must update the existing value.
1300+ LargeMsgFile ->
1301+ [_ ,_ ] = ets :update_counter (FileSummaryEts , LargeMsgFile ,
1302+ [{# file_summary .valid_total_size , TotalSize },
1303+ {# file_summary .file_size , TotalSize }]);
1304+ % % We opened a new file. We can insert it all at once.
1305+ _ ->
1306+ true = ets :insert_new (FileSummaryEts , # file_summary {
1307+ file = LargeMsgFile ,
1308+ valid_total_size = TotalSize ,
1309+ file_size = TotalSize ,
1310+ locked = false })
1311+ end ,
1312+ % % Roll over to the next file.
1313+ NextFile = LargeMsgFile + 1 ,
1314+ {ok , NextHdl } = writer_open (Dir , NextFile ),
1315+ true = ets :insert_new (FileSummaryEts , # file_summary {
1316+ file = NextFile ,
1317+ valid_total_size = 0 ,
1318+ file_size = 0 ,
1319+ locked = false }),
1320+ % % Delete messages from the cache that were written to disk.
1321+ true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
1322+ % % Process confirms (this won't flush; we already did) and continue.
1323+ State = internal_sync (State0 ),
1324+ State # msstate { current_file_handle = NextHdl ,
1325+ current_file = NextFile ,
1326+ current_file_offset = 0 }.
1327+
12311328contains_message (MsgId , From , State ) ->
12321329 MsgLocation = index_lookup_positive_ref_count (MsgId , State ),
12331330 gen_server2 :reply (From , MsgLocation =/= not_found ),
@@ -1325,8 +1422,7 @@ writer_recover(Dir, Num, Offset) ->
13251422 ok = file :truncate (Fd ),
13261423 {ok , # writer {fd = Fd , buffer = prim_buffer :new ()}}.
13271424
1328- writer_append (# writer {buffer = Buffer }, MsgId , MsgBody ) ->
1329- MsgBodyBin = term_to_binary (MsgBody ),
1425+ writer_append (# writer {buffer = Buffer }, MsgId , MsgBodyBin ) ->
13301426 MsgBodyBinSize = byte_size (MsgBodyBin ),
13311427 EntrySize = MsgBodyBinSize + 16 , % % Size of MsgId + MsgBodyBin.
13321428 % % We send an iovec to the buffer instead of building a binary.
@@ -1354,6 +1450,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
13541450 file :write (Fd , prim_buffer :read_iovec (Buffer , Size ))
13551451 end .
13561452
1453+ % % For large messages we don't buffer anything. Large messages
1454+ % % are kept within their own files.
1455+ % %
1456+ % % This is basically the same as writer_append except no buffering.
1457+ writer_direct_write (# writer {fd = Fd }, MsgId , MsgBodyBin ) ->
1458+ MsgBodyBinSize = byte_size (MsgBodyBin ),
1459+ EntrySize = MsgBodyBinSize + 16 , % % Size of MsgId + MsgBodyBin.
1460+ ok = file :write (Fd , [
1461+ <<EntrySize :64 >>,
1462+ MsgId ,
1463+ MsgBodyBin ,
1464+ <<255 >> % % OK marker.
1465+ ]),
1466+ EntrySize + 9 .
1467+
13571468writer_close (# writer {fd = Fd }) ->
13581469 file :close (Fd ).
13591470
@@ -1700,33 +1811,6 @@ rebuild_index(Gatherer, Files, State) ->
17001811% % garbage collection / compaction / aggregation -- internal
17011812% %----------------------------------------------------------------------------
17021813
1703- maybe_roll_to_new_file (
1704- Offset ,
1705- State = # msstate { dir = Dir ,
1706- current_file_handle = CurHdl ,
1707- current_file = CurFile ,
1708- file_summary_ets = FileSummaryEts ,
1709- cur_file_cache_ets = CurFileCacheEts ,
1710- file_size_limit = FileSizeLimit })
1711- when Offset >= FileSizeLimit ->
1712- State1 = internal_sync (State ),
1713- ok = writer_close (CurHdl ),
1714- NextFile = CurFile + 1 ,
1715- {ok , NextHdl } = writer_open (Dir , NextFile ),
1716- true = ets :insert_new (FileSummaryEts , # file_summary {
1717- file = NextFile ,
1718- valid_total_size = 0 ,
1719- file_size = 0 ,
1720- locked = false }),
1721- % % We only delete messages from the cache that were written to disk
1722- % % in the previous file.
1723- true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
1724- State1 # msstate { current_file_handle = NextHdl ,
1725- current_file = NextFile ,
1726- current_file_offset = 0 };
1727- maybe_roll_to_new_file (_ , State ) ->
1728- State .
1729-
17301814% % We keep track of files that have seen removes and
17311815% % check those periodically for compaction. We only
17321816% % compact files that have less than half valid data.
0 commit comments