Skip to content

Commit b6027ec

Browse files
committed
Fix dead lettering crash
Fixes #12933 The assumption that `x-last-death-*` annotations must have been set whenever the `deaths` annotation is set was wrong. Reproducation steps, Option 1: 1. In v3.13.7, dead letter a message from Q1 to Q2 (both can be classic queues). 2. Re-publish the message including its x-death header from Q2 back to Q1. (RabbitMQ 3.13.7 will interpret this x-death header and set the deaths annotation.) 3. Upgrade to v4.0.4 4. Dead letter the message from Q1 to Q2 will cause the following crash: ``` crasher: initial call: rabbit_amqqueue_process:init/1 pid: <0.577.0> registered_name: [] exception exit: {{badkey,<<"x-last-death-exchange">>}, [{mc,record_death,4,[{file,"mc.erl"},{line,410}]}, {rabbit_dead_letter,publish,5, [{file,"rabbit_dead_letter.erl"},{line,38}]}, {rabbit_amqqueue_process,'-dead_letter_msgs/4-fun-0-', 7, [{file,"rabbit_amqqueue_process.erl"},{line,1060}]}, {rabbit_variable_queue,'-ackfold/4-fun-0-',3, [{file,"rabbit_variable_queue.erl"},{line,655}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}, {rabbit_variable_queue,ackfold,4, [{file,"rabbit_variable_queue.erl"},{line,652}]}, {rabbit_priority_queue,ackfold,4, [{file,"rabbit_priority_queue.erl"},{line,309}]}, {rabbit_amqqueue_process, '-dead_letter_rejected_msgs/3-fun-0-',5, [{file,"rabbit_amqqueue_process.erl"}, {line,1038}]}]} ``` Reproduction steps, Option 2: 1. Run a 4.0.4 / 3.13.7 mixed version cluster where both queues Q1 and Q2 are hosted on the 4.0.4 node. 2. Send a message to Q1 which dead letters to Q2. 3. Re-publish a message with the x-death AMQP 0.9.1 header from Q2 to Q1. However, this time make sure to publish to the 3.13.7 node which forwards this message to Q1 on the 4.0.4 node. 4. Subsequently dead lettering this message from Q1 to Q2 (happening on the 4.0.4 node) will also cause the crash. The modified test case in this commit was able to repro this crash via Option 2 in the mixed version cluster tests on the `v4.0.x` branch.
1 parent fdfbaf8 commit b6027ec

File tree

2 files changed

+20
-18
lines changed

2 files changed

+20
-18
lines changed

deps/rabbit/src/mc.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -433,9 +433,9 @@ record_death(Reason, SourceQueue,
433433
[{Key, NewDeath} | Deaths0]
434434
end
435435
end,
436-
Anns0#{<<"x-last-death-reason">> := ReasonBin,
437-
<<"x-last-death-queue">> := SourceQueue,
438-
<<"x-last-death-exchange">> := Exchange,
436+
Anns0#{<<"x-last-death-reason">> => ReasonBin,
437+
<<"x-last-death-queue">> => SourceQueue,
438+
<<"x-last-death-exchange">> => Exchange,
439439
deaths := Deaths};
440440
_ ->
441441
Deaths = case Env of

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,52 +1327,54 @@ dead_letter_headers_should_be_appended_for_each_event(Config) ->
13271327

13281328
dead_letter_headers_should_not_be_appended_for_republish(Config) ->
13291329
%% here we (re-)publish a message with the DL headers already set
1330-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
1330+
{Conn0, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
1331+
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
13311332
Args = ?config(queue_args, Config),
13321333
Durable = ?config(queue_durable, Config),
13331334
QName = ?config(queue_name, Config),
13341335
DlxName = ?config(queue_name_dlx, Config),
13351336

13361337
DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>},
13371338
{<<"x-dead-letter-routing-key">>, longstr, DlxName}],
1338-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
1339-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
1339+
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
1340+
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
13401341

13411342
P = <<"msg1">>,
13421343

13431344
%% Publish message
1344-
publish(Ch, QName, [P]),
1345+
publish(Ch0, QName, [P]),
13451346
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
1346-
[DTag] = consume(Ch, QName, [P]),
1347+
[DTag] = consume(Ch0, QName, [P]),
13471348
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
1348-
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
1349-
multiple = false,
1350-
requeue = false}),
1349+
amqp_channel:cast(Ch0, #'basic.nack'{delivery_tag = DTag,
1350+
multiple = false,
1351+
requeue = false}),
13511352
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
13521353
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P,
13531354
props = #'P_basic'{headers = Headers1}}} =
1354-
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
1355+
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
13551356
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
13561357
?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)),
13571358

1358-
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
1359+
amqp_channel:cast(Ch0, #'basic.ack'{delivery_tag = DTag1}),
13591360

13601361
wait_for_messages(Config, [[DlxName, <<"0">>, <<"0">>, <<"0">>]]),
13611362

1362-
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
1363+
#'queue.delete_ok'{} = amqp_channel:call(Ch0, #'queue.delete'{queue = QName}),
13631364
DeadLetterArgs1 = DeadLetterArgs ++ [{<<"x-message-ttl">>, long, 1}],
1364-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
1365+
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
13651366

1366-
publish(Ch, QName, [P], Headers1),
1367+
publish(Ch1, QName, [P], Headers1),
13671368

13681369
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
13691370
{#'basic.get_ok'{}, #amqp_msg{payload = P,
13701371
props = #'P_basic'{headers = Headers2}}} =
1371-
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
1372+
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
13721373

13731374
{array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
13741375
?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)),
1375-
ok = rabbit_ct_client_helpers:close_connection(Conn).
1376+
ok = rabbit_ct_client_helpers:close_connection(Conn0),
1377+
ok = rabbit_ct_client_helpers:close_connection(Conn1).
13761378

13771379
%% Dead-lettering a message modifies its headers:
13781380
%% the exchange name is replaced with that of the latest dead-letter exchange,

0 commit comments

Comments
 (0)