Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 84 additions & 74 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,9 @@
-record(queue_flow_ctl, {
delivery_count :: sequence_no(),
%% We cap the actual credit we grant to the sending queue.
%% If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
%% we will top up in batches to the sending queue.
credit :: 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX,
%% Credit as desired by the receiving client. If larger than
%% LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue.
desired_credit :: rabbit_queue_type:credit(),
drain :: boolean()
}).

Expand All @@ -197,10 +196,18 @@
%% client and for the link to the sending queue.
client_flow_ctl :: #client_flow_ctl{} | credit_api_v1,
queue_flow_ctl :: #queue_flow_ctl{} | credit_api_v1,
%% True if we sent a credit request to the sending queue
%% but haven't processed the corresponding credit reply yet.
credit_req_in_flight :: boolean() | credit_api_v1,
%% While credit_req_in_flight is true, we stash the
%% 'true' means:
%% * we haven't processed a credit reply yet since we last sent
%% a credit request to the sending queue.
%% * a credit request is certainly in flight
%% * possibly multiple credit requests are in flight (e.g. rabbit_fifo_client
%% will re-send credit requests on our behalf on quorum queue leader changes)
%% 'false' means:
%% * we processed a credit reply since we last sent a credit request to the sending queue
%% * probably no credit request is in flight, but there might be
%% (we aren't sure since we don't use correlations for credit requests)
at_least_one_credit_req_in_flight :: boolean() | credit_api_v1,
%% While at_least_one_credit_req_in_flight is true, we stash the
%% latest credit request from the receiving client.
stashed_credit_req :: none | #credit_req{} | credit_api_v1
}).
Expand Down Expand Up @@ -1066,7 +1073,6 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
echo = false},
#queue_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT,
credit = 0,
desired_credit = 0,
drain = false},
false,
none};
Expand Down Expand Up @@ -1116,7 +1122,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
delivery_count = DeliveryCount,
client_flow_ctl = ClientFlowCtl,
queue_flow_ctl = QueueFlowCtl,
credit_req_in_flight = CreditReqInFlight,
at_least_one_credit_req_in_flight = CreditReqInFlight,
stashed_credit_req = StashedCreditReq},
OutgoingLinks = OutgoingLinks0#{HandleInt => Link},
State1 = State0#state{queue_states = QStates,
Expand Down Expand Up @@ -1392,16 +1398,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
end
end.

handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, Drain},
handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, _Drain},
State = #state{outgoing_links = OutgoingLinks}) ->
Handle = ctag_to_handle(Ctag),
case OutgoingLinks of
#{Handle := Link = #outgoing_link{queue_flow_ctl = QFC,
credit_req_in_flight = CreditReqInFlight}} ->
%% Assert that we expect a credit reply for this consumer.
true = CreditReqInFlight,
%% Assert that "The sender's value is always the last known value indicated by the receiver."
Drain = QFC#queue_flow_ctl.drain,
#{Handle := Link} ->
handle_credit_reply0(Action, Handle, Link, State);
_ ->
%% Ignore credit reply for a detached link.
Expand All @@ -1418,18 +1419,16 @@ handle_credit_reply0(
echo = CEcho
},
queue_flow_ctl = #queue_flow_ctl{
delivery_count = QDeliveryCount,
credit = QCredit,
desired_credit = DesiredCredit
} = QFC,
delivery_count = QDeliveryCount
} = QFC0,
stashed_credit_req = StashedCreditReq
} = Link0,
#state{outgoing_links = OutgoingLinks,
queue_states = QStates0
} = S0) ->

%% Assert that flow control state between us and the queue is in sync.
QCredit = Credit,
%% Assertion: Our (receiver) delivery-count should be always
%% in sync with the delivery-count of the sending queue.
QDeliveryCount = DeliveryCount,

