Skip to content

Commit 41c25cf

Browse files
Merge pull request #14360 from rabbitmq/fix-amqp10
Shovel: AMQP1.0 use prefetch-count as credit on delete-after
2 parents 2024a4b + fa66b4e commit 41c25cf

File tree

1 file changed

+4
-6
lines changed

1 file changed

+4
-6
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
135135
-spec init_source(state()) -> state().
136136
init_source(State = #{source := #{current := #{link := Link},
137137
prefetch_count := Prefetch} = Src}) ->
138-
{Credit, RenewWhenBelow} = case Src of
139-
#{delete_after := R} when is_integer(R) ->
140-
{R, never};
141-
#{prefetch_count := Pre} ->
142-
{Pre, max(1, round(Prefetch/10))}
143-
end,
138+
{Credit, RenewWhenBelow} = {Prefetch, max(1, round(Prefetch/10))},
144139
ok = amqp10_client:flow_link_credit(Link, Credit, RenewWhenBelow),
145140
Remaining = case Src of
146141
#{delete_after := never} -> unlimited;
@@ -319,6 +314,9 @@ status(_) ->
319314
-spec forward(Tag :: tag(), Props :: #{atom() => any()},
320315
Payload :: binary(), state()) ->
321316
state() | {stop, any()}.
317+
forward(_Tag, _Props, _Payload,
318+
#{source := #{remaining := 0}} = State) ->
319+
State;
322320
forward(_Tag, _Props, _Payload,
323321
#{source := #{remaining_unacked := 0}} = State) ->
324322
State;

0 commit comments

Comments
 (0)