Skip to content

Commit 9c56475

Browse files
Merge pull request #15020 from rabbitmq/loic-cq-delivery-count-really
4.3: Implement AMQP-1.0 delivery-count for CQs
2 parents 90fddc1 + 960cd60 commit 9c56475

10 files changed

+375
-161
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -905,16 +905,32 @@ ack(AckTags, ChPid, State) ->
905905
State1#q{backing_queue_state = BQS1}
906906
end).
907907

908-
requeue(AckTags, ChPid, State) ->
908+
requeue(AckTags, DelFailed, ChPid, State) ->
909909
subtract_acks(ChPid, AckTags, State,
910-
fun (State1) -> requeue_and_run(AckTags, false, State1) end).
910+
fun (State1) -> requeue_and_run(AckTags, DelFailed, false, State1) end).
911+
912+
discard(AckTags, DelFailed, ChPid, State) ->
913+
with_dlx(
914+
State#q.dlx,
915+
fun (X) -> subtract_acks(ChPid, AckTags, State,
916+
fun (State1) ->
917+
dead_letter_rejected_msgs(
918+
AckTags, DelFailed, X, State1)
919+
end) end,
920+
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
921+
disabled, length(AckTags)),
922+
ack(AckTags, ChPid, State) end).
923+
924+
requeue_and_run(AckTags, ActiveConsumersChanged, State) ->
925+
requeue_and_run(AckTags, true, ActiveConsumersChanged, State).
911926

912927
requeue_and_run(AckTags,
928+
DelFailed,
913929
ActiveConsumersChanged,
914930
#q{backing_queue = BQ,
915931
backing_queue_state = BQS0} = State0) ->
916932
WasEmpty = BQ:is_empty(BQS0),
917-
{_MsgIds, BQS} = BQ:requeue(AckTags, BQS0),
933+
{_MsgIds, BQS} = BQ:requeue(AckTags, DelFailed, BQS0),
918934
State1 = State0#q{backing_queue_state = BQS},
919935
{_Dropped, State2} = maybe_drop_head(State1),
920936
State3 = drop_expired_msgs(State2),
@@ -1079,11 +1095,11 @@ dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
10791095
BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
10801096
end, expired, X, State).
10811097

