Skip to content

Commit 6dc72fd

Browse files
committed
Local shovels: slow down publishing when all messages are being rejected
1 parent 1fc7390 commit 6dc72fd

File tree

1 file changed

+23
-4
lines changed

1 file changed

+23
-4
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,10 @@ init_source(State = #{source := #{queue_r := QName,
185185
delivery_count => ?INITIAL_DELIVERY_COUNT,
186186
max_link_credit => MaxLinkCredit,
187187
credit => MaxLinkCredit,
188-
at_least_one_credit_req_in_flight => true}},
188+
at_least_one_credit_req_in_flight => true,
189+
complete => 0,
190+
requeue => 0,
191+
last_settlement_count => {0, 0}}},
189192
handle_queue_actions(Actions, State2);
190193
{0, {error, autodelete}} ->
191194
exit({shutdown, autodelete});
@@ -600,7 +603,7 @@ settle(Op, DeliveryTag, Multiple,
600603
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
601604
{ok, QState1, Actions} ->
602605
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
603-
handle_queue_actions(Actions, St);
606+
handle_queue_actions(Actions, increase_settled(St, Op));
604607
{'protocol_error', Type, Reason, Args} ->
605608
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
606609
[Op, Type, io_lib:format(Reason, Args)]),
@@ -716,17 +719,22 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
716719
delivery_count := QDeliveryCount,
717720
queue_r := QName,
718721
current := Current = #{queue_states := QState0,
719-
unacked_message_q := Q}
722+
unacked_message_q := Q},
723+
complete := Complete,
724+
requeue := Requeue,
725+
last_settlement_count := Last
720726
} = Src} = State0) ->
721727
%% Assertion: Our (receiver) delivery-count should be always
722728
%% in sync with the delivery-count of the sending queue.
723729
QDeliveryCount = DeliveryCount,
724730
case grant_link_credit(CCredit, MaxLinkCredit, ?QUEUE:len(Q)) of
725731
true ->
732+
MaxCredit = cap_credit(MaxLinkCredit, Complete, Requeue, Last),
726733
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, QDeliveryCount,
727-
MaxLinkCredit, false, QState0),
734+
MaxCredit, false, QState0),
728735
State = State0#{source => Src#{credit => MaxLinkCredit,
729736
at_least_one_credit_req_in_flight => true,
737+
last_settlement_count => {Complete, Requeue},
730738
current => Current#{queue_states => QState}}},
731739
handle_queue_actions(Actions, State);
732740
false ->
@@ -738,3 +746,14 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
738746
State0#{source => Src#{credit => Credit,
739747
at_least_one_credit_req_in_flight => false}}
740748
end.
749+
750+
increase_settled(State = #{source := Src = #{complete := Complete}}, complete) ->
751+
State#{source => Src#{complete => Complete + 1}};
752+
increase_settled(State = #{source := Src = #{requeue := Requeue}}, requeue) ->
753+
State#{source => Src#{requeue => Requeue + 1}}.
754+
755+
cap_credit(MaxLinkCredit, Complete, Requeue, {Complete, LastRequeue})
756+
when (Requeue - LastRequeue) > (MaxLinkCredit div 2) ->
757+
MaxLinkCredit div 2;
758+
cap_credit(MaxLinkCredit, _, _, _) ->
759+
MaxLinkCredit.

0 commit comments

Comments
 (0)