@@ -611,17 +611,13 @@ ack(AckTags, State) ->
611611 ack_out_counter = AckOutCount + length (AckTags ) })}.
612612
613613requeue (AckTags , # vqstate { q_head = QHead0 ,
614- q_tail = QTail ,
615- in_counter = InCounter } = State ) ->
616- {SeqIds , QHead , MsgIds , State1 } = requeue_merge (lists :sort (AckTags ), QHead0 , [],
617- q_tail_limit (QTail ), State ),
618- {QTail1 , MsgIds1 , State2 } = q_tail_merge (SeqIds , QTail , MsgIds , State1 ),
619- MsgCount = length (MsgIds1 ),
620- {MsgIds1 , a (
621- maybe_update_rates (
622- State2 # vqstate { q_head = QHead ,
623- q_tail = QTail1 ,
624- in_counter = InCounter + MsgCount }))}.
614+ in_counter = InCounter } = State0 ) ->
615+ {QHead , MsgIds , State } = requeue_merge (lists :sort (AckTags ), QHead0 , [], State0 ),
616+ MsgCount = length (MsgIds ),
617+ {MsgIds , a (maybe_update_rates (State # vqstate {
618+ q_head = QHead ,
619+ in_counter = InCounter + MsgCount
620+ }))}.
625621
626622ackfold (MsgFun , Acc , State , AckTags ) ->
627623 {AccN , StateN } =
@@ -1206,17 +1202,15 @@ stats_acked_pending(MS = #msg_status{is_persistent = false}, St) ->
12061202 St # vqstate {? UP (unacked_bytes , ram_bytes , - msg_size (MS ))}.
12071203
12081204% % Notice that this is the reverse of stats_pending_acks.
1205+ % % Note that messages are always requeued to memory in the current
1206+ % % implementation because they are necessarily at the front of the
1207+ % % queue which is in memory.
12091208stats_requeued_memory (MS = # msg_status {msg = undefined }, St ) ->
12101209 St # vqstate {? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
12111210stats_requeued_memory (MS , St ) ->
12121211 St # vqstate {? UP (ram_msg_count , + 1 ),
12131212 ? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))}.
12141213
1215- stats_requeued_disk (MS = # msg_status {is_persistent = true }, St ) ->
1216- St # vqstate {? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
1217- stats_requeued_disk (MS = # msg_status {is_persistent = false }, St ) ->
1218- St # vqstate {? UP (unacked_bytes , - msg_size (MS ))}.
1219-
12201214msg_size (# msg_status {msg_props = # message_properties {size = Size }}) -> Size .
12211215
12221216msg_in_ram (# msg_status {msg = Msg }) -> Msg =/= undefined .
@@ -1807,46 +1801,27 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
18071801% %----------------------------------------------------------------------------
18081802
18091803% % Rebuild queue, inserting sequence ids to maintain ordering
1810- requeue_merge (SeqIds , Q , MsgIds , Limit , State ) ->
1811- requeue_merge (SeqIds , Q , ? QUEUE :new (), MsgIds ,
1812- Limit , State ).
1804+ requeue_merge (SeqIds , Q , MsgIds , State ) ->
1805+ requeue_merge (SeqIds , Q , ? QUEUE :new (), MsgIds , State ).
18131806
1814- requeue_merge ([SeqId | Rest ] = SeqIds , Q , Front , MsgIds ,
1815- Limit , State )
1816- when Limit == undefined orelse SeqId < Limit ->
1807+ requeue_merge ([SeqId | Rest ] = SeqIds , Q , Front , MsgIds , State ) ->
18171808 case ? QUEUE :out (Q ) of
18181809 {{value , # msg_status { seq_id = SeqIdQ } = MsgStatus }, Q1 }
18191810 when SeqIdQ < SeqId ->
18201811 % % enqueue from the remaining queue
1821- requeue_merge (SeqIds , Q1 , ? QUEUE :in (MsgStatus , Front ), MsgIds ,
1822- Limit , State );
1812+ requeue_merge (SeqIds , Q1 , ? QUEUE :in (MsgStatus , Front ), MsgIds , State );
18231813 {_ , _Q1 } ->
18241814 % % enqueue from the remaining list of sequence ids
18251815 case msg_from_pending_ack (SeqId , State ) of
18261816 {none , _ } ->
1827- requeue_merge (Rest , Q , Front , MsgIds , Limit , State );
1817+ requeue_merge (Rest , Q , Front , MsgIds , State );
18281818 {# msg_status { msg_id = MsgId } = MsgStatus , State1 } ->
18291819 State2 = stats_requeued_memory (MsgStatus , State1 ),
1830- requeue_merge (Rest , Q , ? QUEUE :in (MsgStatus , Front ), [MsgId | MsgIds ],
1831- Limit , State2 )
1820+ requeue_merge (Rest , Q , ? QUEUE :in (MsgStatus , Front ), [MsgId | MsgIds ], State2 )
18321821 end
18331822 end ;
1834- requeue_merge (SeqIds , Q , Front , MsgIds ,
1835- _Limit , State ) ->
1836- {SeqIds , ? QUEUE :join (Front , Q ), MsgIds , State }.
1837-
1838- q_tail_merge ([], QTail , MsgIds , State ) ->
1839- {QTail , MsgIds , State };
1840- q_tail_merge (SeqIds , QTail , MsgIds , State ) ->
1841- lists :foldl (fun (SeqId , {QTail0 , MsgIds0 , State0 } = Acc ) ->
1842- case msg_from_pending_ack (SeqId , State0 ) of
1843- {none , _ } ->
1844- Acc ;
1845- {# msg_status { msg_id = MsgId } = MsgStatus , State1 } ->
1846- {expand_q_tail (SeqId , QTail0 ), [MsgId | MsgIds0 ],
1847- stats_requeued_disk (MsgStatus , State1 )}
1848- end
1849- end , {QTail , MsgIds , State }, SeqIds ).
1823+ requeue_merge ([], Q , Front , MsgIds , State ) ->
1824+ {? QUEUE :join (Front , Q ), MsgIds , State }.
18501825
18511826% % Mostly opposite of record_pending_ack/2
18521827msg_from_pending_ack (SeqId , State ) ->
@@ -1859,9 +1834,6 @@ msg_from_pending_ack(SeqId, State) ->
18591834 State1 }
18601835 end .
18611836
1862- q_tail_limit (? BLANK_Q_TAIL_PATTERN (_ )) -> undefined ;
1863- q_tail_limit (# q_tail { start_seq_id = StartSeqId }) -> StartSeqId .
1864-
18651837% %----------------------------------------------------------------------------
18661838% % Phase changes
18671839% %----------------------------------------------------------------------------
0 commit comments