Skip to content

Commit 382fac3

Browse files
committed
Local shovels: remove stashed credit request
1 parent 3349321 commit 382fac3

File tree

1 file changed

+25
-42
lines changed

1 file changed

+25
-42
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,7 @@ init_source(State = #{source := #{queue_r := QName,
186186
delivery_count => ?INITIAL_DELIVERY_COUNT,
187187
max_link_credit => MaxLinkCredit,
188188
credit => MaxLinkCredit,
189-
at_least_one_credit_req_in_flight => true,
190-
stashed_credit_req => none}},
189+
at_least_one_credit_req_in_flight => true}},
191190
handle_queue_actions(Actions, State2);
192191
{0, {error, autodelete}} ->
193192
exit({shutdown, autodelete});
@@ -323,10 +322,10 @@ handle_dest(_Msg, State) ->
323322
State.
324323

325324
ack(DeliveryTag, Multiple, State) ->
326-
maybe_grant_or_stash_credit(settle(complete, DeliveryTag, Multiple, State)).
325+
maybe_grant_credit(settle(complete, DeliveryTag, Multiple, State)).
327326

328327
nack(DeliveryTag, Multiple, State) ->
329-
maybe_grant_or_stash_credit(settle(discard, DeliveryTag, Multiple, State)).
328+
maybe_grant_credit(settle(discard, DeliveryTag, Multiple, State)).
330329

331330
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
332331
unacked := Unacked} = Dest,
@@ -426,7 +425,7 @@ handle_queue_actions(Actions, State) ->
426425
end, State, Actions).
427426

428427
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
429-
maybe_grant_or_stash_credit(
428+
maybe_grant_credit(
430429
lists:foldl(
431430
fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
432431
DeliveryTag = next_tag(S0),
@@ -654,25 +653,17 @@ sent_delivery(#{source := #{delivery_count := DeliveryCount0,
654653
delivery_count => DeliveryCount
655654
}}.
656655

657-
maybe_grant_or_stash_credit(#{source := #{queue_r := QName,
658-
credit := Credit,
659-
max_link_credit := MaxLinkCredit,
660-
delivery_count := DeliveryCount,
661-
at_least_one_credit_req_in_flight := HaveCreditReqInFlight,
662-
current := #{consumer_tag := CTag,
663-
queue_states := QState0} = Current
664-
} = Src,
665-
dest := #{unacked := Unacked}} = State0) ->
666-
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, maps:size(Unacked)),
667-
Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of
668-
true ->
669-
Req = #credit_req {
670-
delivery_count = DeliveryCount
671-
},
672-
maps:put(stashed_credit_req, Req, Src);
673-
false ->
674-
Src
675-
end,
656+
maybe_grant_credit(#{source := #{queue_r := QName,
657+
credit := Credit,
658+
max_link_credit := MaxLinkCredit,
659+
delivery_count := DeliveryCount,
660+
at_least_one_credit_req_in_flight := HaveCreditReqInFlight,
661+
current := #{consumer_tag := CTag,
662+
queue_states := QState0,
663+
unacked_message_q := Q} = Current
664+
} = Src,
665+
dest := #{unacked := Unacked}} = State0) ->
666+
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)),
676667
{ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of
677668
true ->
678669
rabbit_queue_type:credit(
@@ -685,9 +676,9 @@ maybe_grant_or_stash_credit(#{source := #{queue_r := QName,
685676
true -> true;
686677
false -> HaveCreditReqInFlight
687678
end,
688-
State = State0#{source => Src1#{current => Current#{queue_states => QState},
689-
at_least_one_credit_req_in_flight => CreditReqInFlight
690-
}},
679+
State = State0#{source => Src#{current => Current#{queue_states => QState},
680+
at_least_one_credit_req_in_flight => CreditReqInFlight
681+
}},
691682
handle_queue_actions(Actions, State).
692683

693684
max_link_credit() ->
@@ -702,30 +693,22 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
702693
#{source := #{credit := CCredit,
703694
max_link_credit := MaxLinkCredit,
704695
delivery_count := QDeliveryCount,
705-
stashed_credit_req := StashedCreditReq,
706696
queue_r := QName,
707-
current := Current = #{queue_states := QState0}
697+
current := Current = #{queue_states := QState0,
698+
unacked_message_q := Q}
708699
} = Src} = State0) ->
709700
%% Assertion: Our (receiver) delivery-count should be always
710701
%% in sync with the delivery-count of the sending queue.
711702
QDeliveryCount = DeliveryCount,
712-
case StashedCreditReq of
713-
#credit_req{delivery_count = StashedDeliveryCount} ->
714-
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, StashedDeliveryCount,
715-
MaxLinkCredit, false, QState0),
716-
State = State0#{source => Src#{credit => MaxLinkCredit,
717-
at_least_one_credit_req_in_flight => true,
718-
stashed_credit_req => none,
719-
current => Current#{queue_states => QState}}},
720-
handle_queue_actions(Actions, State);
721-
none when Credit =:= 0 andalso
722-
CCredit >= 0 ->
723-
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, DeliveryCount, MaxLinkCredit, false, QState0),
703+
case grant_link_credit(CCredit, MaxLinkCredit, ?QUEUE:len(Q)) of
704+
true ->
705+
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, QDeliveryCount,
706+
MaxLinkCredit, false, QState0),
724707
State = State0#{source => Src#{credit => MaxLinkCredit,
725708
at_least_one_credit_req_in_flight => true,
726709
current => Current#{queue_states => QState}}},
727710
handle_queue_actions(Actions, State);
728-
none ->
711+
false ->
729712
%% Although we (the receiver) usually determine link credit, we set here
730713
%% our link credit to what the queue says our link credit is (which is safer
731714
%% in case credit requests got applied out of order in quorum queues).

0 commit comments

Comments
 (0)