@@ -653,34 +653,41 @@ ack([], State) ->
653653% % optimisation: this head is essentially a partial evaluation of the
654654% % general case below, for the single-ack case.
655655ack ([SeqId ], State ) ->
656- {# msg_status { msg_id = MsgId ,
657- is_persistent = IsPersistent ,
658- msg_in_store = MsgInStore ,
659- index_on_disk = IndexOnDisk },
660- State1 = # vqstate { index_state = IndexState ,
661- msg_store_clients = MSCState ,
662- ack_out_counter = AckOutCount }} =
663- remove_pending_ack (true , SeqId , State ),
664- IndexState1 = case IndexOnDisk of
665- true -> rabbit_queue_index :ack ([SeqId ], IndexState );
666- false -> IndexState
667- end ,
668- case MsgInStore of
669- true -> ok = msg_store_remove (MSCState , IsPersistent , [MsgId ]);
670- false -> ok
671- end ,
672- {[MsgId ],
673- a (State1 # vqstate { index_state = IndexState1 ,
674- ack_out_counter = AckOutCount + 1 })};
656+ case remove_pending_ack (true , SeqId , State ) of
657+ {none , _ } ->
658+ State ;
659+ {# msg_status { msg_id = MsgId ,
660+ is_persistent = IsPersistent ,
661+ msg_in_store = MsgInStore ,
662+ index_on_disk = IndexOnDisk },
663+ State1 = # vqstate { index_state = IndexState ,
664+ msg_store_clients = MSCState ,
665+ ack_out_counter = AckOutCount }} ->
666+ IndexState1 = case IndexOnDisk of
667+ true -> rabbit_queue_index :ack ([SeqId ], IndexState );
668+ false -> IndexState
669+ end ,
670+ case MsgInStore of
671+ true -> ok = msg_store_remove (MSCState , IsPersistent , [MsgId ]);
672+ false -> ok
673+ end ,
674+ {[MsgId ],
675+ a (State1 # vqstate { index_state = IndexState1 ,
676+ ack_out_counter = AckOutCount + 1 })}
677+ end ;
675678ack (AckTags , State ) ->
676679 {{IndexOnDiskSeqIds , MsgIdsByStore , AllMsgIds },
677680 State1 = # vqstate { index_state = IndexState ,
678681 msg_store_clients = MSCState ,
679682 ack_out_counter = AckOutCount }} =
680683 lists :foldl (
681684 fun (SeqId , {Acc , State2 }) ->
682- {MsgStatus , State3 } = remove_pending_ack (true , SeqId , State2 ),
683- {accumulate_ack (MsgStatus , Acc ), State3 }
685+ case remove_pending_ack (true , SeqId , State2 ) of
686+ {none , _ } ->
687+ {Acc , State2 };
688+ {MsgStatus , State3 } ->
689+ {accumulate_ack (MsgStatus , Acc ), State3 }
690+ end
684691 end , {accumulate_ack_init (), State }, AckTags ),
685692 IndexState1 = rabbit_queue_index :ack (IndexOnDiskSeqIds , IndexState ),
686693 remove_msgs_by_id (MsgIdsByStore , MSCState ),
@@ -1998,8 +2005,12 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
19982005
19992006% % First parameter = UpdateStats
20002007remove_pending_ack (true , SeqId , State ) ->
2001- {MsgStatus , State1 } = remove_pending_ack (false , SeqId , State ),
2002- {MsgStatus , stats ({0 , - 1 }, {MsgStatus , none }, State1 )};
2008+ case remove_pending_ack (false , SeqId , State ) of
2009+ {none , _ } ->
2010+ {none , State };
2011+ {MsgStatus , State1 } ->
2012+ {MsgStatus , stats ({0 , - 1 }, {MsgStatus , none }, State1 )}
2013+ end ;
20032014remove_pending_ack (false , SeqId , State = # vqstate {ram_pending_ack = RPA ,
20042015 disk_pending_ack = DPA ,
20052016 qi_pending_ack = QPA }) ->
@@ -2011,9 +2022,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
20112022 DPA1 = gb_trees :delete (SeqId , DPA ),
20122023 {V , State # vqstate {disk_pending_ack = DPA1 }};
20132024 none ->
2014- QPA1 = gb_trees :delete (SeqId , QPA ),
2015- {gb_trees :get (SeqId , QPA ),
2016- State # vqstate {qi_pending_ack = QPA1 }}
2025+ case gb_trees :lookup (SeqId , QPA ) of
2026+ {value , V } ->
2027+ QPA1 = gb_trees :delete (SeqId , QPA ),
2028+ {V , State # vqstate {qi_pending_ack = QPA1 }};
2029+ none ->
2030+ {none , State }
2031+ end
20172032 end
20182033 end .
20192034
@@ -2164,11 +2179,15 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
21642179 Limit , PubFun , State );
21652180 {_ , _Q1 } ->
21662181 % % enqueue from the remaining list of sequence ids
2167- {MsgStatus , State1 } = msg_from_pending_ack (SeqId , State ),
2168- {# msg_status { msg_id = MsgId } = MsgStatus1 , State2 } =
2169- PubFun (MsgStatus , State1 ),
2170- queue_merge (Rest , Q , ? QUEUE :in (MsgStatus1 , Front ), [MsgId | MsgIds ],
2171- Limit , PubFun , State2 )
2182+ case msg_from_pending_ack (SeqId , State ) of
2183+ {none , _ } ->
2184+ queue_merge (Rest , Q , Front , MsgIds , Limit , PubFun , State );
2185+ {MsgStatus , State1 } ->
2186+ {# msg_status { msg_id = MsgId } = MsgStatus1 , State2 } =
2187+ PubFun (MsgStatus , State1 ),
2188+ queue_merge (Rest , Q , ? QUEUE :in (MsgStatus1 , Front ), [MsgId | MsgIds ],
2189+ Limit , PubFun , State2 )
2190+ end
21722191 end ;
21732192queue_merge (SeqIds , Q , Front , MsgIds ,
21742193 _Limit , _PubFun , State ) ->
@@ -2177,22 +2196,28 @@ queue_merge(SeqIds, Q, Front, MsgIds,
21772196delta_merge ([], Delta , MsgIds , State ) ->
21782197 {Delta , MsgIds , State };
21792198delta_merge (SeqIds , Delta , MsgIds , State ) ->
2180- lists :foldl (fun (SeqId , {Delta0 , MsgIds0 , State0 }) ->
2181- {# msg_status { msg_id = MsgId } = MsgStatus , State1 } =
2182- msg_from_pending_ack (SeqId , State0 ),
2183- {_MsgStatus , State2 } =
2184- maybe_prepare_write_to_disk (true , true , MsgStatus , State1 ),
2185- {expand_delta (SeqId , Delta0 ), [MsgId | MsgIds0 ],
2186- stats ({1 , - 1 }, {MsgStatus , none }, State2 )}
2199+ lists :foldl (fun (SeqId , {Delta0 , MsgIds0 , State0 } = Acc ) ->
2200+ case msg_from_pending_ack (SeqId , State0 ) of
2201+ {none , _ } ->
2202+ Acc ;
2203+ {# msg_status { msg_id = MsgId } = MsgStatus , State1 } ->
2204+ {_MsgStatus , State2 } =
2205+ maybe_prepare_write_to_disk (true , true , MsgStatus , State1 ),
2206+ {expand_delta (SeqId , Delta0 ), [MsgId | MsgIds0 ],
2207+ stats ({1 , - 1 }, {MsgStatus , none }, State2 )}
2208+ end
21872209 end , {Delta , MsgIds , State }, SeqIds ).
21882210
21892211% % Mostly opposite of record_pending_ack/2
21902212msg_from_pending_ack (SeqId , State ) ->
2191- {# msg_status { msg_props = MsgProps } = MsgStatus , State1 } =
2192- remove_pending_ack (false , SeqId , State ),
2193- {MsgStatus # msg_status {
2194- msg_props = MsgProps # message_properties { needs_confirming = false } },
2195- State1 }.
2213+ case remove_pending_ack (false , SeqId , State ) of
2214+ {none , _ } ->
2215+ {none , State };
2216+ {# msg_status { msg_props = MsgProps } = MsgStatus , State1 } ->
2217+ {MsgStatus # msg_status {
2218+ msg_props = MsgProps # message_properties { needs_confirming = false } },
2219+ State1 }
2220+ end .
21962221
21972222beta_limit (Q ) ->
21982223 case ? QUEUE :peek (Q ) of
0 commit comments