Skip to content

Commit 18e3f1f

Browse files
committed
Fix timestamp when recovering x-death header
This commit fixes a bug present in 3.13.3 which was introduced by #11339 When an AMQP 0.9.1 client re-publishes a message with the x-death header set, the `time` field was not converted from the AMQP 0.9.1 seconds resolution to the internal millisecond resolution of `mc` fields `first_time` and `last_time`. Additionally, this commit will ignore invalid `time` fields sent in the `x-death` header from AMQP 0.9.1 clients as reported in https://rabbitmq.slack.com/archives/C1EDN83PA/p1719986557420719 which results in the following crash later on when the message gets consumed: ``` supervisor: {<0.476725.0>,rabbit_channel_sup} errorContext: child_terminated reason: {badarith, [{erlang,'div', [<<"2024-07-02T02:58:50.000-07:00">>,1000], [{error_info,#{module => erl_erts_errors}}]}, {mc_amqpl,death_table,7,[{file,"mc_amqpl.erl"},{line,806}]}, {lists,map,2,[{file,"lists.erl"},{line,1559}]}, {mc_amqpl,deaths_to_headers,2, [{file,"mc_amqpl.erl"},{line,764}]}, {mc_amqpl,protocol_state,2, [{file,"mc_amqpl.erl"},{line,406}]}, {rabbit_channel,handle_deliver0,4, [{file,"rabbit_channel.erl"},{line,2685}]}, {lists,foldl,3,[{file,"lists.erl"},{line,1594}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,732}]}]} ```
1 parent 7078e93 commit 18e3f1f

File tree

3 files changed

+18
-5
lines changed

3 files changed

+18
-5
lines changed

deps/rabbit/src/mc_amqpl.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -725,13 +725,15 @@ recover_deaths([Map = #{<<"exchange">> := Exchange,
725725
<<"routing-keys">> := RKeys,
726726
<<"reason">> := ReasonBin,
727727
<<"count">> := Count,
728-
<<"time">> := Ts} | Rem], Acc0) ->
728+
<<"time">> := TsSeconds} | Rem], Acc0)
729+
when is_integer(TsSeconds) ->
729730
Reason = binary_to_existing_atom(ReasonBin),
730-
DeathAnns0 = #{first_time => Ts,
731+
TsMillis = TsSeconds * 1000,
732+
DeathAnns0 = #{first_time => TsMillis,
731733
%% Given that this timestamp is absent in the AMQP 0.9.1
732734
%% x-death header, the last_time we set here is incorrect
733735
%% if the message was dead lettered more than one time.
734-
last_time => Ts},
736+
last_time => TsMillis},
735737
DeathAnns = case Map of
736738
#{<<"original-expiration">> := Exp} ->
737739
DeathAnns0#{ttl => binary_to_integer(Exp)};

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1598,6 +1598,7 @@ x_death_header_from_amqpl_client(Config) ->
15981598
DLXQName = ?config(queue_name_dlx, Config),
15991599
declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 0}]),
16001600

1601+
Ts = os:system_time(second),
16011602
Payload = <<"my payload">>,
16021603
ok = amqp_channel:call(Ch,
16031604
#'basic.publish'{routing_key = QName},
@@ -1610,6 +1611,10 @@ x_death_header_from_amqpl_client(Config) ->
16101611
no_ack = true}),
16111612
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
16121613
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
1614+
%% AMQP 0.9.1 timestamp should be seconds since epoch.
1615+
{timestamp, Ts1} = rabbit_misc:table_lookup(Death1, <<"time">>),
1616+
?assert(Ts1 > Ts - 10),
1617+
?assert(Ts1 < Ts + 10),
16131618

16141619
ok = amqp_channel:call(Ch,
16151620
#'basic.publish'{routing_key = QName},
@@ -1623,7 +1628,10 @@ x_death_header_from_amqpl_client(Config) ->
16231628
} = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName,
16241629
no_ack = true}),
16251630
{array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
1626-
?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)).
1631+
?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)),
1632+
%% Timestamp should be exactly the same as the 1st timestamp because it denotes
1633+
%% when this message was dead lettered the **first** time from this queue for this reason.
1634+
?assertEqual({timestamp, Ts1}, rabbit_misc:table_lookup(Death2, <<"time">>)).
16271635

16281636
set_queue_options(QName, Options) ->
16291637
rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName),

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ amqpl_death_records(Env) ->
245245
amqpl_parse_x_death(_Config) ->
246246
Q = <<"my queue">>,
247247
DLQ = <<"my dead letter queue">>,
248+
Ts = os:system_time(second),
248249

249250
Content0 = #content{class_id = 60,
250251
properties = #'P_basic'{headers = [],
@@ -273,7 +274,9 @@ amqpl_parse_x_death(_Config) ->
273274
?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)),
274275
?assertMatch({_, longstr, Q}, header(<<"queue">>, T1)),
275276
?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)),
276-
?assertMatch({_, timestamp, _}, header(<<"time">>, T1)),
277+
{_, timestamp, Ts1} = header(<<"time">>, T1),
278+
?assert(Ts1 > Ts - 10),
279+
?assert(Ts1 < Ts + 10),
277280
?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)),
278281
?assertMatch({_, longstr, <<"9999">>}, header(<<"original-expiration">>, T1)),
279282

0 commit comments

Comments
 (0)