Skip to content

Commit dbeaac4

Browse files
committed
CQ: Propagate delivery-count to DLQs
Also don't ignore DelFailed in modified outcome when undeliverable-here is true, so that delivery-count is properly incremented before being propagated to DLQ. Thanks @ansd for hand holding.
1 parent 9e4a275 commit dbeaac4

8 files changed

+74
-47
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,18 @@ requeue(AckTags, DelFailed, ChPid, State) ->
909909
subtract_acks(ChPid, AckTags, State,
910910
fun (State1) -> requeue_and_run(AckTags, DelFailed, false, State1) end).
911911

912+
discard(AckTags, DelFailed, ChPid, State) ->
913+
with_dlx(
914+
State#q.dlx,
915+
fun (X) -> subtract_acks(ChPid, AckTags, State,
916+
fun (State1) ->
917+
dead_letter_rejected_msgs(
918+
AckTags, DelFailed, X, State1)
919+
end) end,
920+
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
921+
disabled, length(AckTags)),
922+
ack(AckTags, ChPid, State) end).
923+
912924
requeue_and_run(AckTags, ActiveConsumersChanged, State) ->
913925
requeue_and_run(AckTags, true, ActiveConsumersChanged, State).
914926

@@ -1083,11 +1095,11 @@ dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
10831095
BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
10841096
end, expired, X, State).
10851097

1086-
dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
1098+
dead_letter_rejected_msgs(AckTags, DelFailed, X, State = #q{backing_queue = BQ}) ->
10871099
{ok, State1} =
10881100
dead_letter_msgs(
10891101
fun (DLFun, Acc, BQS) ->
1090-
{Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
1102+
{Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags, DelFailed),
10911103
{ok, Acc1, BQS1}
10921104
end, rejected, X, State),
10931105
State1.
@@ -1544,25 +1556,16 @@ handle_cast({complete, AckTags, ChPid}, State) ->
15441556
noreply(ack(AckTags, ChPid, State));
15451557

15461558
handle_cast({requeue, AckTags, ChPid}, State) ->
1547-
noreply(requeue(AckTags, true, ChPid, State));
1559+
noreply(requeue(AckTags, false, ChPid, State));
15481560

15491561
handle_cast({discard, AckTags, ChPid}, State) ->
1550-
noreply(with_dlx(
1551-
State#q.dlx,
1552-
fun (X) -> subtract_acks(ChPid, AckTags, State,
1553-
fun (State1) ->
1554-
dead_letter_rejected_msgs(
1555-
AckTags, X, State1)
1556-
end) end,
1557-
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
1558-
disabled, length(AckTags)),
1559-
ack(AckTags, ChPid, State) end));
1562+
noreply(discard(AckTags, true, ChPid, State));
15601563

15611564
handle_cast({modify, AckTags, DelFailed, false, _Anns, ChPid}, State) ->
15621565
noreply(requeue(AckTags, DelFailed, ChPid, State));
15631566

1564-
handle_cast({modify, AckTags, _DelFailed, true, _Anns, ChPid}, State) ->
1565-
handle_cast({discard, AckTags, ChPid}, State);
1567+
handle_cast({modify, AckTags, DelFailed, true, _Anns, ChPid}, State) ->
1568+
noreply(discard(AckTags, DelFailed, ChPid, State));
15661569

15671570
handle_cast({delete_exclusive, ConnPid}, State) ->
15681571
log_delete_exclusive(ConnPid, State),

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@
167167

168168
%% Fold over messages by ack tag. The supplied function is called with
169169
%% each message, its ack tag, and an accumulator.
170-
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
170+
-callback ackfold(msg_fun(A), A, state(), [ack()], boolean()) -> {A, state()}.
171171

172172
%% How long is my queue?
173173
-callback len(state()) -> non_neg_integer().

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,8 @@ supports_stateful_delivery() -> true.
445445
deliver(Qs0, Msg0, Options) ->
446446
%% add guid to content here instead of in rabbit_basic:message/3,
447447
%% as classic queues are the only ones that need it
448+
%% @todo Do we need to regenerate it for every time it gets dead lettered?
449+
%% We can likely do better and avoid rewriting to the shared message store.
448450
Msg = mc:prepare(store, mc:set_annotation(id, rabbit_guid:gen(), Msg0)),
449451
Mandatory = maps:get(mandatory, Options, false),
450452
MsgSeqNo = maps:get(correlation, Options, undefined),

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
purge/1, purge_acks/1,
3030
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
3131
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/3,
32-
ackfold/4, len/1, is_empty/1, depth/1,
32+
ackfold/5, len/1, is_empty/1, depth/1,
3333
update_rates/1, needs_timeout/1, timeout/1,
3434
handle_pre_hibernate/1, resume/1, msg_rates/1,
3535
info/2, invoke/3, is_duplicate/2,
@@ -287,19 +287,19 @@ requeue(AckTags, DelFailed, State = #passthrough{bq = BQ, bqs = BQS}) ->
287287
?passthrough2(requeue(AckTags, DelFailed, BQS)).
288288

289289
%% Similar problem to fetchwhile/4
290-
ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) ->
290+
ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags, DelFailed) ->
291291
AckTagsByPriority = partition_acktags(AckTags),
292292
fold2(
293293
fun (P, BQSN, AccN) ->
294294
case maps:find(P, AckTagsByPriority) of
295295
{ok, ATagsN} -> {AccN1, BQSN1} =
296-
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
296+
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN, DelFailed),
297297
{priority_on_acktags(P, AccN1), BQSN1};
298298
error -> {AccN, BQSN}
299299
end
300300
end, Acc, State);
301-
ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) ->
302-
?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)).
301+
ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags, DelFailed) ->
302+
?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags, DelFailed)).
303303

