@@ -2032,10 +2032,10 @@ return_one(Meta, MsgId, Msg0, DeliveryFailed, Anns,
20322032return_all (Meta , #? STATE {consumers = Cons } = State0 , Effects0 , ConsumerKey ,
20332033 # consumer {checked_out = Checked } = Con , DeliveryFailed ) ->
20342034 State = State0 #? STATE {consumers = Cons #{ConsumerKey => Con }},
2035- lists : foldl (fun ({ MsgId , Msg } , {S , E }) ->
2036- return_one (Meta , MsgId , Msg , DeliveryFailed , #{},
2037- S , E , ConsumerKey )
2038- end , {State , Effects0 }, lists : sort ( maps :to_list (Checked ) )).
2035+ maps : fold (fun (MsgId , Msg , {S , E }) ->
2036+ return_one (Meta , MsgId , Msg , DeliveryFailed , #{},
2037+ S , E , ConsumerKey )
2038+ end , {State , Effects0 }, maps :iterator (Checked , ordered )).
20392039
20402040checkout (Meta , OldState , State0 , Effects0 ) ->
20412041 checkout (Meta , OldState , State0 , Effects0 , ok ).
@@ -2059,11 +2059,11 @@ checkout0(Meta, {success, ConsumerKey, MsgId,
20592059 Msg , ExpiredMsg , State , Effects },
20602060 SendAcc0 ) ->
20612061 DelMsg = {MsgId , Msg },
2062- SendAcc = case maps : get ( ConsumerKey , SendAcc0 , undefined ) of
2063- undefined ->
2064- SendAcc0 #{ConsumerKey => [DelMsg ]};
2065- LogMsgs ->
2066- SendAcc0 #{ConsumerKey => [DelMsg | LogMsgs ]}
2062+ SendAcc = case SendAcc0 of
2063+ #{ ConsumerKey : = LogMsgs } ->
2064+ SendAcc0 #{ConsumerKey : = [DelMsg | LogMsgs ]};
2065+ #{} ->
2066+ SendAcc0 #{ConsumerKey => [DelMsg ]}
20672067 end ,
20682068 checkout0 (Meta , checkout_one (Meta , ExpiredMsg , State , Effects ), SendAcc );
20692069checkout0 (_Meta , {_Activity , ExpiredMsg , State , Effects }, SendAcc ) ->
@@ -3240,7 +3240,7 @@ dlx_apply(_, {dlx, {checkout, Pid, Prefetch}},
32403240dlx_apply (_ , {dlx , {checkout , ConsumerPid , Prefetch }},
32413241 at_least_once ,
32423242 #? DLX {consumer = # dlx_consumer {pid = OldConsumerPid ,
3243- checked_out = CheckedOutOldConsumer },
3243+ checked_out = CheckedOldConsumer },
32443244 discards = Discards0 ,
32453245 msg_bytes = Bytes ,
32463246 msg_bytes_checkout = BytesCheckout } = State0 ) ->
@@ -3258,13 +3258,13 @@ dlx_apply(_, {dlx, {checkout, ConsumerPid, Prefetch}},
32583258 % % such that these messages will be re-delivered to the new consumer.
32593259 % % When inserting back into the discards queue, we respect the original order in which messages
32603260 % % were discarded.
3261- Checked0 = maps :to_list (CheckedOutOldConsumer ),
3262- Checked1 = lists :keysort (1 , Checked0 ),
3263- {Discards , BytesMoved } = lists :foldr (
3264- fun ({_Id , ? TUPLE (_ , Msg ) = RsnMsg }, {D , B }) ->
3261+ {Discards , BytesMoved } = maps :fold (
3262+ fun (_Id , ? TUPLE (_ , Msg ) = RsnMsg , {D , B }) ->
32653263 Size = get_header (size , get_msg_header (Msg )),
32663264 {lqueue :in_r (RsnMsg , D ), B + Size }
3267- end , {Discards0 , 0 }, Checked1 ),
3265+ end ,
3266+ {Discards0 , 0 },
3267+ maps :iterator (CheckedOldConsumer , reversed )),
32683268 State = State0 #? DLX {consumer = # dlx_consumer {pid = ConsumerPid ,
32693269 prefetch = Prefetch },
32703270 discards = Discards ,
0 commit comments