Skip to content

Commit 165716f

Browse files
committed
Local shovels: grant credit after confirming
1 parent e00d83a commit 165716f

File tree

1 file changed

+39
-34
lines changed

1 file changed

+39
-34
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ get_user_vhost_from_amqp_param(Uri) ->
558558
exit({shutdown, {access_refused, Username}})
559559
end;
560560
{refused, Username, _Msg, _Module} ->
561-
?LOG_ERROR("Local shovel user ~ts was refused access"),
561+
?LOG_ERROR("Local shovel user ~ts was refused access", [Username]),
562562
exit({shutdown, {access_refused, Username}})
563563
end.
564564

@@ -628,9 +628,10 @@ route(Msg, #{current := #{vhost := VHost}}) ->
628628

629629
confirm_to_inbound(ConfirmFun, SeqNos, State)
630630
when is_list(SeqNos) ->
631-
lists:foldl(fun(Seq, State0) ->
632-
confirm_to_inbound(ConfirmFun, Seq, State0)
633-
end, State, SeqNos);
631+
State1 = lists:foldl(fun(Seq, State0) ->
632+
confirm_to_inbound(ConfirmFun, Seq, State0)
633+
end, State, SeqNos),
634+
maybe_grant_or_stash_credit(State1);
634635
confirm_to_inbound(ConfirmFun, Seq,
635636
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
636637
#{Seq := InTag} = Unacked,
@@ -639,37 +640,47 @@ confirm_to_inbound(ConfirmFun, Seq,
639640
rabbit_shovel_behaviour:decr_remaining(
640641
1, State#{dest => Dst#{unacked => Unacked1}}).
641642

642-
sent_delivery(#{source := #{current := #{consumer_tag := CTag,
643-
vhost := VHost,
644-
queue_states := QState0
645-
} = Current,
646-
delivery_count := DeliveryCount0,
643+
sent_delivery(#{source := #{delivery_count := DeliveryCount0,
647644
credit := Credit0,
648645
queue_delivery_count := QDeliveryCount0,
649-
queue_credit := QCredit0,
650-
at_least_one_credit_req_in_flight := HaveCreditReqInFlight,
651-
queue := QName0} = Src,
652-
dest := #{unacked := Unacked}} = State0) ->
653-
QName = rabbit_misc:r(VHost, queue, QName0),
646+
queue_credit := QCredit0} = Src
647+
} = State0) ->
654648
DeliveryCount = serial_number:add(DeliveryCount0, 1),
655649
Credit = max(0, Credit0 - 1),
656650
QDeliveryCount = serial_number:add(QDeliveryCount0, 1),
657651
QCredit = max(0, QCredit0 - 1),
652+
State = State0#{source => Src#{credit => Credit,
653+
delivery_count => DeliveryCount,
654+
queue_credit => QCredit,
655+
queue_delivery_count => QDeliveryCount
656+
}},
657+
maybe_grant_or_stash_credit(State).
658+
659+
maybe_grant_or_stash_credit(#{source := #{queue := QName0,
660+
credit := Credit,
661+
delivery_count := DeliveryCount,
662+
at_least_one_credit_req_in_flight := HaveCreditReqInFlight,
663+
current := #{consumer_tag := CTag,
664+
vhost := VHost,
665+
queue_states := QState0} = Current
666+
} = Src,
667+
dest := #{unacked := Unacked}} = State0) ->
658668
MaxLinkCredit = max_link_credit(),
659-
GrantLinkCredit = grant_link_credit(HaveCreditReqInFlight, Credit, MaxLinkCredit, maps:size(Unacked)),
669+
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, maps:size(Unacked)),
660670
Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of
661-
true ->
662-
Req = #credit_req {
663-
delivery_count = DeliveryCount
664-
},
665-
maps:put(stashed_credit_req, Req, Src);
666-
false ->
667-
Src
668-
end,
669-
{ok, QState, Actions} = case GrantLinkCredit of
671+
true ->
672+
Req = #credit_req {
673+
delivery_count = DeliveryCount
674+
},
675+
maps:put(stashed_credit_req, Req, Src);
676+
false ->
677+
Src
678+
end,
679+
{ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of
670680
true ->
681+
QName = rabbit_misc:r(VHost, queue, QName0),
671682
rabbit_queue_type:credit(
672-
QName, CTag, DeliveryCount, max_link_credit(),
683+
QName, CTag, DeliveryCount, MaxLinkCredit,
673684
false, QState0);
674685
_ ->
675686
{ok, QState0, []}
@@ -679,20 +690,14 @@ sent_delivery(#{source := #{current := #{consumer_tag := CTag,
679690
false -> HaveCreditReqInFlight
680691
end,
681692
State = State0#{source => Src1#{current => Current#{queue_states => QState},
682-
credit => Credit,
683-
delivery_count => DeliveryCount,
684-
queue_credit => QCredit,
685-
queue_delivery_count => QDeliveryCount,
686-
at_least_one_credit_req_in_flight => CreditReqInFlight
687-
}},
693+
at_least_one_credit_req_in_flight => CreditReqInFlight
694+
}},
688695
handle_queue_actions(Actions, State).
689696

690697
max_link_credit() ->
691698
application:get_env(rabbit, max_link_credit, ?DEFAULT_MAX_LINK_CREDIT).
692699

693-
grant_link_credit(true = _HaveCreditReqInFlight, _Credit, _MaxLinkCredit, _NumUnconfirmed) ->
694-
false;
695-
grant_link_credit(false = _HaveCreditReqInFlight, Credit, MaxLinkCredit, NumUnconfirmed) ->
700+
grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) ->
696701
Credit =< MaxLinkCredit div 2 andalso
697702
NumUnconfirmed < MaxLinkCredit.
698703

0 commit comments

Comments
 (0)