From 18e3f1ff5c429fcbd3b600e41e0675930e06ee42 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 3 Jul 2024 10:30:22 +0200 Subject: [PATCH] Fix timestamp when recovering x-death header This commit fixes a bug present in 3.13.3 which was introduced by https://github.com/rabbitmq/rabbitmq-server/pull/11339 When an AMQP 0.9.1 client re-publishes a message with the x-death header set, the `time` field was not converted from the AMQP 0.9.1 seconds resolution to the internal millisecond resolution of `mc` fields `first_time` and `last_time`. Additionally, this commit will ignore invalid `time` fields sent in the `x-death` header from AMQP 0.9.1 clients as reported in https://rabbitmq.slack.com/archives/C1EDN83PA/p1719986557420719 which results in the following crash later on when the message gets consumed: ``` supervisor: {<0.476725.0>,rabbit_channel_sup} errorContext: child_terminated reason: {badarith, [{erlang,'div', [<<"2024-07-02T02:58:50.000-07:00">>,1000], [{error_info,#{module => erl_erts_errors}}]}, {mc_amqpl,death_table,7,[{file,"mc_amqpl.erl"},{line,806}]}, {lists,map,2,[{file,"lists.erl"},{line,1559}]}, {mc_amqpl,deaths_to_headers,2, [{file,"mc_amqpl.erl"},{line,764}]}, {mc_amqpl,protocol_state,2, [{file,"mc_amqpl.erl"},{line,406}]}, {rabbit_channel,handle_deliver0,4, [{file,"rabbit_channel.erl"},{line,2685}]}, {lists,foldl,3,[{file,"lists.erl"},{line,1594}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,732}]}]} ``` --- deps/rabbit/src/mc_amqpl.erl | 8 +++++--- deps/rabbit/test/dead_lettering_SUITE.erl | 10 +++++++++- deps/rabbit/test/mc_unit_SUITE.erl | 5 ++++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index f5d13c4456f6..8b92dbc64f50 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -725,13 +725,15 @@ recover_deaths([Map = #{<<"exchange">> := Exchange, <<"routing-keys">> := RKeys, <<"reason">> := ReasonBin, <<"count">> := Count, - <<"time">> := Ts} | Rem], Acc0) -> + <<"time">> := TsSeconds} | Rem], Acc0) + when is_integer(TsSeconds) -> Reason = binary_to_existing_atom(ReasonBin), - DeathAnns0 = #{first_time => Ts, + TsMillis = TsSeconds * 1000, + DeathAnns0 = #{first_time => TsMillis, %% Given that this timestamp is absent in the AMQP 0.9.1 %% x-death header, the last_time we set here is incorrect %% if the message was dead lettered more than one time. - last_time => Ts}, + last_time => TsMillis}, DeathAnns = case Map of #{<<"original-expiration">> := Exp} -> DeathAnns0#{ttl => binary_to_integer(Exp)}; diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 7f016d63847a..8966d018dc93 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -1598,6 +1598,7 @@ x_death_header_from_amqpl_client(Config) -> DLXQName = ?config(queue_name_dlx, Config), declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 0}]), + Ts = os:system_time(second), Payload = <<"my payload">>, ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, @@ -1610,6 +1611,10 @@ x_death_header_from_amqpl_client(Config) -> no_ack = true}), {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), + %% AMQP 0.9.1 timestamp should be seconds since epoch. + {timestamp, Ts1} = rabbit_misc:table_lookup(Death1, <<"time">>), + ?assert(Ts1 > Ts - 10), + ?assert(Ts1 < Ts + 10), ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, @@ -1623,7 +1628,10 @@ x_death_header_from_amqpl_client(Config) -> } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, no_ack = true}), {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), - ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)). + ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)), + %% Timestamp should be exactly the same as the 1st timestamp because it denotes + %% when this message was dead lettered the **first** time from this queue for this reason. + ?assertEqual({timestamp, Ts1}, rabbit_misc:table_lookup(Death2, <<"time">>)). set_queue_options(QName, Options) -> rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName), diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 09580ed69d5c..35c39b292067 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -245,6 +245,7 @@ amqpl_death_records(Env) -> amqpl_parse_x_death(_Config) -> Q = <<"my queue">>, DLQ = <<"my dead letter queue">>, + Ts = os:system_time(second), Content0 = #content{class_id = 60, properties = #'P_basic'{headers = [], @@ -273,7 +274,9 @@ amqpl_parse_x_death(_Config) -> ?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)), ?assertMatch({_, longstr, Q}, header(<<"queue">>, T1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)), - ?assertMatch({_, timestamp, _}, header(<<"time">>, T1)), + {_, timestamp, Ts1} = header(<<"time">>, T1), + ?assert(Ts1 > Ts - 10), + ?assert(Ts1 < Ts + 10), ?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)), ?assertMatch({_, longstr, <<"9999">>}, header(<<"original-expiration">>, T1)),