Skip to content

Commit e9d767b

Browse files
committed
Local shovels: fix credit flow
1 parent 0b1aefd commit e9d767b

File tree

1 file changed

+6
-14
lines changed

1 file changed

+6
-14
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,7 @@ init_source(State = #{source := #{queue := QName0,
176176
remaining => Remaining,
177177
remaining_unacked => Remaining,
178178
delivery_count => ?INITIAL_DELIVERY_COUNT,
179-
queue_delivery_count => ?INITIAL_DELIVERY_COUNT,
180179
credit => MaxLinkCredit,
181-
queue_credit => MaxLinkCredit,
182180
at_least_one_credit_req_in_flight => true,
183181
stashed_credit_req => none}},
184182
handle_queue_actions(Actions, State2);
@@ -647,18 +645,12 @@ confirm_to_inbound(ConfirmFun, Seq,
647645
ConfirmFun(InTag, State).
648646

649647
sent_delivery(#{source := #{delivery_count := DeliveryCount0,
650-
credit := Credit0,
651-
queue_delivery_count := QDeliveryCount0,
652-
queue_credit := QCredit0} = Src
648+
credit := Credit0} = Src
653649
} = State0, NumMsgs) ->
654650
DeliveryCount = serial_number:add(DeliveryCount0, NumMsgs),
655651
Credit = max(0, Credit0 - NumMsgs),
656-
QDeliveryCount = serial_number:add(QDeliveryCount0, NumMsgs),
657-
QCredit = max(0, QCredit0 - NumMsgs),
658652
State0#{source => Src#{credit => Credit,
659-
delivery_count => DeliveryCount,
660-
queue_credit => QCredit,
661-
queue_delivery_count => QDeliveryCount
653+
delivery_count => DeliveryCount
662654
}}.
663655

664656
maybe_grant_or_stash_credit(#{source := #{queue := QName0,
@@ -709,7 +701,7 @@ grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) ->
709701
%% Drain is ignored because local shovels do not use it.
710702
handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Drain},
711703
#{source := #{credit := CCredit,
712-
queue_delivery_count := QDeliveryCount,
704+
delivery_count := QDeliveryCount,
713705
stashed_credit_req := StashedCreditReq,
714706
queue := QName0,
715707
current := Current = #{queue_states := QState0,
@@ -723,7 +715,7 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
723715
QName = rabbit_misc:r(VHost, queue, QName0),
724716
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, StashedDeliveryCount,
725717
MaxLinkCredit, false, QState0),
726-
State = State0#{source => Src#{queue_credit => MaxLinkCredit,
718+
State = State0#{source => Src#{credit => MaxLinkCredit,
727719
at_least_one_credit_req_in_flight => true,
728720
stashed_credit_req => none,
729721
current => Current#{queue_states => QState}}},
@@ -733,7 +725,7 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
733725
MaxLinkCredit = max_link_credit(),
734726
QName = rabbit_misc:r(VHost, queue, QName0),
735727
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, DeliveryCount, MaxLinkCredit, false, QState0),
736-
State = State0#{source => Src#{queue_credit => MaxLinkCredit,
728+
State = State0#{source => Src#{credit => MaxLinkCredit,
737729
at_least_one_credit_req_in_flight => true,
738730
current => Current#{queue_states => QState}}},
739731
handle_queue_actions(Actions, State);
@@ -743,6 +735,6 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
743735
%% in case credit requests got applied out of order in quorum queues).
744736
%% This should be fine given that we asserted earlier that our delivery-count is
745737
%% in sync with the delivery-count of the sending queue.
746-
State0#{source => Src#{queue_credit => Credit,
738+
State0#{source => Src#{credit => Credit,
747739
at_least_one_credit_req_in_flight => false}}
748740
end.

0 commit comments

Comments
 (0)