@@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
505505 [begin
506506 ok = gatherer :fork (Gatherer ),
507507 ok = worker_pool :submit_async (
508- fun () -> queue_index_walker_reader (QueueName , Gatherer )
508+ fun () -> link (Gatherer ),
509+ ok = queue_index_walker_reader (QueueName , Gatherer ),
510+ unlink (Gatherer ),
511+ ok
509512 end )
510513 end || QueueName <- DurableQueues ],
511514 queue_index_walker ({next , Gatherer });
@@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init,
837840% %
838841% % Does not do any combining with the journal at all.
839842load_segment (KeepAcked , # segment { path = Path }) ->
843+ Empty = {array_new (), 0 },
840844 case rabbit_file :is_file (Path ) of
841- false -> { array_new (), 0 } ;
845+ false -> Empty ;
842846 true -> {ok , Hdl } = file_handle_cache :open (Path , ? READ_AHEAD_MODE , []),
843847 {ok , 0 } = file_handle_cache :position (Hdl , bof ),
844- {ok , SegData } = file_handle_cache :read (
845- Hdl , ? SEGMENT_TOTAL_SIZE ),
846- Res = load_segment_entries (KeepAcked , SegData , array_new (), 0 ),
848+ Res = case file_handle_cache :read (Hdl , ? SEGMENT_TOTAL_SIZE ) of
849+ {ok , SegData } -> load_segment_entries (
850+ KeepAcked , SegData , Empty );
851+ eof -> Empty
852+ end ,
847853 ok = file_handle_cache :close (Hdl ),
848854 Res
849855 end .
@@ -853,15 +859,15 @@ load_segment_entries(KeepAcked,
853859 IsPersistentNum :1 , RelSeq :? REL_SEQ_BITS ,
854860 PubRecordBody :? PUB_RECORD_BODY_BYTES /binary ,
855861 SegData /binary >>,
856- SegEntries , UnackedCount ) ->
862+ { SegEntries , UnackedCount } ) ->
857863 {MsgId , MsgProps } = parse_pub_record_body (PubRecordBody ),
858864 Obj = {{MsgId , MsgProps , 1 == IsPersistentNum }, no_del , no_ack },
859865 SegEntries1 = array :set (RelSeq , Obj , SegEntries ),
860- load_segment_entries (KeepAcked , SegData , SegEntries1 , UnackedCount + 1 );
866+ load_segment_entries (KeepAcked , SegData , { SegEntries1 , UnackedCount + 1 } );
861867load_segment_entries (KeepAcked ,
862868 <<? REL_SEQ_ONLY_PREFIX :? REL_SEQ_ONLY_PREFIX_BITS ,
863869 RelSeq :? REL_SEQ_BITS , SegData /binary >>,
864- SegEntries , UnackedCount ) ->
870+ { SegEntries , UnackedCount } ) ->
865871 {UnackedCountDelta , SegEntries1 } =
866872 case array :get (RelSeq , SegEntries ) of
867873 {Pub , no_del , no_ack } ->
@@ -871,10 +877,10 @@ load_segment_entries(KeepAcked,
871877 {_Pub , del , no_ack } ->
872878 {- 1 , array :reset (RelSeq , SegEntries )}
873879 end ,
874- load_segment_entries (KeepAcked , SegData , SegEntries1 ,
875- UnackedCount + UnackedCountDelta );
876- load_segment_entries (_KeepAcked , _SegData , SegEntries , UnackedCount ) ->
877- { SegEntries , UnackedCount } .
880+ load_segment_entries (KeepAcked , SegData ,
881+ { SegEntries1 , UnackedCount + UnackedCountDelta } );
882+ load_segment_entries (_KeepAcked , _SegData , Res ) ->
883+ Res .
878884
879885array_new () ->
880886 array :new ([{default , undefined }, fixed , {size , ? SEGMENT_ENTRY_COUNT }]).
0 commit comments