Skip to content

Commit 117652b

Browse files
ansdmergify[bot]
authored andcommitted
Fix quorum queue drop-head dead letter order
Prior to this commit, quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` was dead lettering in the wrong order. (cherry picked from commit 960918e)
1 parent 1eff3e4 commit 117652b

File tree

2 files changed

+61
-14
lines changed

2 files changed

+61
-14
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,30 +1639,32 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
16391639
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
16401640
dlx = DlxState} = State = State3,
16411641
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
1642-
{State, combine_effects(DlxEffects, Effects)};
1642+
{State, add_drop_head_effects(DlxEffects, Effects)};
16431643
empty ->
16441644
{State0, Effects}
16451645
end.
16461646

1647-
%% combine global counter update effects to avoid bulding a huge list of
1648-
%% effects if many messages are dropped at the same time as could happen
1649-
%% when the `max_length' is changed via a configuration update.
1650-
combine_effects([{mod_call,
1651-
rabbit_global_counters,
1652-
messages_dead_lettered,
1653-
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1654-
[{mod_call,
1655-
rabbit_global_counters,
1656-
messages_dead_lettered,
1657-
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1647+
add_drop_head_effects([{mod_call,
1648+
rabbit_global_counters,
1649+
messages_dead_lettered,
1650+
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1651+
[{mod_call,
1652+
rabbit_global_counters,
1653+
messages_dead_lettered,
1654+
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1655+
%% combine global counter update effects to avoid bulding a huge list of
1656+
%% effects if many messages are dropped at the same time as could happen
1657+
%% when the `max_length' is changed via a configuration update.
16581658
[{mod_call,
16591659
rabbit_global_counters,
16601660
messages_dead_lettered,
16611661
[Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem];
1662-
combine_effects(New, Old) ->
1662+
add_drop_head_effects([{log, _, _}] = DlxEffs, Effs) ->
1663+
%% dead letter in the correct order
1664+
Effs ++ DlxEffs;
1665+
add_drop_head_effects(New, Old) ->
16631666
New ++ Old.
16641667

1665-
16661668
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
16671669
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
16681670
case mc:is(Msg) of

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ all_tests() ->
167167
subscribe_redelivery_count,
168168
message_bytes_metrics,
169169
queue_length_limit_drop_head,
170+
queue_length_bytes_limit_drop_head,
170171
queue_length_limit_reject_publish,
171172
queue_length_limit_policy_cleared,
172173
subscribe_redelivery_limit,
@@ -3697,6 +3698,50 @@ queue_length_limit_drop_head(Config) ->
36973698
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
36983699
no_ack = true})).
36993700

3701+
queue_length_bytes_limit_drop_head(Config) ->
3702+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
3703+
3704+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
3705+
QQ = ?config(queue_name, Config),
3706+
DLQ = <<"dead letter queue">>,
3707+
3708+
?assertEqual({'queue.declare_ok', DLQ, 0, 0},
3709+
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
3710+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
3711+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
3712+
{<<"x-overflow">>, longstr, <<"drop-head">>},
3713+
{<<"x-max-length-bytes">>, long, 1000},
3714+
{<<"x-dead-letter-exchange">>, longstr, <<>>},
3715+
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),
3716+
3717+
LargePayload = binary:copy(<<"x">>, 1500),
3718+
ok = amqp_channel:cast(Ch,
3719+
#'basic.publish'{routing_key = QQ},
3720+
#amqp_msg{payload = <<"m1">>}),
3721+
ok = amqp_channel:cast(Ch,
3722+
#'basic.publish'{routing_key = QQ},
3723+
#amqp_msg{payload = <<"m2">>}),
3724+
ok = amqp_channel:cast(Ch,
3725+
#'basic.publish'{routing_key = QQ},
3726+
#amqp_msg{payload = LargePayload}),
3727+
wait_for_consensus(QQ, Config),
3728+
wait_for_consensus(DLQ, Config),
3729+
RaName = ra_name(DLQ),
3730+
wait_for_messages_ready(Servers, RaName, 3),
3731+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
3732+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3733+
no_ack = true})),
3734+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
3735+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3736+
no_ack = true})),
3737+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
3738+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3739+
no_ack = true})),
3740+
3741+
[?assertEqual(#'queue.delete_ok'{message_count = 0},
3742+
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
3743+
|| Q <- [QQ, DLQ]].
3744+
37003745
queue_length_limit_reject_publish(Config) ->
37013746
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
37023747

0 commit comments

Comments
 (0)