1082-
dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
1098+
dead_letter_rejected_msgs(AckTags, DelFailed, X, State = #q{backing_queue = BQ}) ->
10831099
{ok, State1} =
10841100
dead_letter_msgs(
10851101
fun (DLFun, Acc, BQS) ->
1086-
{Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
1102+
{Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags, DelFailed),
10871103
{ok, Acc1, BQS1}
10881104
end, rejected, X, State),
10891105
State1.
@@ -1258,7 +1274,8 @@ prioritise_cast(Msg, _Len, State) ->
12581274
delete_immediately -> 8;
12591275
{delete_exclusive, _Pid} -> 8;
12601276
{run_backing_queue, _Mod, _Fun} -> 6;
1261-
{ack, _AckTags, _ChPid} -> 4; %% [1]
1277+
{ack, _AckTags, _ChPid} -> 4; %% [1] %% @todo Remove when 'rabbitmq_4.3.0' FF is required.
1278+
{complete, _AckTags, _ChPid} -> 4; %% [1]
12621279
{resume, _ChPid} -> 3;
12631280
{notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
12641281
_ -> 0
@@ -1527,23 +1544,28 @@ handle_cast({deliver,
15271544
State1 = State#q{senders = Senders1},
15281545
noreply(maybe_deliver_or_enqueue(Delivery, Delivered, State1));
15291546

1547+
%% Compat for RabbitMQ 4.2. @todo Remove when 'rabbitmq_4.3.0' FF is required.
15301548
handle_cast({ack, AckTags, ChPid}, State) ->
1549+
handle_cast({complete, AckTags, ChPid}, State);
1550+
handle_cast({reject, true, AckTags, ChPid}, State) ->
1551+
handle_cast({requeue, AckTags, ChPid}, State);
1552+
handle_cast({reject, false, AckTags, ChPid}, State) ->
1553+
handle_cast({discard, AckTags, ChPid}, State);
1554+
1555+
handle_cast({complete, AckTags, ChPid}, State) ->
15311556
noreply(ack(AckTags, ChPid, State));
15321557

1533-
handle_cast({reject, true, AckTags, ChPid}, State) ->
1534-
noreply(requeue(AckTags, ChPid, State));
1558+
handle_cast({requeue, AckTags, ChPid}, State) ->
1559+
noreply(requeue(AckTags, false, ChPid, State));
15351560

1536-
handle_cast({reject, false, AckTags, ChPid}, State) ->
1537-
noreply(with_dlx(
1538-
State#q.dlx,
1539-
fun (X) -> subtract_acks(ChPid, AckTags, State,
1540-
fun (State1) ->
1541-
dead_letter_rejected_msgs(
1542-
AckTags, X, State1)
1543-
end) end,
1544-
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
1545-
disabled, length(AckTags)),
1546-
ack(AckTags, ChPid, State) end));
1561+
handle_cast({discard, AckTags, ChPid}, State) ->
1562+
noreply(discard(AckTags, true, ChPid, State));
1563+
1564+
handle_cast({modify, AckTags, DelFailed, false, _Anns, ChPid}, State) ->
1565+
noreply(requeue(AckTags, DelFailed, ChPid, State));
1566+
1567+
handle_cast({modify, AckTags, DelFailed, true, _Anns, ChPid}, State) ->
1568+
noreply(discard(AckTags, DelFailed, ChPid, State));
15471569

15481570
handle_cast({delete_exclusive, ConnPid}, State) ->
15491571
log_delete_exclusive(ConnPid, State),

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,11 @@
163163

164164
%% Reinsert messages into the queue which have already been delivered
165165
%% and were pending acknowledgement.
166-
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
166+
-callback requeue([ack()], boolean(), state()) -> {msg_ids(), state()}.
167167

168168
%% Fold over messages by ack tag. The supplied function is called with
169169
%% each message, its ack tag, and an accumulator.
170-
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
170+
-callback ackfold(msg_fun(A), A, state(), [ack()], boolean()) -> {A, state()}.
171171

172172
%% How long is my queue?
173173
-callback len(state()) -> non_neg_integer().

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,16 +333,31 @@ cancel(Q, Spec, State) ->
333333
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
334334
rabbit_types:ctag(), [non_neg_integer()], state()) ->
335335
{state(), rabbit_queue_type:actions()}.
336-
settle(QName, {modify, _DelFailed, Undel, _Anns}, CTag, MsgIds, State) ->
336+
settle(QName, Op, CTag, MsgIds, State) ->
337+
case rabbit_feature_flags:is_enabled('rabbitmq_4.3.0') of
338+
true -> settle_43(QName, Op, CTag, MsgIds, State);
339+
false -> settle_compat(QName, Op, CTag, MsgIds, State)
340+
end.
341+
342+
settle_43(_QName, {modify, DelFailed, Undel, Anns}, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
343+
Arg = {modify, MsgIds, DelFailed, Undel, Anns, self()},
344+
delegate:invoke_no_result(Pid, {gen_server2, cast, [Arg]}),
345+
{State, []};
346+
settle_43(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
347+
Arg = {Op, MsgIds, self()},
348+
delegate:invoke_no_result(Pid, {gen_server2, cast, [Arg]}),
349+
{State, []}.
350+
351+
settle_compat(QName, {modify, _DelFailed, Undel, _Anns}, CTag, MsgIds, State) ->
337352
%% translate modify into other op
338353
Op = case Undel of
339354
true ->
340355
discard;
341356
false ->
342357
requeue
343358
end,
344-
settle(QName, Op, CTag, MsgIds, State);
345-
settle(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
359+
settle_compat(QName, Op, CTag, MsgIds, State);
360+
settle_compat(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
346361
Arg = case Op of
347362
complete ->
348363
{ack, MsgIds, self()};
@@ -430,6 +445,8 @@ supports_stateful_delivery() -> true.
430445
deliver(Qs0, Msg0, Options) ->
431446
%% add guid to content here instead of in rabbit_basic:message/3,
432447
%% as classic queues are the only ones that need it
448+
%% @todo Do we need to regenerate it for every time it gets dead lettered?
449+
%% We can likely do better and avoid rewriting to the shared message store.
433450
Msg = mc:prepare(store, mc:set_annotation(id, rabbit_guid:gen(), Msg0)),
434451
Mandatory = maps:get(mandatory, Options, false),
435452
MsgSeqNo = maps:get(correlation, Options, undefined),

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,10 @@
218218
depends_on => ['rabbitmq_4.1.0'],
219219
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
220220
}}).
221+
222+
-rabbit_feature_flag(
223+
{'rabbitmq_4.3.0',
224+
#{desc => "Allows rolling upgrades to 4.3.x",
225+
stability => stable,
226+
depends_on => ['rabbitmq_4.2.0']
227+
}}).

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
2929
purge/1, purge_acks/1,
3030
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
31-
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
32-
ackfold/4, len/1, is_empty/1, depth/1,
31+
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/3,
32+
ackfold/5, len/1, is_empty/1, depth/1,
3333
update_rates/1, needs_timeout/1, timeout/1,
3434
handle_pre_hibernate/1, resume/1, msg_rates/1,
3535
info/2, invoke/3, is_duplicate/2,
@@ -279,27 +279,27 @@ ack(AckTags, State = #state{bq = BQ}) ->
279279
ack(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) ->
280280
?passthrough2(ack(AckTags, BQS)).
281281

282-
requeue(AckTags, State = #state{bq = BQ}) ->
282+
requeue(AckTags, DelFailed, State = #state{bq = BQ}) ->
283283
fold_by_acktags2(fun (AckTagsN, BQSN) ->
284-
BQ:requeue(AckTagsN, BQSN)
284+
BQ:requeue(AckTagsN, DelFailed, BQSN)
285285
end, AckTags, State);
286-
requeue(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) ->
287-
?passthrough2(requeue(AckTags, BQS)).
286+
requeue(AckTags, DelFailed, State = #passthrough{bq = BQ, bqs = BQS}) ->
287+
?passthrough2(requeue(AckTags, DelFailed, BQS)).
288288

289289
%% Similar problem to fetchwhile/4
290-
ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) ->
290+
ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags, DelFailed) ->
291291
AckTagsByPriority = partition_acktags(AckTags),
292292
fold2(
293293
fun (P, BQSN, AccN) ->
294294
case maps:find(P, AckTagsByPriority) of
295295
{ok, ATagsN} -> {AccN1, BQSN1} =
296-
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
296+
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN, DelFailed),
297297
{priority_on_acktags(P, AccN1), BQSN1};
298298
error -> {AccN, BQSN}
299299
end
300300
end, Acc, State);
301-
ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) ->
302-
?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)).
301+
ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags, DelFailed) ->
302+
?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags, DelFailed)).
303303

304304
len(#state{bq = BQ, bqss = BQSs}) ->
305305
add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs);

0 commit comments

Comments
 (0)