5050% % or by remote-incoming window (i.e. session flow control).
5151-define (DEFAULT_MAX_QUEUE_CREDIT , 256 ).
5252-define (DEFAULT_MAX_INCOMING_WINDOW , 400 ).
53- -define (MAX_LINK_CREDIT , persistent_term :get (max_link_credit )).
5453-define (MAX_MANAGEMENT_LINK_CREDIT , 8 ).
5554-define (MANAGEMENT_NODE_ADDRESS , <<" /management" >>).
5655-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
253252 resource_alarms :: sets :set (rabbit_alarm :resource_alarm_source ()),
254253 trace_state :: rabbit_trace :state (),
255254 conn_name :: binary (),
256- max_incoming_window :: pos_integer ()
255+ max_incoming_window :: pos_integer (),
256+ max_link_credit :: pos_integer (),
257+ max_queue_credit :: pos_integer ()
257258 }).
258259
259260-record (state , {
@@ -386,8 +387,6 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
386387 true = is_valid_max (MaxLinkCredit ),
387388 true = is_valid_max (MaxQueueCredit ),
388389 true = is_valid_max (MaxIncomingWindow ),
389- ok = persistent_term :put (max_link_credit , MaxLinkCredit ),
390- ok = persistent_term :put (max_queue_credit , MaxQueueCredit ),
391390 IncomingWindow = case sets :is_empty (Alarms ) of
392391 true -> MaxIncomingWindow ;
393392 false -> 0
@@ -420,7 +419,9 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
420419 resource_alarms = Alarms ,
421420 trace_state = rabbit_trace :init (Vhost ),
422421 conn_name = ConnName ,
423- max_incoming_window = MaxIncomingWindow
422+ max_incoming_window = MaxIncomingWindow ,
423+ max_link_credit = MaxLinkCredit ,
424+ max_queue_credit = MaxQueueCredit
424425 }}}.
425426
426427terminate (_Reason , # state {incoming_links = IncomingLinks ,
@@ -582,7 +583,8 @@ send_delivery_state_changes(#state{stashed_rejected = [],
582583 stashed_eol = []} = State ) ->
583584 State ;
584585send_delivery_state_changes (State0 = # state {cfg = # cfg {writer_pid = Writer ,
585- channel_num = ChannelNum }}) ->
586+ channel_num = ChannelNum ,
587+ max_link_credit = MaxLinkCredit }}) ->
586588 % % Order is important:
587589 % % 1. Process queue rejections.
588590 {RejectedIds , GrantCredits0 , State1 } = handle_stashed_rejected (State0 ),
@@ -603,15 +605,16 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
603605 rabbit_amqp_writer :send_command (Writer , ChannelNum , Frame )
604606 end , DetachFrames ),
605607 maps :foreach (fun (HandleInt , DeliveryCount ) ->
606- F0 = flow (? UINT (HandleInt ), DeliveryCount ),
608+ F0 = flow (? UINT (HandleInt ), DeliveryCount , MaxLinkCredit ),
607609 F = session_flow_fields (F0 , State ),
608610 rabbit_amqp_writer :send_command (Writer , ChannelNum , F )
609611 end , GrantCredits ),
610612 State .
611613
612614handle_stashed_rejected (# state {stashed_rejected = []} = State ) ->
613615 {[], #{}, State };
614- handle_stashed_rejected (# state {stashed_rejected = Actions ,
616+ handle_stashed_rejected (# state {cfg = # cfg {max_link_credit = MaxLinkCredit },
617+ stashed_rejected = Actions ,
615618 incoming_links = Links } = State0 ) ->
616619 {Ids , GrantCredits , Ls } =
617620 lists :foldl (
@@ -628,7 +631,8 @@ handle_stashed_rejected(#state{stashed_rejected = Actions,
628631 end ,
629632 Link1 = Link0 # incoming_link {incoming_unconfirmed_map = U },
630633 {Link , GrantCreds } = maybe_grant_link_credit (
631- HandleInt , Link1 , GrantCreds0 ),
634+ MaxLinkCredit , HandleInt ,
635+ Link1 , GrantCreds0 ),
632636 {Ids1 , GrantCreds , maps :update (HandleInt , Link , Links0 )};
633637 error ->
634638 Acc
@@ -645,7 +649,8 @@ handle_stashed_rejected(#state{stashed_rejected = Actions,
645649
646650handle_stashed_settled (GrantCredits , # state {stashed_settled = []} = State ) ->
647651 {[], GrantCredits , State };
648- handle_stashed_settled (GrantCredits0 , # state {stashed_settled = Actions ,
652+ handle_stashed_settled (GrantCredits0 , # state {cfg = # cfg {max_link_credit = MaxLinkCredit },
653+ stashed_settled = Actions ,
649654 incoming_links = Links } = State0 ) ->
650655 {Ids , GrantCredits , Ls } =
651656 lists :foldl (
@@ -674,7 +679,8 @@ handle_stashed_settled(GrantCredits0, #state{stashed_settled = Actions,
674679 end ,
675680 Link1 = Link0 # incoming_link {incoming_unconfirmed_map = U },
676681 {Link , GrantCreds } = maybe_grant_link_credit (
677- HandleInt , Link1 , GrantCreds0 ),
682+ MaxLinkCredit , HandleInt ,
683+ Link1 , GrantCreds0 ),
678684 {Ids2 , GrantCreds , maps :update (HandleInt , Link , Links0 )};
679685 _ ->
680686 Acc
@@ -714,11 +720,14 @@ handle_stashed_down(#state{stashed_down = QNames,
714720
715721handle_stashed_eol (DetachFrames , GrantCredits , # state {stashed_eol = []} = State ) ->
716722 {[], [], DetachFrames , GrantCredits , State };
717- handle_stashed_eol (DetachFrames0 , GrantCredits0 , # state {stashed_eol = Eols } = State0 ) ->
723+ handle_stashed_eol (DetachFrames0 , GrantCredits0 , # state {cfg = # cfg {max_link_credit = MaxLinkCredit },
724+ stashed_eol = Eols } = State0 ) ->
718725 {ReleasedIs , AcceptedIds , DetachFrames , GrantCredits , State1 } =
719726 lists :foldl (fun (QName , {RIds0 , AIds0 , DetachFrames1 , GrantCreds0 , S0 = # state {incoming_links = Links0 ,
720727 queue_states = QStates0 }}) ->
721- {RIds , AIds , GrantCreds1 , Links } = settle_eol (QName , {RIds0 , AIds0 , GrantCreds0 , Links0 }),
728+ {RIds , AIds , GrantCreds1 , Links } = settle_eol (
729+ QName , MaxLinkCredit ,
730+ {RIds0 , AIds0 , GrantCreds0 , Links0 }),
722731 QStates = rabbit_queue_type :remove (QName , QStates0 ),
723732 S1 = S0 # state {incoming_links = Links ,
724733 queue_states = QStates },
@@ -729,14 +738,14 @@ handle_stashed_eol(DetachFrames0, GrantCredits0, #state{stashed_eol = Eols} = St
729738 State = State1 # state {stashed_eol = []},
730739 {ReleasedIs , AcceptedIds , DetachFrames , GrantCredits , State }.
731740
732- settle_eol (QName , {_ReleasedIds , _AcceptedIds , _GrantCredits , Links } = Acc ) ->
741+ settle_eol (QName , MaxLinkCredit , {_ReleasedIds , _AcceptedIds , _GrantCredits , Links } = Acc ) ->
733742 maps :fold (fun (HandleInt ,
734743 # incoming_link {incoming_unconfirmed_map = U0 } = Link0 ,
735744 {RelIds0 , AcceptIds0 , GrantCreds0 , Links0 }) ->
736745 {RelIds , AcceptIds , U } = settle_eol0 (QName , {RelIds0 , AcceptIds0 , U0 }),
737746 Link1 = Link0 # incoming_link {incoming_unconfirmed_map = U },
738747 {Link , GrantCreds } = maybe_grant_link_credit (
739- HandleInt , Link1 , GrantCreds0 ),
748+ MaxLinkCredit , HandleInt , Link1 , GrantCreds0 ),
740749 Links1 = maps :update (HandleInt ,
741750 Link ,
742751 Links0 ),
@@ -984,7 +993,8 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
984993 } = Attach ,
985994 State0 = # state {incoming_links = IncomingLinks0 ,
986995 permission_cache = PermCache0 ,
987- cfg = # cfg {vhost = Vhost ,
996+ cfg = # cfg {max_link_credit = MaxLinkCredit ,
997+ vhost = Vhost ,
988998 user = User }}) ->
989999 ok = validate_attach (Attach ),
9901000 case ensure_target (Target , Vhost , User , PermCache0 ) of
@@ -994,7 +1004,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
9941004 routing_key = RoutingKey ,
9951005 queue_name_bin = QNameBin ,
9961006 delivery_count = DeliveryCountInt ,
997- credit = ? MAX_LINK_CREDIT },
1007+ credit = MaxLinkCredit },
9981008 _Outcomes = outcomes (Source ),
9991009 Reply = # 'v1_0.attach' {
10001010 name = LinkName ,
@@ -1008,7 +1018,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
10081018 max_message_size = {ulong , persistent_term :get (max_message_size )}},
10091019 Flow = # 'v1_0.flow' {handle = Handle ,
10101020 delivery_count = DeliveryCount ,
1011- link_credit = ? UINT (? MAX_LINK_CREDIT )},
1021+ link_credit = ? UINT (MaxLinkCredit )},
10121022 % %TODO check that handle is not in use for any other open links.
10131023 % %"The handle MUST NOT be used for other open links. An attempt to attach
10141024 % % using a handle which is already associated with a link MUST be responded to
@@ -1458,7 +1468,7 @@ handle_credit_reply0(
14581468 CCredit > 0 ->
14591469 QName = Link0 # outgoing_link .queue_name ,
14601470 % % Provide queue next batch of credits.
1461- CappedCredit = cap_credit (CCredit ),
1471+ CappedCredit = cap_credit (CCredit , S0 # state . cfg # cfg . max_queue_credit ),
14621472 {ok , QStates , Actions } =
14631473 rabbit_queue_type :credit (
14641474 QName , Ctag , DeliveryCount , CappedCredit , false , QStates0 ),
@@ -1496,7 +1506,8 @@ handle_credit_reply0(
14961506 } = QFC ,
14971507 stashed_credit_req = StashedCreditReq },
14981508 S0 = # state {cfg = # cfg {writer_pid = Writer ,
1499- channel_num = ChanNum },
1509+ channel_num = ChanNum ,
1510+ max_queue_credit = MaxQueueCredit },
15001511 outgoing_links = OutgoingLinks ,
15011512 queue_states = QStates0 }) ->
15021513 % % If the queue sent us a drain credit_reply,
@@ -1512,7 +1523,7 @@ handle_credit_reply0(
15121523 % % the current drain credit top-up rounds over a stashed credit request because
15131524 % % this is easier to reason about and the queue will reply promptly meaning
15141525 % % the stashed request will be processed soon enough.
1515- CappedCredit = cap_credit (CCredit ),
1526+ CappedCredit = cap_credit (CCredit , MaxQueueCredit ),
15161527 {ok , QStates , Actions } = rabbit_queue_type :credit (
15171528 QName , Ctag , DeliveryCount ,
15181529 CappedCredit , true , QStates0 ),
@@ -1578,11 +1589,12 @@ pop_credit_req(
15781589 drain = Drain ,
15791590 echo = Echo
15801591 }},
1581- S0 = # state {outgoing_links = OutgoingLinks ,
1592+ S0 = # state {cfg = # cfg {max_queue_credit = MaxQueueCredit },
1593+ outgoing_links = OutgoingLinks ,
15821594 queue_states = QStates0 }) ->
15831595 LinkCreditSnd = amqp10_util :link_credit_snd (
15841596 DeliveryCountRcv , LinkCreditRcv , CDeliveryCount ),
1585- CappedCredit = cap_credit (LinkCreditSnd ),
1597+ CappedCredit = cap_credit (LinkCreditSnd , MaxQueueCredit ),
15861598 {ok , QStates , Actions } = rabbit_queue_type :credit (
15871599 QName , Ctag , QDeliveryCount ,
15881600 CappedCredit , Drain , QStates0 ),
@@ -1753,7 +1765,8 @@ sent_pending_delivery(
17531765 % % assertion
17541766 none = Link0 # outgoing_link .stashed_credit_req ,
17551767 % % Provide queue next batch of credits.
1756- CappedCredit = cap_credit (CCredit ),
1768+ CappedCredit = cap_credit (CCredit ,
1769+ S0 # state .cfg # cfg .max_queue_credit ),
17571770 {ok , QStates1 , Actions0 } =
17581771 rabbit_queue_type :credit (
17591772 QName , Ctag , QDeliveryCount , CappedCredit ,
@@ -1891,11 +1904,6 @@ settle_op_from_outcome(Outcome) ->
18911904 " Unrecognised state: ~tp in DISPOSITION" ,
18921905 [Outcome ]).
18931906
1894- -spec flow ({uint , link_handle ()}, sequence_no ()) ->
1895- # 'v1_0.flow' {}.
1896- flow (Handle , DeliveryCount ) ->
1897- flow (Handle , DeliveryCount , ? MAX_LINK_CREDIT ).
1898-
18991907-spec flow ({uint , link_handle ()}, sequence_no (), rabbit_queue_type :credit ()) ->
19001908 # 'v1_0.flow' {}.
19011909flow (Handle , DeliveryCount , LinkCredit ) ->
@@ -2281,7 +2289,8 @@ incoming_link_transfer(
22812289 vhost = Vhost ,
22822290 trace_state = Trace ,
22832291 conn_name = ConnName ,
2284- channel_num = ChannelNum }}) ->
2292+ channel_num = ChannelNum ,
2293+ max_link_credit = MaxLinkCredit }}) ->
22852294
22862295 {PayloadBin , DeliveryId , Settled } =
22872296 case MultiTransfer of
@@ -2326,7 +2335,8 @@ incoming_link_transfer(
23262335 DeliveryCount = add (DeliveryCount0 , 1 ),
23272336 Credit1 = Credit0 - 1 ,
23282337 {Credit , Reply1 } = maybe_grant_link_credit (
2329- Credit1 , DeliveryCount , map_size (U ), Handle ),
2338+ Credit1 , MaxLinkCredit ,
2339+ DeliveryCount , map_size (U ), Handle ),
23302340 Reply = Reply0 ++ Reply1 ,
23312341 Link = Link0 # incoming_link {
23322342 delivery_count = DeliveryCount ,
@@ -2420,30 +2430,30 @@ released(DeliveryId) ->
24202430 settled = true ,
24212431 state = # 'v1_0.released' {}}.
24222432
2423- maybe_grant_link_credit (Credit , DeliveryCount , NumUnconfirmed , Handle ) ->
2424- case grant_link_credit (Credit , NumUnconfirmed ) of
2433+ maybe_grant_link_credit (Credit , MaxLinkCredit , DeliveryCount , NumUnconfirmed , Handle ) ->
2434+ case grant_link_credit (Credit , MaxLinkCredit , NumUnconfirmed ) of
24252435 true ->
2426- {? MAX_LINK_CREDIT , [flow (Handle , DeliveryCount )]};
2436+ {MaxLinkCredit , [flow (Handle , DeliveryCount , MaxLinkCredit )]};
24272437 false ->
24282438 {Credit , []}
24292439 end .
24302440
24312441maybe_grant_link_credit (
2442+ MaxLinkCredit ,
24322443 HandleInt ,
24332444 Link = # incoming_link {credit = Credit ,
24342445 incoming_unconfirmed_map = U ,
24352446 delivery_count = DeliveryCount },
24362447 AccMap ) ->
2437- case grant_link_credit (Credit , map_size (U )) of
2448+ case grant_link_credit (Credit , MaxLinkCredit , map_size (U )) of
24382449 true ->
2439- {Link # incoming_link {credit = ? MAX_LINK_CREDIT },
2450+ {Link # incoming_link {credit = MaxLinkCredit },
24402451 AccMap #{HandleInt => DeliveryCount }};
24412452 false ->
24422453 {Link , AccMap }
24432454 end .
24442455
2445- grant_link_credit (Credit , NumUnconfirmed ) ->
2446- MaxLinkCredit = ? MAX_LINK_CREDIT ,
2456+ grant_link_credit (Credit , MaxLinkCredit , NumUnconfirmed ) ->
24472457 Credit =< MaxLinkCredit div 2 andalso
24482458 NumUnconfirmed < MaxLinkCredit .
24492459
@@ -2739,7 +2749,8 @@ handle_outgoing_link_flow_control(
27392749 DeliveryCountRcv ,
27402750 LinkCreditRcv ,
27412751 CFC # client_flow_ctl .delivery_count ),
2742- CappedCredit = cap_credit (LinkCreditSnd ),
2752+ CappedCredit = cap_credit (LinkCreditSnd ,
2753+ State0 # state .cfg # cfg .max_queue_credit ),
27432754 Link = Link0 # outgoing_link {
27442755 client_flow_ctl = CFC # client_flow_ctl {
27452756 credit = LinkCreditSnd ,
@@ -3444,10 +3455,9 @@ is_valid_max(Val) ->
34443455pg_scope () ->
34453456 rabbit :pg_local_scope (amqp_session ).
34463457
3447- -spec cap_credit (rabbit_queue_type :credit ()) ->
3458+ -spec cap_credit (rabbit_queue_type :credit (), pos_integer () ) ->
34483459 rabbit_queue_type :credit ().
3449- cap_credit (DesiredCredit ) ->
3450- MaxCredit = persistent_term :get (max_queue_credit ),
3460+ cap_credit (DesiredCredit , MaxCredit ) ->
34513461 min (DesiredCredit , MaxCredit ).
34523462
34533463ensure_mc_cluster_compat (Mc ) ->
0 commit comments