@@ -318,10 +318,10 @@ handle_dest(_Msg, State) ->
318318 State .
319319
320320ack (DeliveryTag , Multiple , State ) ->
321- settle (complete , DeliveryTag , Multiple , State ).
321+ maybe_grant_or_stash_credit ( settle (complete , DeliveryTag , Multiple , State ) ).
322322
323323nack (DeliveryTag , Multiple , State ) ->
324- settle (discard , DeliveryTag , Multiple , State ).
324+ maybe_grant_or_stash_credit ( settle (discard , DeliveryTag , Multiple , State ) ).
325325
326326forward (Tag , Msg0 , #{dest := #{current := #{queue_states := QState } = Current ,
327327 unacked := Unacked } = Dest ,
@@ -398,11 +398,6 @@ parse_parameter(Param, Fun, Value) ->
398398 fail ({invalid_parameter_value , Param , Err })
399399 end .
400400
401- parse_non_negative_integer (N ) when is_integer (N ) andalso N >= 0 ->
402- N ;
403- parse_non_negative_integer (N ) ->
404- fail ({require_non_negative_integer , N }).
405-
406401parse_binary (Binary ) when is_binary (Binary ) ->
407402 Binary ;
408403parse_binary (NotABinary ) ->
@@ -431,8 +426,8 @@ handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
431426 fun ({_QName , _QPid , MsgId , _Redelivered , Mc }, S0 ) ->
432427 DeliveryTag = next_tag (S0 ),
433428 S = record_pending (AckRequired , DeliveryTag , MsgId , increase_next_tag (S0 )),
434- rabbit_shovel_behaviour :forward (DeliveryTag , Mc , sent_delivery ( S ) )
435- end , State , Msgs )).
429+ rabbit_shovel_behaviour :forward (DeliveryTag , Mc , S )
430+ end , sent_delivery ( State , length ( Msgs )) , Msgs )).
436431
437432next_tag (#{source := #{current := #{next_tag := DeliveryTag }}}) ->
438433 DeliveryTag .
@@ -443,15 +438,13 @@ increase_next_tag(#{source := Source = #{current := Current = #{next_tag := Deli
443438handle_dest_queue_actions (Actions , State ) ->
444439 lists :foldl (
445440 fun ({settled , _QName , MsgSeqNos }, S0 ) ->
446- maybe_grant_or_stash_credit (
447- confirm_to_inbound (fun (Tag , StateX ) ->
448- rabbit_shovel_behaviour :ack (Tag , false , StateX )
449- end , MsgSeqNos , S0 ));
441+ confirm_to_inbound (fun (Tag , StateX ) ->
442+ rabbit_shovel_behaviour :ack (Tag , false , StateX )
443+ end , MsgSeqNos , S0 );
450444 ({rejected , _QName , MsgSeqNos }, S0 ) ->
451- maybe_grant_or_stash_credit (
452- confirm_to_inbound (fun (Tag , StateX ) ->
453- rabbit_shovel_behaviour :nack (Tag , false , StateX )
454- end , MsgSeqNos , S0 ));
445+ confirm_to_inbound (fun (Tag , StateX ) ->
446+ rabbit_shovel_behaviour :nack (Tag , false , StateX )
447+ end , MsgSeqNos , S0 );
455448 % % TODO handle {block, QName}
456449 (_Action , S0 ) ->
457450 S0
@@ -646,25 +639,25 @@ confirm_to_inbound(ConfirmFun, SeqNos, State)
646639confirm_to_inbound (ConfirmFun , Seq ,
647640 State0 = #{dest := #{unacked := Unacked } = Dst }) ->
648641 #{Seq := InTag } = Unacked ,
649- State = ConfirmFun (InTag , State0 ),
650642 Unacked1 = maps :remove (Seq , Unacked ),
651- rabbit_shovel_behaviour :decr_remaining (
652- 1 , State #{dest => Dst #{unacked => Unacked1 }}).
643+ State = rabbit_shovel_behaviour :decr_remaining (
644+ 1 , State0 #{dest => Dst #{unacked => Unacked1 }}),
645+ ConfirmFun (InTag , State ).
653646
654647sent_delivery (#{source := #{delivery_count := DeliveryCount0 ,
655648 credit := Credit0 ,
656649 queue_delivery_count := QDeliveryCount0 ,
657650 queue_credit := QCredit0 } = Src
658- } = State0 ) ->
659- DeliveryCount = serial_number :add (DeliveryCount0 , 1 ),
660- Credit = max (0 , Credit0 - 1 ),
661- QDeliveryCount = serial_number :add (QDeliveryCount0 , 1 ),
662- QCredit = max (0 , QCredit0 - 1 ),
663- State = State0 #{source => Src #{credit => Credit ,
664- delivery_count => DeliveryCount ,
665- queue_credit => QCredit ,
666- queue_delivery_count => QDeliveryCount
667- }}.
651+ } = State0 , NumMsgs ) ->
652+ DeliveryCount = serial_number :add (DeliveryCount0 , NumMsgs ),
653+ Credit = max (0 , Credit0 - NumMsgs ),
654+ QDeliveryCount = serial_number :add (QDeliveryCount0 , NumMsgs ),
655+ QCredit = max (0 , QCredit0 - NumMsgs ),
656+ State0 #{source => Src #{credit => Credit ,
657+ delivery_count => DeliveryCount ,
658+ queue_credit => QCredit ,
659+ queue_delivery_count => QDeliveryCount
660+ }}.
668661
669662maybe_grant_or_stash_credit (#{source := #{queue := QName0 ,
670663 credit := Credit ,
0 commit comments