304304
len(#state{bq = BQ, bqss = BQSs}) ->
305305
add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs);

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
publish/5, publish_delivered/4,
1313
discard/3, drain_confirmed/1,
1414
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/3,
15-
ackfold/4, len/1, is_empty/1, depth/1,
15+
ackfold/5, len/1, is_empty/1, depth/1,
1616
update_rates/1, needs_timeout/1, timeout/1,
1717
handle_pre_hibernate/1, resume/1, msg_rates/1,
1818
info/2, invoke/3, is_duplicate/2,
@@ -562,12 +562,12 @@ fetch(AckRequired, State) ->
562562
%% it is possible that the message wasn't read from disk
563563
%% at this point, so read it in.
564564
{Msg0, State2} = read_msg(MsgStatus, State1),
565-
Msg = add_delivery_count(MsgStatus#msg_status.seq_id, Msg0, State2),
565+
Msg = annotate_delivery_count(MsgStatus#msg_status.seq_id, Msg0, State2),
566566
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
567567
{{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
568568
end.
569569

570-
add_delivery_count(SeqId, Msg, #vqstate{
570+
annotate_delivery_count(SeqId, Msg, #vqstate{
571571
redeliver_seq_id=RedeliverSeqId,
572572
delivery_count=DeliveryCount}) ->
573573
case DeliveryCount of
@@ -589,6 +589,7 @@ drop(AckRequired, State) ->
589589
{empty, a(State1)};
590590
{{value, MsgStatus}, State1} ->
591591
{AckTag, State2} = remove(AckRequired, MsgStatus, State1),
592+
%% @todo I think the first element is not needed, only need to return state.
592593
{{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
593594
end.
594595

@@ -647,12 +648,17 @@ requeue(AckTags, DelFailed, #vqstate { q_head = QHead0,
647648
in_counter = InCounter + MsgCount
648649
}))}.
649650

650-
ackfold(MsgFun, Acc, State, AckTags) ->
651+
%% This function is called when messages get discarded (rejected AMQP 1.0 outcome)
652+
%% and delivered to a dead letter queue. We must therefore increase the delivery_count
653+
%% for these messages the same as if they were requeued.
654+
ackfold(MsgFun, Acc, State, AckTags, DelFailed) ->
651655
{AccN, StateN} =
652656
lists:foldl(fun(SeqId, {Acc0, State0}) ->
653657
MsgStatus = lookup_pending_ack(SeqId, State0),
654-
{Msg, State1} = read_msg(MsgStatus, State0),
655-
{MsgFun(Msg, SeqId, Acc0), State1}
658+
{Msg0, State1} = read_msg(MsgStatus, State0),
659+
State2 = maybe_inc_delivery_count(SeqId, DelFailed, State1),
660+
Msg = annotate_delivery_count(MsgStatus#msg_status.seq_id, Msg0, State2),
661+
{MsgFun(Msg, SeqId, Acc0), State2}
656662
end, {Acc, State}, AckTags),
657663
{AccN, a(StateN)}.
658664

@@ -1391,7 +1397,8 @@ process_queue_entries1(
13911397
#msg_status { seq_id = SeqId } = MsgStatus,
13921398
Fun,
13931399
{FetchAcc, State}) ->
1394-
{Msg, State1} = read_msg(MsgStatus, State),
1400+
{Msg0, State1} = read_msg(MsgStatus, State),
1401+
Msg = annotate_delivery_count(MsgStatus#msg_status.seq_id, Msg0, State1),
13951402
State2 = record_pending_ack(
13961403
MsgStatus #msg_status {
13971404
is_delivered = true }, State1),
@@ -1511,6 +1518,7 @@ publish1(Msg,
15111518
q_tail = QTail = #q_tail { count = QTailCount },
15121519
qi_embed_msgs_below = IndexMaxSize,
15131520
next_seq_id = SeqId,
1521+
delivery_count = DeliveryCount0,
15141522
in_counter = InCount,
15151523
durable = IsDurable,
15161524
unconfirmed = UC,
@@ -1538,7 +1546,9 @@ publish1(Msg,
15381546
end,
15391547
{UC1, UCS1} = maybe_needs_confirming(NeedsConfirming, persist_to(MsgStatus),
15401548
MsgId, UC, UCS),
1549+
DeliveryCount = update_delivery_count(SeqId, Msg, DeliveryCount0),
15411550
State3#vqstate{ next_seq_id = SeqId + 1,
1551+
delivery_count = DeliveryCount,
15421552
in_counter = InCount + 1,
15431553
unconfirmed = UC1,
15441554
unconfirmed_simple = UCS1 }.
@@ -1549,6 +1559,7 @@ publish_delivered1(Msg,
15491559
_ChPid, PersistFun,
15501560
State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
15511561
next_seq_id = SeqId,
1562+
delivery_count = DeliveryCount0,
15521563
in_counter = InCount,
15531564
out_counter = OutCount,
15541565
durable = IsDurable,
@@ -1562,9 +1573,11 @@ publish_delivered1(Msg,
15621573
State2 = record_pending_ack(m(MsgStatus1), State1),
15631574
{UC1, UCS1} = maybe_needs_confirming(NeedsConfirming, persist_to(MsgStatus),
15641575
MsgId, UC, UCS),
1576+
DeliveryCount = update_delivery_count(SeqId, Msg, DeliveryCount0),
15651577
{SeqId,
15661578
stats_published_pending_acks(MsgStatus1,
15671579
State2#vqstate{ next_seq_id = SeqId + 1,
1580+
delivery_count = DeliveryCount,
15681581
out_counter = OutCount + 1,
15691582
in_counter = InCount + 1,
15701583
unconfirmed = UC1,
@@ -1580,6 +1593,12 @@ maybe_needs_confirming(true, queue_store, MsgId, UC, UCS) ->
15801593
maybe_needs_confirming(true, _, MsgId, UC, UCS) ->
15811594
{sets:add_element(MsgId, UC), UCS}.
15821595

1596+
update_delivery_count(SeqId, Msg, DeliveryCount) ->
1597+
case mc:get_annotation(delivery_count, Msg) of
1598+
undefined -> DeliveryCount;
1599+
Count -> DeliveryCount#{SeqId => Count}
1600+
end.
1601+
15831602
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
15841603
seq_id = SeqId,
15851604
msg = Msg, msg_id = MsgId,
@@ -1827,26 +1846,29 @@ requeue_merge([SeqId | Rest] = SeqIds, DelFailed, Q, Front, MsgIds, State) ->
18271846
case msg_from_pending_ack(SeqId, State) of
18281847
{none, _} ->
18291848
requeue_merge(Rest, DelFailed, Q, Front, MsgIds, State);
1830-
{#msg_status { msg_id = MsgId } = MsgStatus, State1=#vqstate{redeliver_seq_id=RedeliverSeqId, delivery_count=DeliveryCount0}} ->
1849+
{#msg_status { msg_id = MsgId } = MsgStatus, State1} ->
18311850
%% Increment delivery_count.
1832-
DeliveryCount = case DelFailed of
1833-
true -> maps:update_with(SeqId,
1834-
fun(V) -> V + 1 end,
1835-
%% Message was possibly delivered at least once.
1836-
case SeqId < RedeliverSeqId of
1837-
true -> 2;
1838-
false -> 1
1839-
end,
1840-
DeliveryCount0);
1841-
false -> DeliveryCount0
1842-
end,
1843-
State2 = stats_requeued_memory(MsgStatus, State1#vqstate{delivery_count=DeliveryCount}),
1844-
requeue_merge(Rest, DelFailed, Q, ?QUEUE:in(MsgStatus, Front), [MsgId | MsgIds], State2)
1851+
State2 = maybe_inc_delivery_count(SeqId, DelFailed, State1),
1852+
State3 = stats_requeued_memory(MsgStatus, State2),
1853+
requeue_merge(Rest, DelFailed, Q, ?QUEUE:in(MsgStatus, Front), [MsgId | MsgIds], State3)
18451854
end
18461855
end;
18471856
requeue_merge([], _, Q, Front, MsgIds, State) ->
18481857
{?QUEUE:join(Front, Q), MsgIds, State}.
18491858

1859+
maybe_inc_delivery_count(_, false, State) ->
1860+
State;
1861+
maybe_inc_delivery_count(SeqId, true, State=#vqstate{redeliver_seq_id=RedeliverSeqId, delivery_count=DeliveryCount0}) ->
1862+
DeliveryCount = maps:update_with(SeqId,
1863+
fun(V) -> V + 1 end,
1864+
%% Message was possibly delivered at least once.
1865+
case SeqId < RedeliverSeqId of
1866+
true -> 2;
1867+
false -> 1
1868+
end,
1869+
DeliveryCount0),
1870+
State#vqstate{delivery_count=DeliveryCount}.
1871+
18501872
%% Mostly opposite of record_pending_ack/2
18511873
msg_from_pending_ack(SeqId, State) ->
18521874
case remove_pending_ack(false, SeqId, State) of

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1274,7 +1274,7 @@ variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
12741274
VQ1 = variable_queue_publish(true, 1, VQ0),
12751275
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
12761276
{ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
1277-
ok, VQ2, AckTags),
1277+
ok, VQ2, AckTags, true),
12781278
VQ3.
12791279

12801280
variable_queue_dropfetchwhile(Config) ->

deps/rabbit/test/channel_operation_timeout_test_queue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
publish/5, publish_delivered/4,
1313
discard/3, drain_confirmed/1,
1414
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/3,
15-
ackfold/4, len/1, is_empty/1, depth/1,
15+
ackfold/5, len/1, is_empty/1, depth/1,
1616
update_rates/1, needs_timeout/1, timeout/1,
1717
handle_pre_hibernate/1, resume/1, msg_rates/1,
1818
info/2, invoke/3, is_duplicate/2,
@@ -108,8 +108,8 @@ requeue(AckTags, DelFailed, State) ->
108108
maybe_delay(QPA),
109109
rabbit_variable_queue:requeue(AckTags, DelFailed, State).
110110

111-
ackfold(MsgFun, Acc, State, AckTags) ->
112-
rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags).
111+
ackfold(MsgFun, Acc, State, AckTags, DelFailed) ->
112+
rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags, DelFailed).
113113

114114
len(State) ->
115115
QPA = ram_pending_acks(State),

deps/rabbit/test/priority_queue_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ end_per_testcase(Testcase, Config) ->
111111
%% * discard/3 - publish without acks straight through
112112
%% * dropwhile/2 - expire messages without DLX
113113
%% * fetchwhile/4 - expire messages with DLX
114-
%% * ackfold/4 - reject messages with DLX
114+
%% * ackfold/5 - reject messages with DLX
115115
%% * requeue/2 - reject messages without DLX
116116
%% * drop/2 - maxlen messages without DLX
117117
%% * purge/1 - issue AMQP queue.purge

0 commit comments

Comments
 (0)