case StashedCreditReq of
Expand All @@ -1439,24 +1438,32 @@ handle_credit_reply0(
S = pop_credit_req(Handle, Ctag, Link0, S0),
echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S),
S;
none when QCredit =:= 0 andalso
DesiredCredit > 0 ->
none when Credit =:= 0 andalso
CCredit > 0 ->
QName = Link0#outgoing_link.queue_name,
%% Provide queue next batch of credits.
CappedCredit = cap_credit(DesiredCredit),
CappedCredit = cap_credit(CCredit),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, DeliveryCount, CappedCredit, false, QStates0),
Link = Link0#outgoing_link{
queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}
},
queue_flow_ctl = QFC0#queue_flow_ctl{credit = CappedCredit},
at_least_one_credit_req_in_flight = true},
S = S0#state{queue_states = QStates,
outgoing_links = OutgoingLinks#{Handle := Link}},
handle_queue_actions(Actions, S);
none ->
Link = Link0#outgoing_link{credit_req_in_flight = false},
%% Although we (the receiver) usually determine link credit, we set here
%% our link credit to what the queue says our link credit is (which is safer
%% in case credit requests got applied out of order in quorum queues).
%% This should be fine given that we asserted earlier that our delivery-count is
%% in sync with the delivery-count of the sending queue.
QFC = QFC0#queue_flow_ctl{credit = Credit},
Link = Link0#outgoing_link{
queue_flow_ctl = QFC,
at_least_one_credit_req_in_flight = false},
S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}},
echo(CEcho, Handle, CDeliveryCount, DesiredCredit, Available, S),
echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S),
S
end;
handle_credit_reply0(
Expand All @@ -1465,10 +1472,11 @@ handle_credit_reply0(
Link0 = #outgoing_link{
queue_name = QName,
client_flow_ctl = #client_flow_ctl{
delivery_count = CDeliveryCount0 } = CFC,
delivery_count = CDeliveryCount0,
credit = CCredit
} = CFC,
queue_flow_ctl = #queue_flow_ctl{
delivery_count = QDeliveryCount0,
desired_credit = DesiredCredit
delivery_count = QDeliveryCount0
} = QFC,
stashed_credit_req = StashedCreditReq},
S0 = #state{cfg = #cfg{writer_pid = Writer,
Expand All @@ -1480,31 +1488,38 @@ handle_credit_reply0(
0 = Credit,

case DeliveryCount =:= QDeliveryCount0 andalso
DesiredCredit > 0 of
CCredit > 0 of
true ->
%% We're in drain mode. The queue did not advance its delivery-count which means
%% it might still have messages available for us. We also desire more messages.
%% it might still have messages available for us. The client also desires more messages.
%% Therefore, we do the next round of credit top-up. We prioritise finishing
%% the current drain credit top-up rounds over a stashed credit request because
%% this is easier to reason about and the queue will reply promptly meaning
%% the stashed request will be processed soon enough.
CappedCredit = cap_credit(DesiredCredit),
Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}},

{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, DeliveryCount, CappedCredit, true, QStates0),
CappedCredit = cap_credit(CCredit),
{ok, QStates, Actions} = rabbit_queue_type:credit(
QName, Ctag, DeliveryCount,
CappedCredit, true, QStates0),
Link = Link0#outgoing_link{
queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit},
at_least_one_credit_req_in_flight = true},
S = S0#state{queue_states = QStates,
outgoing_links = OutgoingLinks#{Handle := Link}},
handle_queue_actions(Actions, S);
false ->
case compare(DeliveryCount, QDeliveryCount0) of
equal -> ok;
greater -> ok; %% the sending queue advanced its delivery-count
less -> error({unexpected_delivery_count, DeliveryCount, QDeliveryCount0})
end,

%% We're in drain mode.
%% The queue either advanced its delivery-count which means it has
%% no more messages available for us, or we do not desire more messages.
%% no more messages available for us, or the client does not desire more messages.
%% Therefore, we're done with draining and we "the sender will (after sending
%% all available messages) advance the delivery-count as much as possible,
%% consuming all link-credit, and send the flow state to the receiver."
CDeliveryCount = add(CDeliveryCount0, DesiredCredit),
CDeliveryCount = add(CDeliveryCount0, CCredit),
Flow0 = #'v1_0.flow'{handle = ?UINT(Handle),
delivery_count = ?UINT(CDeliveryCount),
link_credit = ?UINT(0),
Expand All @@ -1519,9 +1534,8 @@ handle_credit_reply0(
queue_flow_ctl = QFC#queue_flow_ctl{
delivery_count = DeliveryCount,
credit = 0,
desired_credit = 0,
drain = false},
credit_req_in_flight = false
at_least_one_credit_req_in_flight = false
},
S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}},
case StashedCreditReq of
Expand Down Expand Up @@ -1553,19 +1567,17 @@ pop_credit_req(
LinkCreditSnd = amqp10_util:link_credit_snd(
DeliveryCountRcv, LinkCreditRcv, CDeliveryCount),
CappedCredit = cap_credit(LinkCreditSnd),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0),
{ok, QStates, Actions} = rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount,
CappedCredit, Drain, QStates0),
Link = Link0#outgoing_link{
client_flow_ctl = CFC#client_flow_ctl{
credit = LinkCreditSnd,
echo = Echo},
queue_flow_ctl = QFC#queue_flow_ctl{
credit = CappedCredit,
desired_credit = LinkCreditSnd,
drain = Drain
},
credit_req_in_flight = true,
drain = Drain},
at_least_one_credit_req_in_flight = true,
stashed_credit_req = none
},
S = S0#state{queue_states = QStates,
Expand Down Expand Up @@ -1685,19 +1697,20 @@ sent_pending_delivery(
credit_api_version = CreditApiVsn,
client_flow_ctl = CFC0,
queue_flow_ctl = QFC0,
credit_req_in_flight = CreditReqInFlight0
at_least_one_credit_req_in_flight = CreditReqInFlight0
} = Link0 = maps:get(Handle, OutgoingLinks0),

