diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index c87751201f88..84a4b7ea0b22 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -135,12 +135,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) -> -spec init_source(state()) -> state(). init_source(State = #{source := #{current := #{link := Link}, prefetch_count := Prefetch} = Src}) -> - {Credit, RenewWhenBelow} = case Src of - #{delete_after := R} when is_integer(R) -> - {R, never}; - #{prefetch_count := Pre} -> - {Pre, max(1, round(Prefetch/10))} - end, + {Credit, RenewWhenBelow} = {Prefetch, max(1, round(Prefetch/10))}, ok = amqp10_client:flow_link_credit(Link, Credit, RenewWhenBelow), Remaining = case Src of #{delete_after := never} -> unlimited; @@ -319,6 +314,9 @@ status(_) -> -spec forward(Tag :: tag(), Props :: #{atom() => any()}, Payload :: binary(), state()) -> state() | {stop, any()}. +forward(_Tag, _Props, _Payload, + #{source := #{remaining := 0}} = State) -> + State; forward(_Tag, _Props, _Payload, #{source := #{remaining_unacked := 0}} = State) -> State;