From cb53a2140ae20a675ff7b8d9ba716448faffb05e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 16 Oct 2025 10:12:39 +0200 Subject: [PATCH] Require feature flag `message_containers_deaths_v2` Require feature flag `message_containers_deaths_v2` for RabbitMQ 4.3 and delete the compatibility code. --- deps/rabbit/Makefile | 2 +- deps/rabbit/include/mc.hrl | 2 - deps/rabbit/src/mc.erl | 23 +--- deps/rabbit/src/mc_compat.erl | 4 +- deps/rabbit/src/rabbit_core_ff.erl | 2 +- deps/rabbit/src/rabbit_dead_letter.erl | 6 +- deps/rabbit/src/rabbit_fifo_dlx_worker.erl | 6 +- deps/rabbit/test/amqp_client_SUITE.erl | 62 ++++----- deps/rabbit/test/dead_lettering_SUITE.erl | 11 -- deps/rabbit/test/mc_unit_SUITE.erl | 48 +++---- .../message_containers_deaths_v2_SUITE.erl | 124 ------------------ 11 files changed, 55 insertions(+), 235 deletions(-) delete mode 100644 deps/rabbit/test/message_containers_deaths_v2_SUITE.erl diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index f5e7f40365e4..5b00e33bf30c 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -274,7 +274,7 @@ PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_rec PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q -PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration +PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_size_limit metadata_store_migration PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index abe38fb7c617..5ff42e104541 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -13,8 +13,6 @@ -define(ANN_DURABLE, d). -define(ANN_PRIORITY, p). --define(FF_MC_DEATHS_V2, message_containers_deaths_v2). - -type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. -type death_anns() :: #{%% timestamp of the first time this message %% was dead lettered from this queue for this reason diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 828f6f6ac34b..d09d8c4c3c46 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -35,7 +35,7 @@ convert/3, protocol_state/1, prepare/2, - record_death/4, + record_death/3, is_death_cycle/2, death_queue_names/1 ]). @@ -382,11 +382,8 @@ protocol_state(BasicMsg) -> -spec record_death(rabbit_dead_letter:reason(), rabbit_misc:resource_name(), - state(), - environment()) -> state(). -record_death(Reason, SourceQueue, - #?MODULE{annotations = Anns0} = State, - Env) + state()) -> state(). +record_death(Reason, SourceQueue, #?MODULE{annotations = Anns0} = State) when is_atom(Reason) andalso is_binary(SourceQueue) -> Key = {SourceQueue, Reason}, @@ -438,25 +435,17 @@ record_death(Reason, SourceQueue, <<"x-last-death-exchange">> => Exchange, deaths := Deaths}; _ -> - Deaths = case Env of - #{?FF_MC_DEATHS_V2 := false} -> - #deaths{last = Key, - first = Key, - records = #{Key => NewDeath}}; - _ -> - [{Key, NewDeath}] - end, Anns0#{<<"x-first-death-reason">> => ReasonBin, <<"x-first-death-queue">> => SourceQueue, <<"x-first-death-exchange">> => Exchange, <<"x-last-death-reason">> => ReasonBin, <<"x-last-death-queue">> => SourceQueue, <<"x-last-death-exchange">> => Exchange, - deaths => Deaths} + deaths => [{Key, NewDeath}]} end, State#?MODULE{annotations = Anns}; -record_death(Reason, SourceQueue, BasicMsg, Env) -> - mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env). +record_death(Reason, SourceQueue, BasicMsg) -> + mc_compat:record_death(Reason, SourceQueue, BasicMsg). update_death(#death{count = Count, anns = DeathAnns} = Death, Timestamp) -> diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index f501327c4b7b..b6469ed0defc 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -28,7 +28,7 @@ protocol_state/1, %serialize/1, prepare/2, - record_death/4, + record_death/3, is_death_cycle/2, %deaths/1, death_queue_names/1 @@ -166,7 +166,7 @@ prepare(store, Msg) -> record_death(Reason, SourceQueue, #basic_message{content = Content, exchange_name = Exchange, - routing_keys = RoutingKeys} = Msg, _Env) -> + routing_keys = RoutingKeys} = Msg) -> % HeadersFun1 = fun (H) -> lists:keydelete(<<"CC">>, 1, H) end, ReasonBin = atom_to_binary(Reason), TimeSec = os:system_time(seconds), diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index b98f3430f5a4..d02700614a43 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -187,7 +187,7 @@ {message_containers_deaths_v2, #{desc => "Bug fix for dead letter cycle detection", doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/11159", - stability => stable, + stability => required, depends_on => [message_containers] }}). diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index 1b7d1da232f1..abcd4758f1d9 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -32,11 +32,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, _ -> [RK] end, - Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of - true -> #{}; - false -> #{?FF_MC_DEATHS_V2 => false} - end, - Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env), + Msg1 = mc:record_death(Reason, SourceQName, Msg0), {Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1), Msg3 = mc:set_ttl(Ttl, Msg2), Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3), diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index a6fc3ce48318..98092cc1d8f4 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -325,11 +325,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, _ -> [RKey] end, - Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of - true -> #{}; - false -> #{?FF_MC_DEATHS_V2 => false} - end, - Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env), + Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg), Msg1 = mc:set_ttl(undefined, Msg0), Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1), Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 21b682e5fe6b..9287b10d8054 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -246,18 +246,11 @@ init_per_group(Group, Config) -> rabbit_ct_client_helpers:setup_steps()). end_per_group(_, Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_helpers:run_teardown_steps( + Config, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(T = dead_letter_reject, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of - ok -> - rabbit_ct_helpers:testcase_started(Config, T); - _ -> - {skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled " - "due bug https://github.com/rabbitmq/rabbitmq-server/issues/11159"} - end; init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -5116,34 +5109,29 @@ dead_letter_headers_exchange(Config) -> ?assertEqual(<<"m2">>, amqp10_msg:body_bin(Msg2)), ?assertEqual(#{message_id => <<"my ID">>}, amqp10_msg:properties(Msg1)), ?assertEqual(0, maps:size(amqp10_msg:properties(Msg2))), - case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of - true -> - ?assertMatch( - #{<<"x-first-death-queue">> := QName1, - <<"x-first-death-exchange">> := <<>>, - <<"x-first-death-reason">> := <<"expired">>, - <<"x-last-death-queue">> := QName1, - <<"x-last-death-exchange">> := <<>>, - <<"x-last-death-reason">> := <<"expired">>, - <<"x-opt-deaths">> := {array, - map, - [{map, - [ - {{symbol, <<"queue">>}, {utf8, QName1}}, - {{symbol, <<"reason">>}, {symbol, <<"expired">>}}, - {{symbol, <<"count">>}, {ulong, 1}}, - {{symbol, <<"first-time">>}, {timestamp, Timestamp}}, - {{symbol, <<"last-time">>}, {timestamp, Timestamp}}, - {{symbol, <<"exchange">>},{utf8, <<>>}}, - {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}} - ]}]} - } when is_integer(Timestamp) andalso - Timestamp > Now - 5000 andalso - Timestamp < Now + 5000, - amqp10_msg:message_annotations(Msg1)); - false -> - ok - end, + ?assertMatch( + #{<<"x-first-death-queue">> := QName1, + <<"x-first-death-exchange">> := <<>>, + <<"x-first-death-reason">> := <<"expired">>, + <<"x-last-death-queue">> := QName1, + <<"x-last-death-exchange">> := <<>>, + <<"x-last-death-reason">> := <<"expired">>, + <<"x-opt-deaths">> := {array, + map, + [{map, + [ + {{symbol, <<"queue">>}, {utf8, QName1}}, + {{symbol, <<"reason">>}, {symbol, <<"expired">>}}, + {{symbol, <<"count">>}, {ulong, 1}}, + {{symbol, <<"first-time">>}, {timestamp, Timestamp}}, + {{symbol, <<"last-time">>}, {timestamp, Timestamp}}, + {{symbol, <<"exchange">>},{utf8, <<>>}}, + {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}} + ]}]} + } when is_integer(Timestamp) andalso + Timestamp > Now - 5000 andalso + Timestamp < Now + 5000, + amqp10_msg:message_annotations(Msg1)), %% We expect M3 and M4 to get dropped. receive Unexp -> ct:fail({unexpected, Unexp}) diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index df281c5f8a7c..4f19aa28a4ba 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -176,18 +176,7 @@ end_per_group(Group, Config) -> Config end. -init_per_testcase(T, Config) - when T =:= dead_letter_reject_expire_expire orelse - T =:= stream -> - %% With feature flag message_containers_deaths_v2 disabled, test case: - %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 - %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), - init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> - init_per_testcase0(Testcase, Config). - -init_per_testcase0(Testcase, Config) -> Group = proplists:get_value(name, ?config(tc_group_properties, Config)), Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 41531eb22a7b..8a3ecba55e53 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -29,8 +29,7 @@ all_tests() -> amqpl_compat, amqpl_table_x_header, amqpl_table_x_header_array_of_tbls, - amqpl_death_v1_records, - amqpl_death_v2_records, + amqpl_death_records, is_death_cycle, amqpl_amqp_bin_amqpl, amqpl_cc_amqp_bin_amqpl, @@ -198,50 +197,39 @@ amqpl_table_x_header_array_of_tbls(_Config) -> ]}, mc:x_header(<<"x-fruit">>, Msg)). -amqpl_death_v1_records(_Config) -> - ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}). - -amqpl_death_v2_records(_Config) -> - ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => true}). - -amqpl_death_records(Env) -> +amqpl_death_records(_Config) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, payload_fragments_rev = [<<"data">>]}, Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), - Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, Env), - ?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)), - ?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)), + Msg1 = mc:record_death(rejected, <<"q">>, Msg0), + ?assertEqual([<<"q">>], mc:death_queue_names(Msg1)), + ?assertEqual(false, mc:is_death_cycle(<<"q">>, Msg1)), #content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg1), ?assertMatch({_, array, [_]}, header(<<"x-death">>, H1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H1)), + ?assertMatch({_, longstr, <<"q">>}, header(<<"x-first-death-queue">>, H1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)), + ?assertMatch({_, longstr, <<"q">>}, header(<<"x-last-death-queue">>, H1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)), {_, array, [{table, T1}]} = header(<<"x-death">>, H1), ?assertMatch({_, long, 1}, header(<<"count">>, T1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T1)), + ?assertMatch({_, longstr, <<"q">>}, header(<<"queue">>, T1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)), ?assertMatch({_, timestamp, _}, header(<<"time">>, T1)), ?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)), - - %% second dead letter, e.g. an expired reason returning to source queue - - %% record_death uses a timestamp for death record ordering, ensure - %% it is definitely higher than the last timestamp taken - timer:sleep(2), - Msg2 = mc:record_death(expired, <<"dl">>, Msg1, Env), - + %% 2nd dead letter, e.g. an expired reason + Msg2 = mc:record_death(expired, <<"dead letter queue">>, Msg1), #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2), + %% We expect deaths to be ordered by recency {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), - ?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)), + ?assertMatch({_, longstr, <<"dead letter queue">>}, header(<<"queue">>, T2a)), + ?assertMatch({_, longstr, <<"q">>}, header(<<"queue">>, T2b)), ok. is_death_cycle(_Config) -> @@ -254,29 +242,29 @@ is_death_cycle(_Config) -> %% Q1 --rejected--> Q2 --expired--> Q3 --expired--> %% Q1 --rejected--> Q2 --expired--> Q3 - Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, #{}), + Msg1 = mc:record_death(rejected, <<"q1">>, Msg0), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg1), "A queue that dead letters to itself due to rejected is not considered a cycle."), ?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)), - Msg2 = mc:record_death(expired, <<"q2">>, Msg1, #{}), + Msg2 = mc:record_death(expired, <<"q2">>, Msg1), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)), ?assert(mc:is_death_cycle(<<"q2">>, Msg2), "A queue that dead letters to itself due to expired is considered a cycle."), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)), - Msg3 = mc:record_death(expired, <<"q3">>, Msg2, #{}), + Msg3 = mc:record_death(expired, <<"q3">>, Msg2), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)), ?assert(mc:is_death_cycle(<<"q2">>, Msg3)), ?assert(mc:is_death_cycle(<<"q3">>, Msg3)), - Msg4 = mc:record_death(rejected, <<"q1">>, Msg3, #{}), + Msg4 = mc:record_death(rejected, <<"q1">>, Msg3), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)), ?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)), - Msg5 = mc:record_death(expired, <<"q2">>, Msg4, #{}), + Msg5 = mc:record_death(expired, <<"q2">>, Msg4), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)), ?assert(mc:is_death_cycle(<<"q2">>, Msg5)), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)), diff --git a/deps/rabbit/test/message_containers_deaths_v2_SUITE.erl b/deps/rabbit/test/message_containers_deaths_v2_SUITE.erl deleted file mode 100644 index abbb3abcebc8..000000000000 --- a/deps/rabbit/test/message_containers_deaths_v2_SUITE.erl +++ /dev/null @@ -1,124 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - -%% This SUITE should be deleted when feature flag message_containers_deaths_v2 becomes required. --module(message_containers_deaths_v2_SUITE). - --define(FEATURE_FLAG, message_containers_deaths_v2). - --compile([export_all, nowarn_export_all]). - --include_lib("eunit/include/eunit.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). - -all() -> - [ - {group, cluster_size_1} - ]. - -groups() -> - [ - {cluster_size_1, [], [enable_feature_flag]} - ]. - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config, []). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(_Group, Config0) -> - Config = rabbit_ct_helpers:merge_app_env( - Config0, {rabbit, [{forced_feature_flags_on_init, []}]}), - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - -end_per_group(_Group, Config) -> - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -enable_feature_flag(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), - Q1 = <<"q1">>, - Q2 = <<"q2">>, - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{queue = Q1, - arguments = [{<<"x-dead-letter-exchange">>, longstr, <<>>}, - {<<"x-dead-letter-routing-key">>, longstr, Q2}, - {<<"x-message-ttl">>, long, 3}]}), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{queue = Q2, - arguments = [{<<"x-dead-letter-exchange">>, longstr, <<>>}, - {<<"x-dead-letter-routing-key">>, longstr, Q1}]}), - P1 = <<"payload 1">>, - P2 = <<"payload 2">>, - amqp_channel:call(Ch, - #'basic.publish'{routing_key = Q1}, - #amqp_msg{payload = P1}), - ?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, ?FEATURE_FLAG)), - ?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG)), - amqp_channel:call(Ch, - #'basic.publish'{routing_key = Q1}, - #amqp_msg{payload = P2}), - %% We now have 2 messages in Q2 with different values for the mc deaths annotation for v1 and v2. - - reject(Ch, Q2, P1), - reject(Ch, Q2, P2), - reject(Ch, Q2, P1), - reject(Ch, Q2, P2), - - {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{headers = H1}}} = - ?awaitMatch({#'basic.get_ok'{}, - #amqp_msg{payload = P1}}, - amqp_channel:call(Ch, #'basic.get'{queue = Q2}), - 5000), - - {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{headers = H2}}} = - ?awaitMatch({#'basic.get_ok'{}, - #amqp_msg{payload = P2}}, - amqp_channel:call(Ch, #'basic.get'{queue = Q2}), - 5000), - - lists:foreach( - fun(Headers) -> - ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)), - ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)), - ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)), - ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Headers, <<"x-last-death-reason">>)), - ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Headers, <<"x-last-death-queue">>)), - ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Headers, <<"x-last-death-exchange">>)), - - {array, [{table, Death1}, - {table, Death2}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), - - ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Death1, <<"queue">>)), - ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)), - ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death1, <<"time">>)), - ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)), - ?assertEqual({long, 3}, rabbit_misc:table_lookup(Death1, <<"count">>)), - ?assertEqual({array, [{longstr, Q1}]}, rabbit_misc:table_lookup(Death1, <<"routing-keys">>)), - - ?assertEqual({longstr, Q2}, rabbit_misc:table_lookup(Death2, <<"queue">>)), - ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)), - ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death2, <<"time">>)), - ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death2, <<"exchange">>)), - ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)), - ?assertEqual({array, [{longstr, Q2}]}, rabbit_misc:table_lookup(Death2, <<"routing-keys">>)) - end, [H1, H2]), - ok. - -reject(Ch, Queue, Payload) -> - {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{}} = - ?awaitMatch({#'basic.get_ok'{}, - #amqp_msg{payload = Payload}}, - amqp_channel:call(Ch, #'basic.get'{queue = Queue}), - 5000), - amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, - requeue = false}).