Skip to content

Commit b7d8639

Browse files
ansdmergify[bot]
authored andcommitted
Fix dead lettering crash
Fixes #12933 The assumption that `x-last-death-*` annotations must have been set whenever the `deaths` annotation is set was wrong. Reproducation steps, Option 1: 1. In v3.13.7, dead letter a message from Q1 to Q2 (both can be classic queues). 2. Re-publish the message including its x-death header from Q2 back to Q1. (RabbitMQ 3.13.7 will interpret this x-death header and set the deaths annotation.) 3. Upgrade to v4.0.4 4. Dead letter the message from Q1 to Q2 will cause the following crash: ``` crasher: initial call: rabbit_amqqueue_process:init/1 pid: <0.577.0> registered_name: [] exception exit: {{badkey,<<"x-last-death-exchange">>}, [{mc,record_death,4,[{file,"mc.erl"},{line,410}]}, {rabbit_dead_letter,publish,5, [{file,"rabbit_dead_letter.erl"},{line,38}]}, {rabbit_amqqueue_process,'-dead_letter_msgs/4-fun-0-', 7, [{file,"rabbit_amqqueue_process.erl"},{line,1060}]}, {rabbit_variable_queue,'-ackfold/4-fun-0-',3, [{file,"rabbit_variable_queue.erl"},{line,655}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}, {rabbit_variable_queue,ackfold,4, [{file,"rabbit_variable_queue.erl"},{line,652}]}, {rabbit_priority_queue,ackfold,4, [{file,"rabbit_priority_queue.erl"},{line,309}]}, {rabbit_amqqueue_process, '-dead_letter_rejected_msgs/3-fun-0-',5, [{file,"rabbit_amqqueue_process.erl"}, {line,1038}]}]} ``` Reproduction steps, Option 2: 1. Run a 4.0.4 / 3.13.7 mixed version cluster where both queues Q1 and Q2 are hosted on the 4.0.4 node. 2. Send a message to Q1 which dead letters to Q2. 3. Re-publish a message with the x-death AMQP 0.9.1 header from Q2 to Q1. However, this time make sure to publish to the 3.13.7 node which forwards this message to Q1 on the 4.0.4 node. 4. Subsequently dead lettering this message from Q1 to Q2 (happening on the 4.0.4 node) will also cause the crash. The modified test case in this commit was able to repro this crash via Option 2 in the mixed version cluster tests on the `v4.0.x` branch. (cherry picked from commit b6027ec)
1 parent 78602ec commit b7d8639

File tree

2 files changed

+20
-18
lines changed

2 files changed

+20
-18
lines changed

deps/rabbit/src/mc.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,9 @@ record_death(Reason, SourceQueue,
407407
[{Key, NewDeath} | Deaths0]
408408
end
409409
end,
410-
Anns0#{<<"x-last-death-reason">> := ReasonBin,
411-
<<"x-last-death-queue">> := SourceQueue,
412-
<<"x-last-death-exchange">> := Exchange,
410+
Anns0#{<<"x-last-death-reason">> => ReasonBin,
411+
<<"x-last-death-queue">> => SourceQueue,
412+
<<"x-last-death-exchange">> => Exchange,
413413
deaths := Deaths};
414414
_ ->
415415
Deaths = case Env of

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,52 +1331,54 @@ dead_letter_headers_should_be_appended_for_each_event(Config) ->
13311331

13321332
dead_letter_headers_should_not_be_appended_for_republish(Config) ->
13331333
%% here we (re-)publish a message with the DL headers already set
1334-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
1334+
{Conn0, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
1335+
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
13351336
Args = ?config(queue_args, Config),
13361337
Durable = ?config(queue_durable, Config),
13371338
QName = ?config(queue_name, Config),
13381339
DlxName = ?config(queue_name_dlx, Config),
13391340

13401341
DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>},
13411342
{<<"x-dead-letter-routing-key">>, longstr, DlxName}],
1342-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
1343-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
1343+
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
1344+
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
13441345

13451346
P = <<"msg1">>,
13461347

13471348
%% Publish message
1348-
publish(Ch, QName, [P]),
1349+
publish(Ch0, QName, [P]),
13491350
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
1350-
[DTag] = consume(Ch, QName, [P]),
1351+
[DTag] = consume(Ch0, QName, [P]),
13511352
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
1352-
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
1353-
multiple = false,
1354-
requeue = false}),
1353+
amqp_channel:cast(Ch0, #'basic.nack'{delivery_tag = DTag,
1354+
multiple = false,
1355+
requeue = false}),
13551356
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
13561357
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P,
13571358
props = #'P_basic'{headers = Headers1}}} =
1358-
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
1359+
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
13591360
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
13601361
?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)),
13611362

1362-
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
1363+
amqp_channel:cast(Ch0, #'basic.ack'{delivery_tag = DTag1}),
13631364

13641365
wait_for_messages(Config, [[DlxName, <<"0">>, <<"0">>, <<"0">>]]),
13651366

1366-
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
1367+
#'queue.delete_ok'{} = amqp_channel:call(Ch0, #'queue.delete'{queue = QName}),
13671368
DeadLetterArgs1 = DeadLetterArgs ++ [{<<"x-message-ttl">>, long, 1}],
1368-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
1369+
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
13691370

1370-
publish(Ch, QName, [P], Headers1),
1371+
publish(Ch1, QName, [P], Headers1),
13711372

13721373
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
13731374
{#'basic.get_ok'{}, #amqp_msg{payload = P,
13741375
props = #'P_basic'{headers = Headers2}}} =
1375-
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
1376+
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
13761377

13771378
{array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
13781379
?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)),
1379-
ok = rabbit_ct_client_helpers:close_connection(Conn).
1380+
ok = rabbit_ct_client_helpers:close_connection(Conn0),
1381+
ok = rabbit_ct_client_helpers:close_connection(Conn1).
13801382

13811383
%% Dead-lettering a message modifies its headers:
13821384
%% the exchange name is replaced with that of the latest dead-letter exchange,

0 commit comments

Comments
 (0)