@@ -528,7 +528,7 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
                                        fds = OpenFds }) ->
     ?DEBUG("~0p ~0p ~0p", [VHost, Terms, State0]),
     %% Flush the buffer.
-    State = flush_buffer(State0, full),
+    State = flush_buffer(State0, full, segment_entry_count()),
     %% Fsync and close all FDs.
     _ = maps:map(fun(_, Fd) ->
         ok = file:sync(Fd),
@@ -575,22 +575,21 @@ publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount,
     SegmentEntryCount = segment_entry_count(),
     ThisSegment = SeqId div SegmentEntryCount,
     State2 = case maps:get(ThisSegment, Segments, undefined) of
-        undefined -> new_segment_file(ThisSegment, State1);
+        undefined -> new_segment_file(ThisSegment, SegmentEntryCount, State1);
         ThisSegmentCount -> State1#qi{ segments = Segments#{ ThisSegment => ThisSegmentCount + 1 }}
     end,
     %% When publisher confirms have been requested for this
     %% message we mark the message as unconfirmed.
     State = maybe_mark_unconfirmed(MsgId, Props, State2),
-    maybe_flush_buffer(State).
+    maybe_flush_buffer(State, SegmentEntryCount).

-new_segment_file(Segment, State = #qi{ segments = Segments }) ->
+new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) ->
     #qi{ fds = OpenFds } = reduce_fd_usage(Segment, State),
     false = maps:is_key(Segment, OpenFds), %% assert
     {ok, Fd} = file:open(segment_file(Segment, State), [read, write, raw, binary]),
     %% We must preallocate space for the file. We want the space
     %% to be allocated to not have to worry about errors when
     %% writing later on.
-    SegmentEntryCount = segment_entry_count(),
     Size = ?HEADER_SIZE + SegmentEntryCount * ?ENTRY_SIZE,
     case file:allocate(Fd, 0, Size) of
         ok ->
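
The preallocation above implies a fixed-size slot per entry, so a sequence id maps to a segment number and a byte offset by plain arithmetic. Below is a minimal sketch of that mapping, assuming entries are laid out contiguously after the header; locate/4, HeaderSize and EntrySize are illustrative stand-ins for the module's ?HEADER_SIZE and ?ENTRY_SIZE macros, not code from the change.

%% Sketch only: map a SeqId to its segment and the byte offset of its
%% entry slot, mirroring the Size calculation in new_segment_file/3.
%% Assumes one fixed-size entry per SeqId, stored in SeqId order.
locate(SeqId, SegmentEntryCount, HeaderSize, EntrySize) ->
    Segment = SeqId div SegmentEntryCount,
    Offset = HeaderSize + (SeqId rem SegmentEntryCount) * EntrySize,
    {Segment, Offset}.

%% Example (64 and 32 are assumed header/entry sizes, not the module's
%% actual values): locate(65537, 65536, 64, 32) -> {1, 96}, i.e. the
%% second entry slot of segment 1.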
@@ -677,28 +676,24 @@ maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true },
 maybe_mark_unconfirmed(_, _, State) ->
     State.

-%% @todo Perhaps make the two limits configurable. Also refine the default.
 maybe_flush_buffer(State = #qi{ write_buffer = WriteBuffer,
-                                write_buffer_updates = NumUpdates }) ->
+                                write_buffer_updates = NumUpdates },
+                   SegmentEntryCount) ->
     if
-        %% When we have at least 100 entries, we always want to flush,
-        %% in order to limit the memory usage and avoid losing too much
-        %% data on crashes.
-        (map_size(WriteBuffer) - NumUpdates) >= 100 ->
-            flush_buffer(State, full);
-        %% We may want to flush updates (acks) when
-        %% too many have accumulated.
-        NumUpdates >= 2000 ->
-            flush_buffer(State, updates);
+        %% We only flush updates (acks) when too many have accumulated.
+        NumUpdates >= (SegmentEntryCount div 2) ->
+            flush_buffer(State, updates, SegmentEntryCount);
+        %% We flush when the write buffer exceeds the size of a segment.
+        map_size(WriteBuffer) >= SegmentEntryCount ->
+            flush_buffer(State, full, SegmentEntryCount);
         %% Otherwise we do not flush this time.
         true ->
             State
     end.

 flush_buffer(State0 = #qi{ write_buffer = WriteBuffer0,
                            cache = Cache0 },
-             FlushType) ->
-    SegmentEntryCount = segment_entry_count(),
+             FlushType, SegmentEntryCount) ->
     %% First we prepare the writes sorted by segment.
     {Writes, WriteBuffer, AcksToCache} = maps:fold(fun
         (SeqId, ack, {WritesAcc, BufferAcc, AcksAcc}) when FlushType =:= updates ->
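
The two guards above tie both flush thresholds to the segment size instead of the previous fixed limits (100 buffered entries / 2000 updates). A small sketch of the resulting decision follows, assuming segment_entry_count() returns 65536 purely for illustration; the real value may differ, and flush_decision/2 is a hypothetical helper rather than module code.

%% Sketch only: mirrors the guards in maybe_flush_buffer/2.
%% WriteBufferSize stands for map_size(WriteBuffer).
flush_decision(WriteBufferSize, NumUpdates) ->
    SegmentEntryCount = 65536, %% assumed value, for illustration only
    if
        NumUpdates >= SegmentEntryCount div 2 -> updates;
        WriteBufferSize >= SegmentEntryCount  -> full;
        true                                  -> none
    end.

%% flush_decision(1000, 50)     -> none     (buffer kept in memory)
%% flush_decision(40000, 33000) -> updates  (acks flushed, writes kept)
%% flush_decision(70000, 10)    -> full     (everything flushed)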
@@ -813,10 +808,11 @@ ack(SeqIds, State0 = #qi{ write_buffer = WriteBuffer0,
                           write_buffer_updates = NumUpdates0,
                           cache = Cache0 }) ->
     ?DEBUG("~0p ~0p", [SeqIds, State0]),
+    SegmentEntryCount = segment_entry_count(),
     %% We start by updating the ack state information. We then
     %% use this information to delete segment files on disk that
     %% were fully acked.
-    {Deletes, State1} = update_ack_state(SeqIds, State0),
+    {Deletes, State1} = update_ack_state(SeqIds, SegmentEntryCount, State0),
     State = delete_segments(Deletes, State1),
     %% We add acks to the write buffer. We take special care not to add
     %% acks for segments that have been deleted. We do this using second
@@ -831,16 +827,16 @@ ack(SeqIds, State0 = #qi{ write_buffer = WriteBuffer0,
         _ ->
             {WriteBuffer2, NumUpdates2, _, _} = maps:fold(
                 fun ack_delete_fold_fun/3,
-                {#{}, 0, Deletes, segment_entry_count()},
+                {#{}, 0, Deletes, SegmentEntryCount},
                 WriteBuffer1),
             {WriteBuffer2, NumUpdates2}
     end,
     {Deletes, maybe_flush_buffer(State#qi{ write_buffer = WriteBuffer,
                                            write_buffer_updates = NumUpdates,
-                                           cache = Cache })}.
+                                           cache = Cache },
+                                  SegmentEntryCount)}.

-update_ack_state(SeqIds, State = #qi{ segments = Segments0 }) ->
-    SegmentEntryCount = segment_entry_count(),
+update_ack_state(SeqIds, SegmentEntryCount, State = #qi{ segments = Segments0 }) ->
     {Delete, Segments} = lists:foldl(fun(SeqId, {DeleteAcc, Segments1}) ->
         Segment = SeqId div SegmentEntryCount,
         case Segments1 of
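
update_ack_state/3 now receives the entry count computed once by ack/2 instead of calling segment_entry_count() itself, and buckets the acked SeqIds by segment with the same SeqId div SegmentEntryCount arithmetic. A sketch of that bucketing, under the same assumed entry count as above; group_by_segment/2 is a hypothetical helper, not module code.

%% Sketch only: count acked SeqIds per segment, the grouping that lets
%% fully acked segments be detected and their files deleted.
group_by_segment(SeqIds, SegmentEntryCount) ->
    lists:foldl(fun(SeqId, Acc) ->
        Segment = SeqId div SegmentEntryCount,
        maps:update_with(Segment, fun(N) -> N + 1 end, 1, Acc)
    end, #{}, SeqIds).

%% group_by_segment([0, 1, 65536], 65536) -> #{0 => 2, 1 => 1}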
@@ -1054,7 +1050,7 @@ parse_entries(<< Status:8,
 sync(State0 = #qi{ confirms = Confirms,
                    on_sync = OnSyncFun }) ->
     ?DEBUG("~0p", [State0]),
-    State = flush_buffer(State0, full),
+    State = flush_buffer(State0, full, segment_entry_count()),
     _ = case gb_sets:is_empty(Confirms) of
         true ->
             ok;