@@ -135,12 +135,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
135135-spec init_source (state ()) -> state ().
136136init_source (State = #{source := #{current := #{link := Link },
137137 prefetch_count := Prefetch } = Src }) ->
138- {Credit , RenewWhenBelow } = case Src of
139- #{delete_after := R } when is_integer (R ) ->
140- {R , never };
141- #{prefetch_count := Pre } ->
142- {Pre , max (1 , round (Prefetch / 10 ))}
143- end ,
138+ {Credit , RenewWhenBelow } = {Prefetch , max (1 , round (Prefetch / 10 ))},
144139 ok = amqp10_client :flow_link_credit (Link , Credit , RenewWhenBelow ),
145140 Remaining = case Src of
146141 #{delete_after := never } -> unlimited ;
@@ -319,6 +314,9 @@ status(_) ->
319314-spec forward (Tag :: tag (), Props :: #{atom () => any ()},
320315 Payload :: binary (), state ()) ->
321316 state () | {stop , any ()}.
317+ forward (_Tag , _Props , _Payload ,
318+ #{source := #{remaining := 0 }} = State ) ->
319+ State ;
322320forward (_Tag , _Props , _Payload ,
323321 #{source := #{remaining_unacked := 0 }} = State ) ->
324322 State ;
0 commit comments