S = case CreditApiVsn of
1 ->
S0;
2 ->
#client_flow_ctl{
delivery_count = CDeliveryCount0,
credit = CCredit0
} = CFC0,
#queue_flow_ctl{
delivery_count = QDeliveryCount0,
credit = QCredit0,
desired_credit = DesiredCredit0
credit = QCredit0
} = QFC0,

CDeliveryCount = add(CDeliveryCount0, 1),
Expand All @@ -1715,17 +1728,16 @@ sent_pending_delivery(

QDeliveryCount = add(QDeliveryCount0, 1),
QCredit1 = max(0, QCredit0 - 1),
DesiredCredit = max(0, DesiredCredit0 - 1),

{QCredit, CreditReqInFlight, QStates, Actions} =
case QCredit1 =:= 0 andalso
DesiredCredit > 0 andalso
CCredit > 0 andalso
not CreditReqInFlight0 of
true ->
%% assertion
none = Link0#outgoing_link.stashed_credit_req,
%% Provide queue next batch of credits.
CappedCredit = cap_credit(DesiredCredit),
CappedCredit = cap_credit(CCredit),
{ok, QStates1, Actions0} =
rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount, CappedCredit,
Expand All @@ -1740,17 +1752,15 @@ sent_pending_delivery(
credit = CCredit},
QFC = QFC0#queue_flow_ctl{
delivery_count = QDeliveryCount,
credit = QCredit,
desired_credit = DesiredCredit},
Link = Link0#outgoing_link{client_flow_ctl = CFC,
queue_flow_ctl = QFC,
credit_req_in_flight = CreditReqInFlight},
credit = QCredit},
Link = Link0#outgoing_link{
client_flow_ctl = CFC,
queue_flow_ctl = QFC,
at_least_one_credit_req_in_flight = CreditReqInFlight},
OutgoingLinks = OutgoingLinks0#{Handle := Link},
S1 = S0#state{outgoing_links = OutgoingLinks,
queue_states = QStates},
handle_queue_actions(Actions, S1);
1 ->
S0
handle_queue_actions(Actions, S1)
end,
record_outgoing_unsettled(Pending, S).

Expand Down Expand Up @@ -2677,7 +2687,7 @@ handle_outgoing_link_flow_control(
credit_api_version = CreditApiVsn,
client_flow_ctl = CFC,
queue_flow_ctl = QFC,
credit_req_in_flight = CreditReqInFlight
at_least_one_credit_req_in_flight = CreditReqInFlight
} = Link0,
#'v1_0.flow'{handle = ?UINT(HandleInt),
delivery_count = MaybeDeliveryCountRcv,
Expand All @@ -2695,26 +2705,26 @@ handle_outgoing_link_flow_control(
2 ->
case CreditReqInFlight of
false ->
DesiredCredit = amqp10_util:link_credit_snd(
LinkCreditSnd = amqp10_util:link_credit_snd(
DeliveryCountRcv,
LinkCreditRcv,
CFC#client_flow_ctl.delivery_count),
CappedCredit = cap_credit(DesiredCredit),
CappedCredit = cap_credit(LinkCreditSnd),
Link = Link0#outgoing_link{
credit_req_in_flight = true,
client_flow_ctl = CFC#client_flow_ctl{
credit = DesiredCredit,
credit = LinkCreditSnd,
echo = Echo},
queue_flow_ctl = QFC#queue_flow_ctl{
credit = CappedCredit,
desired_credit = DesiredCredit,
drain = Drain}},
drain = Drain},
at_least_one_credit_req_in_flight = true},
{ok, QStates, Actions} = rabbit_queue_type:credit(
QName, Ctag,
QFC#queue_flow_ctl.delivery_count,
CappedCredit, Drain, QStates0),
State = State0#state{queue_states = QStates,
outgoing_links = OutgoingLinks#{HandleInt := Link}},
State = State0#state{
queue_states = QStates,
outgoing_links = OutgoingLinks#{HandleInt := Link}},
handle_queue_actions(Actions, State);
true ->
%% A credit request is currently in-flight. Let's first process its reply
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ seq_applied({Seq, Response},
when Response /= not_enqueued ->
{[Corr | Corrs], Actions, State#state{pending = Pending}};
_ ->
{Corrs, Actions, State#state{}}
{Corrs, Actions, State}
end;
seq_applied(_Seq, Acc) ->
Acc.
Expand Down
Loading