@@ -626,7 +626,8 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
626626 {{Segment = # segment { unacked = UnackedCount }, Dirty },
627627 UnackedBytes } =
628628 recover_segment (ContainsCheckFun , CleanShutdown ,
629- segment_find_or_new (Seg , Dir , Segments2 )),
629+ segment_find_or_new (Seg , Dir , Segments2 ),
630+ State1 # qistate .max_journal_entries ),
630631 {segment_store (Segment , Segments2 ),
631632 CountAcc + UnackedCount ,
632633 BytesAcc + UnackedBytes , DirtyCount + Dirty }
@@ -650,7 +651,7 @@ terminate(State = #qistate { journal_handle = JournalHdl,
650651 segments = undefined }}.
651652
652653recover_segment (ContainsCheckFun , CleanShutdown ,
653- Segment = # segment { journal_entries = JEntries }) ->
654+ Segment = # segment { journal_entries = JEntries }, MaxJournal ) ->
654655 {SegEntries , UnackedCount } = load_segment (false , Segment ),
655656 {SegEntries1 , UnackedCountDelta } =
656657 segment_plus_journal (SegEntries , JEntries ),
@@ -659,7 +660,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
659660 {SegmentAndDirtyCount , Bytes }) ->
660661 {MsgOrId , MsgProps } = parse_pub_record_body (Bin , MsgBin ),
661662 {recover_message (ContainsCheckFun (MsgOrId ), CleanShutdown ,
662- Del , RelSeq , SegmentAndDirtyCount ),
663+ Del , RelSeq , SegmentAndDirtyCount , MaxJournal ),
663664 Bytes + case IsPersistent of
664665 true -> MsgProps # message_properties .size ;
665666 false -> 0
@@ -668,15 +669,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
668669 {{Segment # segment { unacked = UnackedCount + UnackedCountDelta }, 0 }, 0 },
669670 SegEntries1 ).
670671
671- recover_message ( true , true , _Del , _RelSeq , SegmentAndDirtyCount ) ->
672+ recover_message ( true , true , _Del , _RelSeq , SegmentAndDirtyCount , _MaxJournal ) ->
672673 SegmentAndDirtyCount ;
673- recover_message ( true , false , del , _RelSeq , SegmentAndDirtyCount ) ->
674+ recover_message ( true , false , del , _RelSeq , SegmentAndDirtyCount , _MaxJournal ) ->
674675 SegmentAndDirtyCount ;
675- recover_message ( true , false , no_del , RelSeq , {Segment , DirtyCount }) ->
676- {add_to_journal (RelSeq , del , Segment ), DirtyCount + 1 };
677- recover_message (false , _ , del , RelSeq , {Segment , DirtyCount }) ->
676+ recover_message ( true , false , no_del , RelSeq , {Segment , _DirtyCount }, MaxJournal ) ->
677+ % % force to flush the segment
678+ {add_to_journal (RelSeq , del , Segment ), MaxJournal + 1 };
679+ recover_message (false , _ , del , RelSeq , {Segment , DirtyCount }, _MaxJournal ) ->
678680 {add_to_journal (RelSeq , ack , Segment ), DirtyCount + 1 };
679- recover_message (false , _ , no_del , RelSeq , {Segment , DirtyCount }) ->
681+ recover_message (false , _ , no_del , RelSeq , {Segment , DirtyCount }, _MaxJournal ) ->
680682 {add_to_journal (RelSeq , ack ,
681683 add_to_journal (RelSeq , del , Segment )),
682684 DirtyCount + 2 }.
0 commit comments