@@ -640,7 +640,8 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
640640 {{Segment = # segment { unacked = UnackedCount }, Dirty },
641641 UnackedBytes } =
642642 recover_segment (ContainsCheckFun , CleanShutdown ,
643- segment_find_or_new (Seg , Dir , Segments2 )),
643+ segment_find_or_new (Seg , Dir , Segments2 ),
644+ State1 # qistate .max_journal_entries ),
644645 {segment_store (Segment , Segments2 ),
645646 CountAcc + UnackedCount ,
646647 BytesAcc + UnackedBytes , DirtyCount + Dirty }
@@ -664,7 +665,7 @@ terminate(State = #qistate { journal_handle = JournalHdl,
664665 segments = undefined }}.
665666
666667recover_segment (ContainsCheckFun , CleanShutdown ,
667- Segment = # segment { journal_entries = JEntries }) ->
668+ Segment = # segment { journal_entries = JEntries }, MaxJournal ) ->
668669 {SegEntries , UnackedCount } = load_segment (false , Segment ),
669670 {SegEntries1 , UnackedCountDelta } =
670671 segment_plus_journal (SegEntries , JEntries ),
@@ -673,7 +674,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
673674 {SegmentAndDirtyCount , Bytes }) ->
674675 {MsgOrId , MsgProps } = parse_pub_record_body (Bin , MsgBin ),
675676 {recover_message (ContainsCheckFun (MsgOrId ), CleanShutdown ,
676- Del , RelSeq , SegmentAndDirtyCount ),
677+ Del , RelSeq , SegmentAndDirtyCount , MaxJournal ),
677678 Bytes + case IsPersistent of
678679 true -> MsgProps # message_properties .size ;
679680 false -> 0
@@ -682,15 +683,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
682683 {{Segment # segment { unacked = UnackedCount + UnackedCountDelta }, 0 }, 0 },
683684 SegEntries1 ).
684685
685- recover_message ( true , true , _Del , _RelSeq , SegmentAndDirtyCount ) ->
686+ recover_message ( true , true , _Del , _RelSeq , SegmentAndDirtyCount , _MaxJournal ) ->
686687 SegmentAndDirtyCount ;
687- recover_message ( true , false , del , _RelSeq , SegmentAndDirtyCount ) ->
688+ recover_message ( true , false , del , _RelSeq , SegmentAndDirtyCount , _MaxJournal ) ->
688689 SegmentAndDirtyCount ;
689- recover_message ( true , false , no_del , RelSeq , {Segment , DirtyCount }) ->
690- {add_to_journal (RelSeq , del , Segment ), DirtyCount + 1 };
691- recover_message (false , _ , del , RelSeq , {Segment , DirtyCount }) ->
690+ recover_message ( true , false , no_del , RelSeq , {Segment , _DirtyCount }, MaxJournal ) ->
691+ % % force to flush the segment
692+ {add_to_journal (RelSeq , del , Segment ), MaxJournal + 1 };
693+ recover_message (false , _ , del , RelSeq , {Segment , DirtyCount }, _MaxJournal ) ->
692694 {add_to_journal (RelSeq , ack , Segment ), DirtyCount + 1 };
693- recover_message (false , _ , no_del , RelSeq , {Segment , DirtyCount }) ->
695+ recover_message (false , _ , no_del , RelSeq , {Segment , DirtyCount }, _MaxJournal ) ->
694696 {add_to_journal (RelSeq , ack ,
695697 add_to_journal (RelSeq , del , Segment )),
696698 DirtyCount + 2 }.
0 commit comments