172172-record (queue_flow_ctl , {
173173 delivery_count :: sequence_no (),
174174 % % We cap the actual credit we grant to the sending queue.
175+ % % If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
176+ % % we will top up in batches to the sending queue.
175177 credit :: 0 ..? LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
176- % % Credit as desired by the receiving client. If larger than
177- % % LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue.
178- desired_credit :: rabbit_queue_type :credit (),
179178 drain :: boolean ()
180179 }).
181180
197196 % % client and for the link to the sending queue.
198197 client_flow_ctl :: # client_flow_ctl {} | credit_api_v1 ,
199198 queue_flow_ctl :: # queue_flow_ctl {} | credit_api_v1 ,
200- % % True if we sent a credit request to the sending queue
201- % % but haven't processed the corresponding credit reply yet.
202- credit_req_in_flight :: boolean () | credit_api_v1 ,
203- % % While credit_req_in_flight is true, we stash the
199+ % % 'true' means:
200+ % % * we haven't processed a credit reply yet since we last sent
201+ % % a credit request to the sending queue.
202+ % % * a credit request is certainly is in flight
203+ % % * possibly multiple credit requests are in flight (e.g. rabbit_fifo_client
204+ % % will re-send credit requests on our behalf on quorum queue leader changes)
205+ % % 'false' means:
206+ % % * we processed a credit reply since we last sent a credit request to the sending queue
207+ % % * probably no credit request is in flight, but there might be
208+ % % (we aren't sure since we don't use correlations for credit requests)
209+ at_least_one_credit_req_in_flight :: boolean () | credit_api_v1 ,
210+ % % While at_least_one_credit_req_in_flight is true, we stash the
204211 % % latest credit request from the receiving client.
205212 stashed_credit_req :: none | # credit_req {} | credit_api_v1
206213 }).
@@ -1066,7 +1073,6 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
10661073 echo = false },
10671074 # queue_flow_ctl {delivery_count = ? INITIAL_DELIVERY_COUNT ,
10681075 credit = 0 ,
1069- desired_credit = 0 ,
10701076 drain = false },
10711077 false ,
10721078 none };
@@ -1116,7 +1122,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
11161122 delivery_count = DeliveryCount ,
11171123 client_flow_ctl = ClientFlowCtl ,
11181124 queue_flow_ctl = QueueFlowCtl ,
1119- credit_req_in_flight = CreditReqInFlight ,
1125+ at_least_one_credit_req_in_flight = CreditReqInFlight ,
11201126 stashed_credit_req = StashedCreditReq },
11211127 OutgoingLinks = OutgoingLinks0 #{HandleInt => Link },
11221128 State1 = State0 # state {queue_states = QStates ,
@@ -1392,16 +1398,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
13921398 end
13931399 end .
13941400
1395- handle_credit_reply (Action = {credit_reply , Ctag , _DeliveryCount , _Credit , _Available , Drain },
1401+ handle_credit_reply (Action = {credit_reply , Ctag , _DeliveryCount , _Credit , _Available , _Drain },
13961402 State = # state {outgoing_links = OutgoingLinks }) ->
13971403 Handle = ctag_to_handle (Ctag ),
13981404 case OutgoingLinks of
1399- #{Handle := Link = # outgoing_link {queue_flow_ctl = QFC ,
1400- credit_req_in_flight = CreditReqInFlight }} ->
1401- % % Assert that we expect a credit reply for this consumer.
1402- true = CreditReqInFlight ,
1403- % % Assert that "The sender's value is always the last known value indicated by the receiver."
1404- Drain = QFC # queue_flow_ctl .drain ,
1405+ #{Handle := Link } ->
14051406 handle_credit_reply0 (Action , Handle , Link , State );
14061407 _ ->
14071408 % % Ignore credit reply for a detached link.
@@ -1418,18 +1419,16 @@ handle_credit_reply0(
14181419 echo = CEcho
14191420 },
14201421 queue_flow_ctl = # queue_flow_ctl {
1421- delivery_count = QDeliveryCount ,
1422- credit = QCredit ,
1423- desired_credit = DesiredCredit
1424- } = QFC ,
1422+ delivery_count = QDeliveryCount
1423+ } = QFC0 ,
14251424 stashed_credit_req = StashedCreditReq
14261425 } = Link0 ,
14271426 # state {outgoing_links = OutgoingLinks ,
14281427 queue_states = QStates0
14291428 } = S0 ) ->
14301429
1431- % % Assert that flow control state between us and the queue is in sync.
1432- QCredit = Credit ,
1430+ % % Assertion: Our (receiver) delivery-count should be always
1431+ % % in sync with the delivery-count of the sending queue.
14331432 QDeliveryCount = DeliveryCount ,
14341433
14351434 case StashedCreditReq of
@@ -1439,24 +1438,32 @@ handle_credit_reply0(
14391438 S = pop_credit_req (Handle , Ctag , Link0 , S0 ),
14401439 echo (CEcho , Handle , CDeliveryCount , CCredit , Available , S ),
14411440 S ;
1442- none when QCredit =:= 0 andalso
1443- DesiredCredit > 0 ->
1441+ none when Credit =:= 0 andalso
1442+ CCredit > 0 ->
14441443 QName = Link0 # outgoing_link .queue_name ,
14451444 % % Provide queue next batch of credits.
1446- CappedCredit = cap_credit (DesiredCredit ),
1445+ CappedCredit = cap_credit (CCredit ),
14471446 {ok , QStates , Actions } =
14481447 rabbit_queue_type :credit (
14491448 QName , Ctag , DeliveryCount , CappedCredit , false , QStates0 ),
14501449 Link = Link0 # outgoing_link {
1451- queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit }
1452- },
1450+ queue_flow_ctl = QFC0 # queue_flow_ctl {credit = CappedCredit },
1451+ at_least_one_credit_req_in_flight = true },
14531452 S = S0 # state {queue_states = QStates ,
14541453 outgoing_links = OutgoingLinks #{Handle := Link }},
14551454 handle_queue_actions (Actions , S );
14561455 none ->
1457- Link = Link0 # outgoing_link {credit_req_in_flight = false },
1456+ % % Although we (the receiver) usually determine link credit, we set here
1457+ % % our link credit to what the queue says our link credit is (which is safer
1458+ % % in case credit requests got applied out of order in quorum queues).
1459+ % % This should be fine given that we asserted earlier that our delivery-count is
1460+ % % in sync with the delivery-count of the sending queue.
1461+ QFC = QFC0 # queue_flow_ctl {credit = Credit },
1462+ Link = Link0 # outgoing_link {
1463+ queue_flow_ctl = QFC ,
1464+ at_least_one_credit_req_in_flight = false },
14581465 S = S0 # state {outgoing_links = OutgoingLinks #{Handle := Link }},
1459- echo (CEcho , Handle , CDeliveryCount , DesiredCredit , Available , S ),
1466+ echo (CEcho , Handle , CDeliveryCount , CCredit , Available , S ),
14601467 S
14611468 end ;
14621469handle_credit_reply0 (
@@ -1465,10 +1472,11 @@ handle_credit_reply0(
14651472 Link0 = # outgoing_link {
14661473 queue_name = QName ,
14671474 client_flow_ctl = # client_flow_ctl {
1468- delivery_count = CDeliveryCount0 } = CFC ,
1475+ delivery_count = CDeliveryCount0 ,
1476+ credit = CCredit
1477+ } = CFC ,
14691478 queue_flow_ctl = # queue_flow_ctl {
1470- delivery_count = QDeliveryCount0 ,
1471- desired_credit = DesiredCredit
1479+ delivery_count = QDeliveryCount0
14721480 } = QFC ,
14731481 stashed_credit_req = StashedCreditReq },
14741482 S0 = # state {cfg = # cfg {writer_pid = Writer ,
@@ -1480,31 +1488,38 @@ handle_credit_reply0(
14801488 0 = Credit ,
14811489
14821490 case DeliveryCount =:= QDeliveryCount0 andalso
1483- DesiredCredit > 0 of
1491+ CCredit > 0 of
14841492 true ->
14851493 % % We're in drain mode. The queue did not advance its delivery-count which means
1486- % % it might still have messages available for us. We also desire more messages.
1494+ % % it might still have messages available for us. The client also desires more messages.
14871495 % % Therefore, we do the next round of credit top-up. We prioritise finishing
14881496 % % the current drain credit top-up rounds over a stashed credit request because
14891497 % % this is easier to reason about and the queue will reply promptly meaning
14901498 % % the stashed request will be processed soon enough.
1491- CappedCredit = cap_credit (DesiredCredit ),
1492- Link = Link0 # outgoing_link {queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit }},
1493-
1494- {ok , QStates , Actions } =
1495- rabbit_queue_type :credit (
1496- QName , Ctag , DeliveryCount , CappedCredit , true , QStates0 ),
1499+ CappedCredit = cap_credit (CCredit ),
1500+ {ok , QStates , Actions } = rabbit_queue_type :credit (
1501+ QName , Ctag , DeliveryCount ,
1502+ CappedCredit , true , QStates0 ),
1503+ Link = Link0 # outgoing_link {
1504+ queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit },
1505+ at_least_one_credit_req_in_flight = true },
14971506 S = S0 # state {queue_states = QStates ,
14981507 outgoing_links = OutgoingLinks #{Handle := Link }},
14991508 handle_queue_actions (Actions , S );
15001509 false ->
1510+ case compare (DeliveryCount , QDeliveryCount0 ) of
1511+ equal -> ok ;
1512+ greater -> ok ; % % the sending queue advanced its delivery-count
1513+ less -> error ({unexpected_delivery_count , DeliveryCount , QDeliveryCount0 })
1514+ end ,
1515+
15011516 % % We're in drain mode.
15021517 % % The queue either advanced its delivery-count which means it has
1503- % % no more messages available for us, or we do not desire more messages.
1518+ % % no more messages available for us, or the client does not desire more messages.
15041519 % % Therefore, we're done with draining and we "the sender will (after sending
15051520 % % all available messages) advance the delivery-count as much as possible,
15061521 % % consuming all link-credit, and send the flow state to the receiver."
1507- CDeliveryCount = add (CDeliveryCount0 , DesiredCredit ),
1522+ CDeliveryCount = add (CDeliveryCount0 , CCredit ),
15081523 Flow0 = # 'v1_0.flow' {handle = ? UINT (Handle ),
15091524 delivery_count = ? UINT (CDeliveryCount ),
15101525 link_credit = ? UINT (0 ),
@@ -1519,9 +1534,8 @@ handle_credit_reply0(
15191534 queue_flow_ctl = QFC # queue_flow_ctl {
15201535 delivery_count = DeliveryCount ,
15211536 credit = 0 ,
1522- desired_credit = 0 ,
15231537 drain = false },
1524- credit_req_in_flight = false
1538+ at_least_one_credit_req_in_flight = false
15251539 },
15261540 S = S0 # state {outgoing_links = OutgoingLinks #{Handle := Link }},
15271541 case StashedCreditReq of
@@ -1553,19 +1567,17 @@ pop_credit_req(
15531567 LinkCreditSnd = amqp10_util :link_credit_snd (
15541568 DeliveryCountRcv , LinkCreditRcv , CDeliveryCount ),
15551569 CappedCredit = cap_credit (LinkCreditSnd ),
1556- {ok , QStates , Actions } =
1557- rabbit_queue_type : credit (
1558- QName , Ctag , QDeliveryCount , CappedCredit , Drain , QStates0 ),
1570+ {ok , QStates , Actions } = rabbit_queue_type : credit (
1571+ QName , Ctag , QDeliveryCount ,
1572+ CappedCredit , Drain , QStates0 ),
15591573 Link = Link0 # outgoing_link {
15601574 client_flow_ctl = CFC # client_flow_ctl {
15611575 credit = LinkCreditSnd ,
15621576 echo = Echo },
15631577 queue_flow_ctl = QFC # queue_flow_ctl {
15641578 credit = CappedCredit ,
1565- desired_credit = LinkCreditSnd ,
1566- drain = Drain
1567- },
1568- credit_req_in_flight = true ,
1579+ drain = Drain },
1580+ at_least_one_credit_req_in_flight = true ,
15691581 stashed_credit_req = none
15701582 },
15711583 S = S0 # state {queue_states = QStates ,
@@ -1685,19 +1697,20 @@ sent_pending_delivery(
16851697 credit_api_version = CreditApiVsn ,
16861698 client_flow_ctl = CFC0 ,
16871699 queue_flow_ctl = QFC0 ,
1688- credit_req_in_flight = CreditReqInFlight0
1700+ at_least_one_credit_req_in_flight = CreditReqInFlight0
16891701 } = Link0 = maps :get (Handle , OutgoingLinks0 ),
16901702
16911703 S = case CreditApiVsn of
1704+ 1 ->
1705+ S0 ;
16921706 2 ->
16931707 # client_flow_ctl {
16941708 delivery_count = CDeliveryCount0 ,
16951709 credit = CCredit0
16961710 } = CFC0 ,
16971711 # queue_flow_ctl {
16981712 delivery_count = QDeliveryCount0 ,
1699- credit = QCredit0 ,
1700- desired_credit = DesiredCredit0
1713+ credit = QCredit0
17011714 } = QFC0 ,
17021715
17031716 CDeliveryCount = add (CDeliveryCount0 , 1 ),
@@ -1715,17 +1728,16 @@ sent_pending_delivery(
17151728
17161729 QDeliveryCount = add (QDeliveryCount0 , 1 ),
17171730 QCredit1 = max (0 , QCredit0 - 1 ),
1718- DesiredCredit = max (0 , DesiredCredit0 - 1 ),
17191731
17201732 {QCredit , CreditReqInFlight , QStates , Actions } =
17211733 case QCredit1 =:= 0 andalso
1722- DesiredCredit > 0 andalso
1734+ CCredit > 0 andalso
17231735 not CreditReqInFlight0 of
17241736 true ->
17251737 % % assertion
17261738 none = Link0 # outgoing_link .stashed_credit_req ,
17271739 % % Provide queue next batch of credits.
1728- CappedCredit = cap_credit (DesiredCredit ),
1740+ CappedCredit = cap_credit (CCredit ),
17291741 {ok , QStates1 , Actions0 } =
17301742 rabbit_queue_type :credit (
17311743 QName , Ctag , QDeliveryCount , CappedCredit ,
@@ -1740,17 +1752,15 @@ sent_pending_delivery(
17401752 credit = CCredit },
17411753 QFC = QFC0 # queue_flow_ctl {
17421754 delivery_count = QDeliveryCount ,
1743- credit = QCredit ,
1744- desired_credit = DesiredCredit },
1745- Link = Link0 # outgoing_link { client_flow_ctl = CFC ,
1746- queue_flow_ctl = QFC ,
1747- credit_req_in_flight = CreditReqInFlight },
1755+ credit = QCredit } ,
1756+ Link = Link0 # outgoing_link {
1757+ client_flow_ctl = CFC ,
1758+ queue_flow_ctl = QFC ,
1759+ at_least_one_credit_req_in_flight = CreditReqInFlight },
17481760 OutgoingLinks = OutgoingLinks0 #{Handle := Link },
17491761 S1 = S0 # state {outgoing_links = OutgoingLinks ,
17501762 queue_states = QStates },
1751- handle_queue_actions (Actions , S1 );
1752- 1 ->
1753- S0
1763+ handle_queue_actions (Actions , S1 )
17541764 end ,
17551765 record_outgoing_unsettled (Pending , S ).
17561766
@@ -2677,7 +2687,7 @@ handle_outgoing_link_flow_control(
26772687 credit_api_version = CreditApiVsn ,
26782688 client_flow_ctl = CFC ,
26792689 queue_flow_ctl = QFC ,
2680- credit_req_in_flight = CreditReqInFlight
2690+ at_least_one_credit_req_in_flight = CreditReqInFlight
26812691 } = Link0 ,
26822692 # 'v1_0.flow' {handle = ? UINT (HandleInt ),
26832693 delivery_count = MaybeDeliveryCountRcv ,
@@ -2695,26 +2705,26 @@ handle_outgoing_link_flow_control(
26952705 2 ->
26962706 case CreditReqInFlight of
26972707 false ->
2698- DesiredCredit = amqp10_util :link_credit_snd (
2708+ LinkCreditSnd = amqp10_util :link_credit_snd (
26992709 DeliveryCountRcv ,
27002710 LinkCreditRcv ,
27012711 CFC # client_flow_ctl .delivery_count ),
2702- CappedCredit = cap_credit (DesiredCredit ),
2712+ CappedCredit = cap_credit (LinkCreditSnd ),
27032713 Link = Link0 # outgoing_link {
2704- credit_req_in_flight = true ,
27052714 client_flow_ctl = CFC # client_flow_ctl {
2706- credit = DesiredCredit ,
2715+ credit = LinkCreditSnd ,
27072716 echo = Echo },
27082717 queue_flow_ctl = QFC # queue_flow_ctl {
27092718 credit = CappedCredit ,
2710- desired_credit = DesiredCredit ,
2711- drain = Drain } },
2719+ drain = Drain } ,
2720+ at_least_one_credit_req_in_flight = true },
27122721 {ok , QStates , Actions } = rabbit_queue_type :credit (
27132722 QName , Ctag ,
27142723 QFC # queue_flow_ctl .delivery_count ,
27152724 CappedCredit , Drain , QStates0 ),
2716- State = State0 # state {queue_states = QStates ,
2717- outgoing_links = OutgoingLinks #{HandleInt := Link }},
2725+ State = State0 # state {
2726+ queue_states = QStates ,
2727+ outgoing_links = OutgoingLinks #{HandleInt := Link }},
27182728 handle_queue_actions (Actions , State );
27192729 true ->
27202730 % % A credit request is currently in-flight. Let's first process its reply
0 commit comments