Skip to content

Commit bb541e5

Browse files
Merge pull request #4307 from rabbitmq/mk-rabbit-channel-naming-borrowed-from-rabbitmq-server-4004-and-4305
Naming from #4305
2 parents 5cb8179 + a43d989 commit bb541e5

File tree

1 file changed

+42
-29
lines changed

1 file changed

+42
-29
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,18 @@
120120
writer_gc_threshold
121121
}).
122122

123-
-record(pending_ack, {delivery_tag,
123+
-record(pending_ack, {
124+
%% delivery identifier used by clients
125+
%% to acknowledge and reject deliveries
126+
delivery_tag,
127+
%% consumer tag
124128
tag,
125129
delivered_at,
126-
queue, %% queue name
127-
msg_id}).
130+
%% queue name
131+
queue,
132+
%% message ID used by queue and message store implementations
133+
msg_id
134+
}).
128135

129136
-record(ch, {cfg :: #conf{},
130137
%% limiter state, see rabbit_limiter
@@ -1342,7 +1349,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
13421349
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
13431350
State1 = State#ch{unacked_message_q = Remaining},
13441351
{noreply, case Tx of
1345-
none -> {State2, Actions} = ack(Acked, State1),
1352+
none -> {State2, Actions} = settle_acks(Acked, State1),
13461353
handle_queue_actions(Actions, State2);
13471354
{Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
13481355
State1#ch{tx = {Msgs, Acks1}}
@@ -1722,7 +1729,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
17221729
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
17231730
{State2, Actions2} =
17241731
lists:foldl(fun ({ack, A}, {Acc, Actions}) ->
1725-
{Acc0, Actions0} = ack(Rev(A), Acc),
1732+
{Acc0, Actions0} = settle_acks(Rev(A), Acc),
17261733
{Acc0, Actions ++ Actions0};
17271734
({Requeue, A}, {Acc, Actions}) ->
17281735
{Acc0, Actions0} = internal_reject(Requeue, Rev(A), Limiter, Acc),
@@ -2029,37 +2036,43 @@ record_sent(Type, QueueType, Tag, AckRequired,
20292036
end,
20302037
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
20312038

2032-
%% NB: returns acks in youngest-first order
2033-
collect_acks(Q, 0, true) ->
2034-
{lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()};
2035-
collect_acks(Q, DeliveryTag, Multiple) ->
2036-
collect_acks([], [], Q, DeliveryTag, Multiple).
2037-
2038-
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
2039-
case ?QUEUE:out(Q) of
2040-
{{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDeliveryTag}},
2041-
QTail} ->
2042-
if CurrentDeliveryTag == DeliveryTag ->
2043-
{[UnackedMsg | ToAcc],
2044-
case PrefixAcc of
2045-
[] -> QTail;
2039+
%% Records a client-sent acknowledgement. Handles both single delivery acks
2040+
%% and multi-acks.
2041+
%%
2042+
%% Returns a triple of acknowledged pending acks, remaining pending acks,
2043+
%% and outdated pending acks (if any).
2044+
%% Sorts each group in the youngest-first order (ascending by delivery tag).
2045+
collect_acks(UAMQ, 0, true) ->
2046+
{lists:reverse(?QUEUE:to_list(UAMQ)), ?QUEUE:new()};
2047+
collect_acks(UAMQ, DeliveryTag, Multiple) ->
2048+
collect_acks([], [], UAMQ, DeliveryTag, Multiple).
2049+
2050+
collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
2051+
case ?QUEUE:out(UAMQ) of
2052+
{{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDT}},
2053+
UAMQTail} ->
2054+
if CurrentDT == DeliveryTag ->
2055+
{[UnackedMsg | AcknowledgedAcc],
2056+
case RemainingAcc of
2057+
[] -> UAMQTail;
20462058
_ -> ?QUEUE:join(
2047-
?QUEUE:from_list(lists:reverse(PrefixAcc)),
2048-
QTail)
2059+
?QUEUE:from_list(lists:reverse(RemainingAcc)),
2060+
UAMQTail)
20492061
end};
20502062
Multiple ->
2051-
collect_acks([UnackedMsg | ToAcc], PrefixAcc,
2052-
QTail, DeliveryTag, Multiple);
2063+
collect_acks([UnackedMsg | AcknowledgedAcc], RemainingAcc,
2064+
UAMQTail, DeliveryTag, Multiple);
20532065
true ->
2054-
collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
2055-
QTail, DeliveryTag, Multiple)
2066+
collect_acks(AcknowledgedAcc, [UnackedMsg | RemainingAcc],
2067+
UAMQTail, DeliveryTag, Multiple)
20562068
end;
20572069
{empty, _} ->
20582070
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
20592071
end.
20602072

2061-
%% NB: Acked is in youngest-first order
2062-
ack(Acked, State = #ch{queue_states = QueueStates0}) ->
2073+
%% Settles (acknowledges) messages at the queue replica process level.
2074+
%% This happens in the youngest-first order (ascending by delivery tag).
2075+
settle_acks(Acks, State = #ch{queue_states = QueueStates0}) ->
20632076
{QueueStates, Actions} =
20642077
foreach_per_queue(
20652078
fun ({QRef, CTag}, MsgIds, {Acc0, ActionsAcc0}) ->
@@ -2071,8 +2084,8 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) ->
20712084
{protocol_error, ErrorType, Reason, ReasonArgs} ->
20722085
rabbit_misc:protocol_error(ErrorType, Reason, ReasonArgs)
20732086
end
2074-
end, Acked, {QueueStates0, []}),
2075-
ok = notify_limiter(State#ch.limiter, Acked),
2087+
end, Acks, {QueueStates0, []}),
2088+
ok = notify_limiter(State#ch.limiter, Acks),
20762089
{State#ch{queue_states = QueueStates}, Actions}.
20772090

20782091
incr_queue_stats(QName, MsgIds, State = #ch{queue_states = QueueStates}) ->

0 commit comments

Comments
 (0)