@@ -574,21 +574,23 @@ get_user_vhost_from_amqp_param(Uri) ->
574574
575575settle (Op , DeliveryTag , Multiple ,
576576 #{source := #{queue_r := QRef ,
577- current := Current = #{queue_states := QState0 ,
578- consumer_tag := CTag ,
577+ current := Current = #{consumer_tag := CTag ,
579578 unacked_message_q := UAMQ0 }
580579 } = Src } = State0 ) ->
581580 {MsgIds , UAMQ } = collect_acks (UAMQ0 , DeliveryTag , Multiple ),
582- case rabbit_queue_type :settle (QRef , Op , CTag , MsgIds , QState0 ) of
583- {ok , QState1 , Actions } ->
584- State = State0 #{source => Src #{current => Current #{queue_states => QState1 ,
585- unacked_message_q => UAMQ }}},
586- handle_queue_actions (Actions , State );
587- {'protocol_error' , Type , Reason , Args } ->
588- ? LOG_ERROR (" Shovel failed to settle ~p acknowledgments with ~tp : ~tp " ,
589- [Op , Type , io_lib :format (Reason , Args )]),
590- exit ({shutdown , {ack_failed , Reason }})
591- end .
581+ State = State0 #{source => Src #{current => Current #{unacked_message_q => UAMQ }}},
582+ lists :foldl (
583+ fun (MsgId , #{source := Src0 = #{current := Current0 = #{queue_states := QState0 }}} = St0 ) ->
584+ case rabbit_queue_type :settle (QRef , Op , CTag , [MsgId ], QState0 ) of
585+ {ok , QState1 , Actions } ->
586+ St = St0 #{source => Src0 #{current => Current0 #{queue_states => QState1 }}},
587+ handle_queue_actions (Actions , St );
588+ {'protocol_error' , Type , Reason , Args } ->
589+ ? LOG_ERROR (" Shovel failed to settle ~p acknowledgments with ~tp : ~tp " ,
590+ [Op , Type , io_lib :format (Reason , Args )]),
591+ exit ({shutdown , {ack_failed , Reason }})
592+ end
593+ end , State , MsgIds ).
592594
593595% % From rabbit_channel
594596% % Records a client-sent acknowledgement. Handles both single delivery acks
0 commit comments