5858 msg_id
5959 }).
6060
61+ % % This is a significantly reduced version of its rabbit_amqp_session counterpart.
62+ % % Local shovels always use the maximum credit allowed.
63+ -record (credit_req , {
64+ delivery_count :: sequence_no ()
65+ }).
66+
6167parse (_Name , {source , Source }) ->
6268 Prefetch = parse_parameter (prefetch_count , fun parse_non_negative_integer /1 ,
6369 proplists :get_value (prefetch_count , Source ,
@@ -168,13 +174,16 @@ init_source(State = #{source := #{queue := QName0,
168174 end ) of
169175 {Remaining , {ok , QState1 }} ->
170176 {ok , QState , Actions } = rabbit_queue_type :credit (QName , CTag , ? INITIAL_DELIVERY_COUNT , MaxLinkCredit , false , QState1 ),
171- % % TODO handle actions
172177 State2 = State #{source => Src #{current => Current #{queue_states => QState ,
173178 consumer_tag => CTag },
174179 remaining => Remaining ,
175180 remaining_unacked => Remaining ,
176181 delivery_count => ? INITIAL_DELIVERY_COUNT ,
177- credit => MaxLinkCredit }},
182+ queue_delivery_count => ? INITIAL_DELIVERY_COUNT ,
183+ credit => MaxLinkCredit ,
184+ queue_credit => MaxLinkCredit ,
185+ at_least_one_credit_req_in_flight => true ,
186+ stashed_credit_req => none }},
178187 handle_queue_actions (Actions , State2 );
179188 {0 , {error , autodelete }} ->
180189 exit ({shutdown , autodelete });
@@ -331,7 +340,6 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
331340 Queues = rabbit_amqqueue :lookup_many (QNames ),
332341 case rabbit_queue_type :deliver (Queues , Msg , Options , QState ) of
333342 {ok , QState1 , Actions } ->
334- % % TODO handle credit?
335343 State1 = State #{dest => Dest1 #{current => Current1 #{queue_states => QState1 }}},
336344 #{dest := Dst1 } = State2 = rabbit_shovel_behaviour :incr_forwarded (State1 ),
337345 State4 = rabbit_shovel_behaviour :decr_remaining_unacked (
@@ -411,9 +419,8 @@ handle_queue_actions(Actions, State) ->
411419 lists :foldl (
412420 fun ({deliver , _CTag , AckRequired , Msgs }, S0 ) ->
413421 handle_deliver (AckRequired , Msgs , S0 );
414- ({credit_reply , _ , _ , _ , _ , _ }, S0 ) ->
415- % % TODO handle credit_reply
416- S0 ;
422+ ({credit_reply , _ , _ , _ , _ , _ } = Action , S0 ) ->
423+ handle_credit_reply (Action , S0 );
417424 (Action , S0 ) ->
418425 rabbit_log :warning (" ACTION NOT HANDLED ~p " , [Action ]),
419426 S0
@@ -429,7 +436,7 @@ handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
429436 lists :foldl (fun ({_QName , _QPid , MsgId , _Redelivered , Mc }, S0 ) ->
430437 DeliveryTag = next_tag (S0 ),
431438 S = record_pending (AckRequired , DeliveryTag , MsgId , increase_next_tag (S0 )),
432- sent_pending_delivery (rabbit_shovel_behaviour :forward (DeliveryTag , Mc , S ))
439+ sent_delivery (rabbit_shovel_behaviour :forward (DeliveryTag , Mc , S ))
433440 end , State , Msgs ).
434441
435442next_tag (#{source := #{current := #{next_tag := DeliveryTag }}}) ->
@@ -559,14 +566,14 @@ settle(Op, DeliveryTag, Multiple, #{unacked_message_q := UAMQ0,
559566 source := #{queue := Queue ,
560567 current := Current = #{queue_states := QState0 ,
561568 consumer_tag := CTag ,
562- vhost := VHost }} = Src } = State ) ->
569+ vhost := VHost }} = Src } = State0 ) ->
563570 {Acked , UAMQ } = collect_acks (UAMQ0 , DeliveryTag , Multiple ),
564571 QRef = rabbit_misc :r (VHost , queue , Queue ),
565572 MsgIds = [Ack # pending_ack .msg_id || Ack <- Acked ],
566573 case rabbit_queue_type :settle (QRef , Op , CTag , MsgIds , QState0 ) of
567574 {ok , QState1 , Actions } ->
568- State #{source => Src #{current => Current #{queue_states => QState1 }},
569- unacked_message_q => UAMQ },
575+ State = State0 #{source => Src #{current => Current #{queue_states => QState1 }},
576+ unacked_message_q => UAMQ },
570577 handle_queue_actions (Actions , State );
571578 {'protocol_error' , Type , Reason , Args } ->
572579 rabbit_log :error (" Shovel failed to settle ~p acknowledgments with ~tp : ~tp " ,
@@ -646,30 +653,100 @@ confirm_to_inbound(ConfirmFun, Seq, Multiple,
646653 State #{dest =>
647654 Dst #{unacked => Unacked1 }}).
648655
649- sent_pending_delivery (#{source := #{current := #{consumer_tag := CTag ,
650- vhost := VHost ,
651- queue_states := QState0
652- } = Current ,
653- delivery_count := DeliveryCount0 ,
654- credit := Credit0 ,
655- queue := QName0 } = Src } = State0 ) ->
656- % % TODO add check for credit request in-flight
656+ sent_delivery (#{source := #{current := #{consumer_tag := CTag ,
657+ vhost := VHost ,
658+ queue_states := QState0
659+ } = Current ,
660+ delivery_count := DeliveryCount0 ,
661+ credit := Credit0 ,
662+ queue_delivery_count := QDeliveryCount0 ,
663+ queue_credit := QCredit0 ,
664+ at_least_one_credit_req_in_flight := HaveCreditReqInFlight ,
665+ queue := QName0 } = Src ,
666+ dest := #{unacked := Unacked }} = State0 ) ->
657667 QName = rabbit_misc :r (VHost , queue , QName0 ),
658668 DeliveryCount = serial_number :add (DeliveryCount0 , 1 ),
659669 Credit = max (0 , Credit0 - 1 ),
660- {ok , QState , Actions } = case Credit =:= 0 of
670+ QDeliveryCount = serial_number :add (QDeliveryCount0 , 1 ),
671+ QCredit = max (0 , QCredit0 - 1 ),
672+ MaxLinkCredit = max_link_credit (),
673+ GrantLinkCredit = grant_link_credit (HaveCreditReqInFlight , Credit , MaxLinkCredit , maps :size (Unacked )),
674+ Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of
675+ true ->
676+ Req = # credit_req {
677+ delivery_count = DeliveryCount
678+ },
679+ maps :put (stashed_credit_req , Req , Src );
680+ false ->
681+ Src
682+ end ,
683+ {ok , QState , Actions } = case GrantLinkCredit of
661684 true ->
662685 rabbit_queue_type :credit (
663686 QName , CTag , DeliveryCount , max_link_credit (),
664687 false , QState0 );
665- false ->
688+ _ ->
666689 {ok , QState0 , []}
667690 end ,
668- State = State0 #{source => Src #{current => Current #{queue_states => QState },
691+ CreditReqInFlight = case GrantLinkCredit of
692+ true -> true ;
693+ false -> HaveCreditReqInFlight
694+ end ,
695+ State = State0 #{source => Src1 #{current => Current #{queue_states => QState },
669696 credit => Credit ,
670- delivery_count => DeliveryCount
697+ delivery_count => DeliveryCount ,
698+ queue_credit => QCredit ,
699+ queue_delivery_count => QDeliveryCount ,
700+ at_least_one_credit_req_in_flight => CreditReqInFlight
671701 }},
672702 handle_queue_actions (Actions , State ).
673703
674704max_link_credit () ->
675705 application :get_env (rabbit , max_link_credit , ? DEFAULT_MAX_LINK_CREDIT ).
706+
707+ grant_link_credit (true = _HaveCreditReqInFlight , _Credit , _MaxLinkCredit , _NumUnconfirmed ) ->
708+ false ;
709+ grant_link_credit (false = _HaveCreditReqInFlight , Credit , MaxLinkCredit , NumUnconfirmed ) ->
710+ Credit =< MaxLinkCredit div 2 andalso
711+ NumUnconfirmed < MaxLinkCredit .
712+
713+ % % Drain is ignored because local shovels do not use it.
714+ handle_credit_reply ({credit_reply , CTag , DeliveryCount , Credit , _Available , _Drain },
715+ #{source := #{credit := CCredit ,
716+ queue_delivery_count := QDeliveryCount ,
717+ stashed_credit_req := StashedCreditReq ,
718+ queue := QName0 ,
719+ current := Current = #{queue_states := QState0 ,
720+ vhost := VHost }} = Src } = State0 ) ->
721+ % % Assertion: Our (receiver) delivery-count should be always
722+ % % in sync with the delivery-count of the sending queue.
723+ QDeliveryCount = DeliveryCount ,
724+ case StashedCreditReq of
725+ # credit_req {delivery_count = StashedDeliveryCount } ->
726+ MaxLinkCredit = max_link_credit (),
727+ QName = rabbit_misc :r (VHost , queue , QName0 ),
728+ {ok , QState , Actions } = rabbit_queue_type :credit (QName , CTag , StashedDeliveryCount ,
729+ MaxLinkCredit , false , QState0 ),
730+ State = State0 #{source => Src #{queue_credit => MaxLinkCredit ,
731+ at_least_one_credit_req_in_flight => true ,
732+ stashed_credit_req => none ,
733+ current => Current #{queue_states => QState }}},
734+ handle_queue_actions (Actions , State );
735+ none when Credit =:= 0 andalso
736+ CCredit > 0 ->
737+ MaxLinkCredit = max_link_credit (),
738+ QName = rabbit_misc :r (VHost , queue , QName0 ),
739+ {ok , QState , Actions } = rabbit_queue_type :credit (QName , CTag , DeliveryCount , MaxLinkCredit , false , QState0 ),
740+ State = State0 #{source => Src #{queue_credit => MaxLinkCredit ,
741+ at_least_one_credit_req_in_flight => true ,
742+ current => Current #{queue_states => QState }}},
743+ handle_queue_actions (Actions , State );
744+ none ->
745+ % % Although we (the receiver) usually determine link credit, we set here
746+ % % our link credit to what the queue says our link credit is (which is safer
747+ % % in case credit requests got applied out of order in quorum queues).
748+ % % This should be fine given that we asserted earlier that our delivery-count is
749+ % % in sync with the delivery-count of the sending queue.
750+ State0 #{source => Src #{queue_credit => Credit ,
751+ at_least_one_credit_req_in_flight => false }}
752+ end .
0 commit comments