172172-record (queue_flow_ctl , {
173173 delivery_count :: sequence_no (),
174174 % % We cap the actual credit we grant to the sending queue.
175+ % % If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
176+ % % we will top up in batches to the sending queue.
175177 credit :: 0 ..? LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
176- % % Credit as desired by the receiving client. If larger than
177- % % LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue.
178- desired_credit :: rabbit_queue_type :credit (),
179178 drain :: boolean ()
180179 }).
181180
197196 % % client and for the link to the sending queue.
198197 client_flow_ctl :: # client_flow_ctl {} | credit_api_v1 ,
199198 queue_flow_ctl :: # queue_flow_ctl {} | credit_api_v1 ,
200- % % True if we sent a credit request to the sending queue
201- % % but haven't processed the corresponding credit reply yet.
202- credit_req_in_flight :: boolean () | credit_api_v1 ,
203- % % While credit_req_in_flight is true, we stash the
199+ % % 'true' means:
200+ % % * we haven't processed a credit reply yet since we last sent
201+ % % a credit request to the sending queue.
202+ % % * a credit request is certainly is in flight
203+ % % * possibly multiple credit requests are in flight (e.g. rabbit_fifo_client
204+ % % will re-send credit requests on our behalf on quorum queue leader changes)
205+ % % 'false' means:
206+ % % * we processed a credit reply since we last sent a credit request to the sending queue
207+ % % * probably no credit request is in flight, but there might be
208+ % % (we aren't sure since we don't use correlations for credit requests)
209+ at_least_one_credit_req_in_flight :: boolean () | credit_api_v1 ,
210+ % % While at_least_one_credit_req_in_flight is true, we stash the
204211 % % latest credit request from the receiving client.
205212 stashed_credit_req :: none | # credit_req {} | credit_api_v1
206213 }).
@@ -1066,7 +1073,6 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
10661073 echo = false },
10671074 # queue_flow_ctl {delivery_count = ? INITIAL_DELIVERY_COUNT ,
10681075 credit = 0 ,
1069- desired_credit = 0 ,
10701076 drain = false },
10711077 false ,
10721078 none };
@@ -1116,7 +1122,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
11161122 delivery_count = DeliveryCount ,
11171123 client_flow_ctl = ClientFlowCtl ,
11181124 queue_flow_ctl = QueueFlowCtl ,
1119- credit_req_in_flight = CreditReqInFlight ,
1125+ at_least_one_credit_req_in_flight = CreditReqInFlight ,
11201126 stashed_credit_req = StashedCreditReq },
11211127 OutgoingLinks = OutgoingLinks0 #{HandleInt => Link },
11221128 State1 = State0 # state {queue_states = QStates ,
@@ -1392,16 +1398,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
13921398 end
13931399 end .
13941400
1395- handle_credit_reply (Action = {credit_reply , Ctag , _DeliveryCount , _Credit , _Available , Drain },
1401+ handle_credit_reply (Action = {credit_reply , Ctag , _DeliveryCount , _Credit , _Available , _Drain },
13961402 State = # state {outgoing_links = OutgoingLinks }) ->
13971403 Handle = ctag_to_handle (Ctag ),
13981404 case OutgoingLinks of
1399- #{Handle := Link = # outgoing_link {queue_flow_ctl = QFC ,
1400- credit_req_in_flight = CreditReqInFlight }} ->
1401- % % Assert that we expect a credit reply for this consumer.
1402- true = CreditReqInFlight ,
1403- % % Assert that "The sender's value is always the last known value indicated by the receiver."
1404- Drain = QFC # queue_flow_ctl .drain ,
1405+ #{Handle := Link } ->
14051406 handle_credit_reply0 (Action , Handle , Link , State );
14061407 _ ->
14071408 % % Ignore credit reply for a detached link.
@@ -1418,18 +1419,16 @@ handle_credit_reply0(
14181419 echo = CEcho
14191420 },
14201421 queue_flow_ctl = # queue_flow_ctl {
1421- delivery_count = QDeliveryCount ,
1422- credit = QCredit ,
1423- desired_credit = DesiredCredit
1424- } = QFC ,
1422+ delivery_count = QDeliveryCount
1423+ } = QFC0 ,
14251424 stashed_credit_req = StashedCreditReq
14261425 } = Link0 ,
14271426 # state {outgoing_links = OutgoingLinks ,
14281427 queue_states = QStates0
14291428 } = S0 ) ->
14301429
1431- % % Assert that flow control state between us and the queue is in sync.
1432- QCredit = Credit ,
1430+ % % Assertion: Our (receiver) delivery-count should be always
1431+ % % in sync with the delivery-count of the sending queue.
14331432 QDeliveryCount = DeliveryCount ,
14341433
14351434 case StashedCreditReq of
@@ -1439,24 +1438,32 @@ handle_credit_reply0(
14391438 S = pop_credit_req (Handle , Ctag , Link0 , S0 ),
14401439 echo (CEcho , Handle , CDeliveryCount , CCredit , Available , S ),
14411440 S ;
1442- none when QCredit =:= 0 andalso
1443- DesiredCredit > 0 ->
1441+ none when Credit =:= 0 andalso
1442+ CCredit > 0 ->
14441443 QName = Link0 # outgoing_link .queue_name ,
14451444 % % Provide queue next batch of credits.
1446- CappedCredit = cap_credit (DesiredCredit ),
1445+ CappedCredit = cap_credit (CCredit ),
14471446 {ok , QStates , Actions } =
14481447 rabbit_queue_type :credit (
14491448 QName , Ctag , DeliveryCount , CappedCredit , false , QStates0 ),
14501449 Link = Link0 # outgoing_link {
1451- queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit }
1452- },
1450+ queue_flow_ctl = QFC0 # queue_flow_ctl {credit = CappedCredit },
1451+ at_least_one_credit_req_in_flight = true },
14531452 S = S0 # state {queue_states = QStates ,
14541453 outgoing_links = OutgoingLinks #{Handle := Link }},
14551454 handle_queue_actions (Actions , S );
14561455 none ->
1457- Link = Link0 # outgoing_link {credit_req_in_flight = false },
1456+ % % Although we (the receiver) usually determine link credit, we set here
1457+ % % our link credit to what the queue says our link credit is (which is safer
1458+ % % in case credit requests got applied out of order in quorum queues).
1459+ % % This should be fine given that we asserted earlier that our delivery-count is
1460+ % % in sync with the delivery-count of the sending queue.
1461+ QFC = QFC0 # queue_flow_ctl {credit = Credit },
1462+ Link = Link0 # outgoing_link {
1463+ queue_flow_ctl = QFC ,
1464+ at_least_one_credit_req_in_flight = false },
14581465 S = S0 # state {outgoing_links = OutgoingLinks #{Handle := Link }},
1459- echo (CEcho , Handle , CDeliveryCount , DesiredCredit , Available , S ),
1466+ echo (CEcho , Handle , CDeliveryCount , CCredit , Available , S ),
14601467 S
14611468 end ;
14621469handle_credit_reply0 (
@@ -1465,10 +1472,11 @@ handle_credit_reply0(
14651472 Link0 = # outgoing_link {
14661473 queue_name = QName ,
14671474 client_flow_ctl = # client_flow_ctl {
1468- delivery_count = CDeliveryCount0 } = CFC ,
1475+ delivery_count = CDeliveryCount0 ,
1476+ credit = CCredit
1477+ } = CFC ,
14691478 queue_flow_ctl = # queue_flow_ctl {
1470- delivery_count = QDeliveryCount0 ,
1471- desired_credit = DesiredCredit
1479+ delivery_count = QDeliveryCount0
14721480 } = QFC ,
14731481 stashed_credit_req = StashedCreditReq },
14741482 S0 = # state {cfg = # cfg {writer_pid = Writer ,
@@ -1480,31 +1488,37 @@ handle_credit_reply0(
14801488 0 = Credit ,
14811489
14821490 case DeliveryCount =:= QDeliveryCount0 andalso
1483- DesiredCredit > 0 of
1491+ CCredit > 0 of
14841492 true ->
14851493 % % We're in drain mode. The queue did not advance its delivery-count which means
1486- % % it might still have messages available for us. We also desire more messages.
1494+ % % it might still have messages available for us. The client also desires more messages.
14871495 % % Therefore, we do the next round of credit top-up. We prioritise finishing
14881496 % % the current drain credit top-up rounds over a stashed credit request because
14891497 % % this is easier to reason about and the queue will reply promptly meaning
14901498 % % the stashed request will be processed soon enough.
1491- CappedCredit = cap_credit (DesiredCredit ),
1492- Link = Link0 # outgoing_link {queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit }},
1493-
1494- {ok , QStates , Actions } =
1495- rabbit_queue_type :credit (
1496- QName , Ctag , DeliveryCount , CappedCredit , true , QStates0 ),
1499+ CappedCredit = cap_credit (CCredit ),
1500+ {ok , QStates , Actions } = rabbit_queue_type :credit (
1501+ QName , Ctag , DeliveryCount ,
1502+ CappedCredit , true , QStates0 ),
1503+ Link = Link0 # outgoing_link {
1504+ queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit },
1505+ at_least_one_credit_req_in_flight = true },
14971506 S = S0 # state {queue_states = QStates ,
14981507 outgoing_links = OutgoingLinks #{Handle := Link }},
14991508 handle_queue_actions (Actions , S );
15001509 false ->
1510+ % % Assertion: The delivery-count of the sending queue must be:
1511+ % % * 'greater' if the sending queue advanced its delivery-count
1512+ % % * 'equal' otherwise
1513+ compare (DeliveryCount , QDeliveryCount0 ) =/= less ,
1514+
15011515 % % We're in drain mode.
15021516 % % The queue either advanced its delivery-count which means it has
1503- % % no more messages available for us, or we do not desire more messages.
1517+ % % no more messages available for us, or the client does not desire more messages.
15041518 % % Therefore, we're done with draining and we "the sender will (after sending
15051519 % % all available messages) advance the delivery-count as much as possible,
15061520 % % consuming all link-credit, and send the flow state to the receiver."
1507- CDeliveryCount = add (CDeliveryCount0 , DesiredCredit ),
1521+ CDeliveryCount = add (CDeliveryCount0 , CCredit ),
15081522 Flow0 = # 'v1_0.flow' {handle = ? UINT (Handle ),
15091523 delivery_count = ? UINT (CDeliveryCount ),
15101524 link_credit = ? UINT (0 ),
@@ -1519,9 +1533,8 @@ handle_credit_reply0(
15191533 queue_flow_ctl = QFC # queue_flow_ctl {
15201534 delivery_count = DeliveryCount ,
15211535 credit = 0 ,
1522- desired_credit = 0 ,
15231536 drain = false },
1524- credit_req_in_flight = false
1537+ at_least_one_credit_req_in_flight = false
15251538 },
15261539 S = S0 # state {outgoing_links = OutgoingLinks #{Handle := Link }},
15271540 case StashedCreditReq of
@@ -1553,19 +1566,17 @@ pop_credit_req(
15531566 LinkCreditSnd = amqp10_util :link_credit_snd (
15541567 DeliveryCountRcv , LinkCreditRcv , CDeliveryCount ),
15551568 CappedCredit = cap_credit (LinkCreditSnd ),
1556- {ok , QStates , Actions } =
1557- rabbit_queue_type : credit (
1558- QName , Ctag , QDeliveryCount , CappedCredit , Drain , QStates0 ),
1569+ {ok , QStates , Actions } = rabbit_queue_type : credit (
1570+ QName , Ctag , QDeliveryCount ,
1571+ CappedCredit , Drain , QStates0 ),
15591572 Link = Link0 # outgoing_link {
15601573 client_flow_ctl = CFC # client_flow_ctl {
15611574 credit = LinkCreditSnd ,
15621575 echo = Echo },
15631576 queue_flow_ctl = QFC # queue_flow_ctl {
15641577 credit = CappedCredit ,
1565- desired_credit = LinkCreditSnd ,
1566- drain = Drain
1567- },
1568- credit_req_in_flight = true ,
1578+ drain = Drain },
1579+ at_least_one_credit_req_in_flight = true ,
15691580 stashed_credit_req = none
15701581 },
15711582 S = S0 # state {queue_states = QStates ,
@@ -1685,19 +1696,20 @@ sent_pending_delivery(
16851696 credit_api_version = CreditApiVsn ,
16861697 client_flow_ctl = CFC0 ,
16871698 queue_flow_ctl = QFC0 ,
1688- credit_req_in_flight = CreditReqInFlight0
1699+ at_least_one_credit_req_in_flight = CreditReqInFlight0
16891700 } = Link0 = maps :get (Handle , OutgoingLinks0 ),
16901701
16911702 S = case CreditApiVsn of
1703+ 1 ->
1704+ S0 ;
16921705 2 ->
16931706 # client_flow_ctl {
16941707 delivery_count = CDeliveryCount0 ,
16951708 credit = CCredit0
16961709 } = CFC0 ,
16971710 # queue_flow_ctl {
16981711 delivery_count = QDeliveryCount0 ,
1699- credit = QCredit0 ,
1700- desired_credit = DesiredCredit0
1712+ credit = QCredit0
17011713 } = QFC0 ,
17021714
17031715 CDeliveryCount = add (CDeliveryCount0 , 1 ),
@@ -1715,17 +1727,16 @@ sent_pending_delivery(
17151727
17161728 QDeliveryCount = add (QDeliveryCount0 , 1 ),
17171729 QCredit1 = max (0 , QCredit0 - 1 ),
1718- DesiredCredit = max (0 , DesiredCredit0 - 1 ),
17191730
17201731 {QCredit , CreditReqInFlight , QStates , Actions } =
17211732 case QCredit1 =:= 0 andalso
1722- DesiredCredit > 0 andalso
1733+ CCredit > 0 andalso
17231734 not CreditReqInFlight0 of
17241735 true ->
17251736 % % assertion
17261737 none = Link0 # outgoing_link .stashed_credit_req ,
17271738 % % Provide queue next batch of credits.
1728- CappedCredit = cap_credit (DesiredCredit ),
1739+ CappedCredit = cap_credit (CCredit ),
17291740 {ok , QStates1 , Actions0 } =
17301741 rabbit_queue_type :credit (
17311742 QName , Ctag , QDeliveryCount , CappedCredit ,
@@ -1740,17 +1751,15 @@ sent_pending_delivery(
17401751 credit = CCredit },
17411752 QFC = QFC0 # queue_flow_ctl {
17421753 delivery_count = QDeliveryCount ,
1743- credit = QCredit ,
1744- desired_credit = DesiredCredit },
1745- Link = Link0 # outgoing_link { client_flow_ctl = CFC ,
1746- queue_flow_ctl = QFC ,
1747- credit_req_in_flight = CreditReqInFlight },
1754+ credit = QCredit } ,
1755+ Link = Link0 # outgoing_link {
1756+ client_flow_ctl = CFC ,
1757+ queue_flow_ctl = QFC ,
1758+ at_least_one_credit_req_in_flight = CreditReqInFlight },
17481759 OutgoingLinks = OutgoingLinks0 #{Handle := Link },
17491760 S1 = S0 # state {outgoing_links = OutgoingLinks ,
17501761 queue_states = QStates },
1751- handle_queue_actions (Actions , S1 );
1752- 1 ->
1753- S0
1762+ handle_queue_actions (Actions , S1 )
17541763 end ,
17551764 record_outgoing_unsettled (Pending , S ).
17561765
@@ -2677,7 +2686,7 @@ handle_outgoing_link_flow_control(
26772686 credit_api_version = CreditApiVsn ,
26782687 client_flow_ctl = CFC ,
26792688 queue_flow_ctl = QFC ,
2680- credit_req_in_flight = CreditReqInFlight
2689+ at_least_one_credit_req_in_flight = CreditReqInFlight
26812690 } = Link0 ,
26822691 # 'v1_0.flow' {handle = ? UINT (HandleInt ),
26832692 delivery_count = MaybeDeliveryCountRcv ,
@@ -2695,26 +2704,26 @@ handle_outgoing_link_flow_control(
26952704 2 ->
26962705 case CreditReqInFlight of
26972706 false ->
2698- DesiredCredit = amqp10_util :link_credit_snd (
2707+ LinkCreditSnd = amqp10_util :link_credit_snd (
26992708 DeliveryCountRcv ,
27002709 LinkCreditRcv ,
27012710 CFC # client_flow_ctl .delivery_count ),
2702- CappedCredit = cap_credit (DesiredCredit ),
2711+ CappedCredit = cap_credit (LinkCreditSnd ),
27032712 Link = Link0 # outgoing_link {
2704- credit_req_in_flight = true ,
27052713 client_flow_ctl = CFC # client_flow_ctl {
2706- credit = DesiredCredit ,
2714+ credit = LinkCreditSnd ,
27072715 echo = Echo },
27082716 queue_flow_ctl = QFC # queue_flow_ctl {
27092717 credit = CappedCredit ,
2710- desired_credit = DesiredCredit ,
2711- drain = Drain } },
2718+ drain = Drain } ,
2719+ at_least_one_credit_req_in_flight = true },
27122720 {ok , QStates , Actions } = rabbit_queue_type :credit (
27132721 QName , Ctag ,
27142722 QFC # queue_flow_ctl .delivery_count ,
27152723 CappedCredit , Drain , QStates0 ),
2716- State = State0 # state {queue_states = QStates ,
2717- outgoing_links = OutgoingLinks #{HandleInt := Link }},
2724+ State = State0 # state {
2725+ queue_states = QStates ,
2726+ outgoing_links = OutgoingLinks #{HandleInt := Link }},
27182727 handle_queue_actions (Actions , State );
27192728 true ->
27202729 % % A credit request is currently in-flight. Let's first process its reply
0 commit comments