From ea94a77fa3270e93b99bd962576e689e95d4a55c Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 19 Apr 2024 15:01:35 +0200 Subject: [PATCH] Interpret x-death header from AMQP 0.9.1 client Fixes #10709 Fixes #11331 This commit fixes the following regression which worked in 3.12.x, but stopped working in 3.13.0 - 3.13.2: ``` AMQP 0.9.1 client --publish--> Q --dead-letter--> DLQ --consume--> AMQP 0.9.1 client (death count is now 1) --republish-same-message-with-headers-as-just-received--> Q --dead-letter--> DLQ --consume --> AMQP 0.9.1 (death count is now 1, but should be 2) ``` The reason this behaviour stopped to work in 3.13.0 is that the broker won't specially interpret x-headers in general, and the x-death header specifically in this case anymore. In other words, the new desired 3.13 behaviour with message containers is that "x-headers belong to the broker". While this is correct, it does break client applications which depended on the previous use case. One simple fix is that the client application does not re-publish with the x-death header, but instead sets its own custom count header to determine the number of times it retries. This commit will only be packported to v3.13.x branch. In other words, 4.0 won't interpret x-headers as done in 3.13.0 - 3.13.2. The reason we backport this commit to v3.13.x is that the Spring documentation expliclity recommends re-publishing the message with x-death header being set: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder --- deps/rabbit/BUILD.bazel | 3 + deps/rabbit/app.bzl | 1 + deps/rabbit/include/mc.hrl | 5 + deps/rabbit/src/mc.erl | 10 +- deps/rabbit/src/mc_amqp.erl | 12 +- deps/rabbit/src/mc_amqpl.erl | 204 +++++++++++------- deps/rabbit/src/rabbit_dead_letter.erl | 6 +- deps/rabbit/src/rabbit_fifo_dlx_worker.erl | 6 +- deps/rabbit/test/dead_lettering_SUITE.erl | 56 ++++- deps/rabbit/test/mc_unit_SUITE.erl | 71 +++++- deps/rabbit/test/per_node_limit_SUITE.erl | 2 +- ...c_mirrored_queue_sync_throttling_SUITE.erl | 6 +- deps/rabbitmq_mqtt/src/mc_mqtt.erl | 4 +- 13 files changed, 266 insertions(+), 120 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index c1dc651d9ae2..7a098124180e 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -815,6 +815,9 @@ rabbitmq_suite( rabbitmq_suite( name = "mc_unit_SUITE", size = "small", + runtime_deps = [ + "@meck//:erlang_app", + ], deps = [ "//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 7c32070ba62c..b4e1cd7d03ba 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1721,6 +1721,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): testonly = True, srcs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl"], outs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.beam"], + hdrs = ["include/mc.hrl"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", deps = ["//deps/rabbit_common:erlang_app"], diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index abe38fb7c617..904302ce3282 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -14,6 +14,11 @@ -define(ANN_PRIORITY, p). -define(FF_MC_DEATHS_V2, message_containers_deaths_v2). +-define(MC_ENV, + case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of + true -> #{}; + false -> #{?FF_MC_DEATHS_V2 => false} + end). -type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. -type death_anns() :: #{%% timestamp of the first time this message diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 022d373d4fb4..ec1156343441 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -93,7 +93,7 @@ %% protocol specific init function %% returns a map of additional annotations to merge into the %% protocol generic annotations map, e.g. ttl, priority and durable --callback init(term()) -> +-callback init(term(), environment()) -> {proto_state(), annotations()}. %% the size of the payload and other meta data respectively @@ -147,7 +147,7 @@ init(Proto, Data, Anns0, Env) when is_atom(Proto) andalso is_map(Anns0) andalso is_map(Env) -> - {ProtoData, ProtoAnns} = Proto:init(Data), + {ProtoData, ProtoAnns} = Proto:init(Data, Env), Anns = case maps:size(Env) == 0 of true -> Anns0; @@ -389,9 +389,9 @@ record_death(Reason, SourceQueue, [{Key, NewDeath} | Deaths0] end end, - Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason), - <<"x-last-death-queue">> := SourceQueue, - <<"x-last-death-exchange">> := Exchange, + Anns0#{<<"x-last-death-reason">> => atom_to_binary(Reason), + <<"x-last-death-queue">> => SourceQueue, + <<"x-last-death-exchange">> => Exchange, deaths := Deaths}; _ -> Deaths = case Env of diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index f2c42f318329..28b3be127246 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -5,7 +5,7 @@ -include("mc.hrl"). -export([ - init/1, + init/2, size/1, x_header/2, property/2, @@ -59,18 +59,18 @@ ]). %% mc implementation -init(Sections) when is_list(Sections) -> +init(Sections, Env) when is_list(Sections) -> Msg = decode(Sections, #msg{}), - init(Msg); -init(#msg{} = Msg) -> + init(Msg, Env); +init(#msg{} = Msg, _Env) -> %% TODO: as the essential annotations, durable, priority, ttl and delivery_count %% is all we are interested in it isn't necessary to keep hold of the %% incoming AMQP header inside the state Anns = essential_properties(Msg), {Msg, Anns}. -convert_from(?MODULE, Sections, _Env) -> - element(1, init(Sections)); +convert_from(?MODULE, Sections, Env) -> + element(1, init(Sections, Env)); convert_from(_SourceProto, _, _Env) -> not_implemented. diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 7462edae7143..f5d13c4456f6 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -8,7 +8,7 @@ %% mc -export([ - init/1, + init/2, size/1, x_header/2, routing_headers/2, @@ -48,10 +48,10 @@ ]). %% mc implementation -init(#content{} = Content0) -> +init(#content{} = Content0, Env) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content0), %% project essential properties into annotations - Anns = essential_properties(Content1), + Anns = essential_properties(Content1, Env), Content = strip_header(Content1, ?DELETED_HEADER), {Content, Anns}. @@ -483,7 +483,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey, {ok, mc:init(?MODULE, Content, Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes], - ?ANN_EXCHANGE => ExchangeNameBin})} + ?ANN_EXCHANGE => ExchangeNameBin}, + ?MC_ENV)} end; message(#resource{} = XName, RoutingKey, #content{} = Content, Anns, false) -> @@ -514,75 +515,6 @@ from_basic_message(#basic_message{content = Content, %% Internal -deaths_to_headers(Deaths, Headers0) -> - Infos = case Deaths of - #deaths{records = Records} -> - %% sort records by the last timestamp - List = lists:sort( - fun({_, #death{anns = #{last_time := L1}}}, - {_, #death{anns = #{last_time := L2}}}) -> - L1 =< L2 - end, maps:to_list(Records)), - lists:foldl(fun(Record, Acc) -> - Table = death_table(Record), - [Table | Acc] - end, [], List); - _ -> - lists:map(fun death_table/1, Deaths) - end, - rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). - -convert_from_amqp_deaths({array, map, Maps}) -> - L = lists:map( - fun({map, KvList}) -> - {Ttl, KvList1} = case KvList of - [{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] -> - {Ttl0, Tail}; - _ -> - {undefined, KvList} - end, - [ - {{symbol, <<"queue">>}, {utf8, Queue}}, - {{symbol, <<"reason">>}, {symbol, Reason}}, - {{symbol, <<"count">>}, {ulong, Count}}, - {{symbol, <<"first-time">>}, {timestamp, FirstTime}}, - {{symbol, <<"last-time">>}, {timestamp, _LastTime}}, - {{symbol, <<"exchange">>}, {utf8, Exchange}}, - {{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}} - ] = KvList1, - RKeys = [Key || {utf8, Key} <- RKeys0], - death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl) - end, Maps), - {true, {<<"x-death">>, array, L}}; -convert_from_amqp_deaths(_IgnoreUnknownValue) -> - false. - -death_table({{QName, Reason}, - #death{exchange = Exchange, - routing_keys = RoutingKeys, - count = Count, - anns = DeathAnns = #{first_time := FirstTime}}}) -> - death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, - maps:get(ttl, DeathAnns, undefined)). - -death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) -> - L0 = [ - {<<"count">>, long, Count}, - {<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, FirstTime div 1000}, - {<<"exchange">>, longstr, Exchange}, - {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]} - ], - L = case Ttl of - undefined -> - L0; - _ -> - Expiration = integer_to_binary(Ttl), - [{<<"original-expiration">>, longstr, Expiration} | L0] - end, - {table, L}. - strip_header(#content{properties = #'P_basic'{headers = undefined}} = DecodedContent, _Key) -> DecodedContent; @@ -732,11 +664,11 @@ message_id({utf8, S}, HKey, H0) -> message_id(undefined, _HKey, H) -> {H, undefined}. -essential_properties(#content{} = C) -> +essential_properties(#content{properties = Props}, Env) -> #'P_basic'{delivery_mode = Mode, priority = Priority, timestamp = TimestampRaw, - headers = Headers} = Props = C#content.properties, + headers = Headers} = Props, {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), Timestamp = case TimestampRaw of undefined -> @@ -752,6 +684,8 @@ essential_properties(#content{} = C) -> _ -> undefined end, + Deaths = headers_to_deaths(Headers, Env), + maps_put_truthy( ?ANN_PRIORITY, Priority, maps_put_truthy( @@ -762,7 +696,125 @@ essential_properties(#content{} = C) -> ?ANN_DURABLE, Durable, maps_put_truthy( bcc, BccKeys, - #{}))))). + maps_put_truthy( + deaths, Deaths, + #{})))))). + +headers_to_deaths(_, #{?FF_MC_DEATHS_V2 := false}) -> + undefined; +headers_to_deaths(undefined, _) -> + undefined; +headers_to_deaths(Headers, _) -> + case lists:keymember(<<"x-death">>, 1, Headers) of + true -> + case rabbit_misc:amqp_table(Headers) of + #{<<"x-death">> := XDeathList} + when is_list(XDeathList) -> + recover_deaths(XDeathList, []); + _ -> + undefined + end; + false -> + undefined + end. + +recover_deaths([], Acc) -> + lists:reverse(Acc); +recover_deaths([Map = #{<<"exchange">> := Exchange, + <<"queue">> := Queue, + <<"routing-keys">> := RKeys, + <<"reason">> := ReasonBin, + <<"count">> := Count, + <<"time">> := Ts} | Rem], Acc0) -> + Reason = binary_to_existing_atom(ReasonBin), + DeathAnns0 = #{first_time => Ts, + %% Given that this timestamp is absent in the AMQP 0.9.1 + %% x-death header, the last_time we set here is incorrect + %% if the message was dead lettered more than one time. + last_time => Ts}, + DeathAnns = case Map of + #{<<"original-expiration">> := Exp} -> + DeathAnns0#{ttl => binary_to_integer(Exp)}; + _ -> + DeathAnns0 + end, + Acc = [{{Queue, Reason}, + #death{anns = DeathAnns, + exchange = Exchange, + count = Count, + routing_keys = RKeys}} | Acc0], + recover_deaths(Rem, Acc); +recover_deaths([_IgnoreInvalid | Rem], Acc) -> + recover_deaths(Rem, Acc). + +deaths_to_headers(Deaths, Headers0) -> + Infos = case Deaths of + #deaths{records = Records} -> + %% sort records by the last timestamp + List = lists:sort( + fun({_, #death{anns = #{last_time := L1}}}, + {_, #death{anns = #{last_time := L2}}}) -> + L1 =< L2 + end, maps:to_list(Records)), + lists:foldl(fun(Record, Acc) -> + Table = death_table(Record), + [Table | Acc] + end, [], List); + _ -> + lists:map(fun death_table/1, Deaths) + end, + rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). + +convert_from_amqp_deaths({array, map, Maps}) -> + L = lists:map( + fun({map, KvList}) -> + {Ttl, KvList1} = case KvList of + [{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] -> + {Ttl0, Tail}; + _ -> + {undefined, KvList} + end, + [ + {{symbol, <<"queue">>}, {utf8, Queue}}, + {{symbol, <<"reason">>}, {symbol, Reason}}, + {{symbol, <<"count">>}, {ulong, Count}}, + {{symbol, <<"first-time">>}, {timestamp, FirstTime}}, + {{symbol, <<"last-time">>}, {timestamp, _LastTime}}, + {{symbol, <<"exchange">>}, {utf8, Exchange}}, + {{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}} + ] = KvList1, + RKeys = [Key || {utf8, Key} <- RKeys0], + death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl) + end, Maps), + {true, {<<"x-death">>, array, L}}; +convert_from_amqp_deaths(_IgnoreUnknownValue) -> + false. + +death_table({{QName, Reason}, + #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = Count, + anns = DeathAnns = #{first_time := FirstTime}}}) -> + death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, + maps:get(ttl, DeathAnns, undefined)). + +death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) -> + L0 = [ + {<<"count">>, long, Count}, + {<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, FirstTime div 1000}, + {<<"exchange">>, longstr, Exchange}, + {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]} + ], + L = case Ttl of + undefined -> + L0; + _ -> + Expiration = integer_to_binary(Ttl), + [{<<"original-expiration">>, longstr, Expiration} | L0] + end, + {table, L}. %% headers that are added as annotations during conversions is_internal_header(<<"x-basic-", _/binary>>) -> diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index a8c6b4515eda..e42ba81a91bf 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -31,11 +31,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, _ -> [RK] end, - Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of - true -> #{}; - false -> #{?FF_MC_DEATHS_V2 => false} - end, - Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env), + Msg1 = mc:record_death(Reason, SourceQName, Msg0, ?MC_ENV), {Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1), Msg3 = mc:set_ttl(Ttl, Msg2), Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3), diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index f96d8de1e491..b2a7f46fc50d 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -324,11 +324,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, _ -> [RKey] end, - Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of - true -> #{}; - false -> #{?FF_MC_DEATHS_V2 => false} - end, - Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env), + Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, ?MC_ENV), Msg1 = mc:set_ttl(undefined, Msg0), Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1), Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2), diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 10622b9fd622..9fbee5fa8e1c 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -36,7 +36,7 @@ groups() -> dead_letter_missing_exchange, dead_letter_routing_key, dead_letter_headers_should_be_appended_for_each_event, - dead_letter_headers_should_not_be_appended_for_republish, + dead_letter_headers_should_be_appended_for_republish, dead_letter_routing_key_header_CC, dead_letter_routing_key_header_BCC, dead_letter_routing_key_cycle_max_length, @@ -56,7 +56,8 @@ groups() -> dead_letter_routing_key_cycle_ttl, dead_letter_headers_reason_expired, dead_letter_headers_reason_expired_per_message, - dead_letter_extra_bcc], + dead_letter_extra_bcc, + x_death_header_from_amqpl_client], DisabledMetricTests = [metric_maxlen, metric_rejected, metric_expired_queue_msg_ttl, @@ -200,11 +201,6 @@ init_per_testcase(T, Config) %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 Skip end; -init_per_testcase(dead_letter_headers_should_not_be_appended_for_republish = T, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of - ok -> init_per_testcase0(T, Config); - {skip, _} = Skip -> Skip - end; init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -1218,7 +1214,7 @@ dead_letter_headers_should_be_appended_for_each_event(Config) -> ?assertEqual({longstr, Dlx1Name}, rabbit_misc:table_lookup(DeathDlx, <<"queue">>)), ok = rabbit_ct_client_helpers:close_connection(Conn). -dead_letter_headers_should_not_be_appended_for_republish(Config) -> +dead_letter_headers_should_be_appended_for_republish(Config) -> %% here we (re-)publish a message with the DL headers already set {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Args = ?config(queue_args, Config), @@ -1263,7 +1259,7 @@ dead_letter_headers_should_not_be_appended_for_republish(Config) -> props = #'P_basic'{headers = Headers2}}} = amqp_channel:call(Ch, #'basic.get'{queue = DlxName}), - {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), + {array, [{table, Death2}, {table, _Death1}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)), ok = rabbit_ct_client_helpers:close_connection(Conn). @@ -1581,6 +1577,48 @@ dead_letter_extra_bcc(Config) -> [_] = consume(Ch, ExtraBCCQ, [P]), ok. +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/10709 +%% The Spring Cloud Stream RabbitMQ Binder Reference Guide recommends relying on the +%% count field of the x-death header to determine how often a message was dead lettered +%% in a loop involving RabbitMQ and an AMQP 0.9.1 client. +%% This test therefore asserts that RabbitMQ interprets the x-death header if an AMQP 0.9.1 +%% clients (re)publishes a message with the x-death header set. +%% This test case should only pass up to 3.13. +%% Starting with 4.0, RabbitMQ won't interpret x-headers sent from clients anymore in any +%% special way as x-headers "belong to the broker". +x_death_header_from_amqpl_client(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 0}]), + + Payload = <<"my payload">>, + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload}), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + { + #'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{headers = Headers1}} = Msg1 + } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, + no_ack = true}), + {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), + + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = QName}, + %% Re-publish the same message we received including the x-death header. + Msg1), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + { + #'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{headers = Headers2}} + } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, + no_ack = true}), + {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), + ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)). + set_queue_options(QName, Options) -> rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName), fun(Q) -> diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 4e7c91391c28..09580ed69d5c 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -31,6 +31,7 @@ all_tests() -> amqpl_table_x_header_array_of_tbls, amqpl_death_v1_records, amqpl_death_v2_records, + amqpl_parse_x_death, is_death_cycle, amqpl_amqp_bin_amqpl, amqpl_cc_amqp_bin_amqpl, @@ -239,6 +240,59 @@ amqpl_death_records(Env) -> ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)), ok. +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/10709 +%% with feature flag message_containers_deaths_v2 enabled. +amqpl_parse_x_death(_Config) -> + Q = <<"my queue">>, + DLQ = <<"my dead letter queue">>, + + Content0 = #content{class_id = 60, + properties = #'P_basic'{headers = [], + expiration = <<"9999">>}, + payload_fragments_rev = [<<"data">>]}, + Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content0, annotations())), + Msg1 = mc:record_death(rejected, Q, Msg0, #{}), + + %% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client. + Content1 = mc:protocol_state(Msg1), + Msg2 = mc:init(mc_amqpl, Content1, annotations()), + + ?assertEqual([Q], mc:death_queue_names(Msg2)), + ?assertEqual(false, mc:is_death_cycle(Q, Msg2)), + + + #content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg2), + ?assertMatch({_, longstr, Q}, header(<<"x-first-death-queue">>, H1)), + ?assertMatch({_, longstr, Q}, header(<<"x-last-death-queue">>, H1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)), + {_, array, [{table, T1}]} = header(<<"x-death">>, H1), + ?assertMatch({_, long, 1}, header(<<"count">>, T1)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)), + ?assertMatch({_, longstr, Q}, header(<<"queue">>, T1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)), + ?assertMatch({_, timestamp, _}, header(<<"time">>, T1)), + ?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)), + ?assertMatch({_, longstr, <<"9999">>}, header(<<"original-expiration">>, T1)), + + Msg3 = mc:record_death(expired, DLQ, Msg2, #{}), + + #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg3), + {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), + ?assertMatch({_, longstr, DLQ}, header(<<"queue">>, T2a)), + ?assertMatch({_, longstr, Q}, header(<<"queue">>, T2b)), + + Msg4 = mc:record_death(rejected, Q, Msg3, #{}), + + %% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client. + Content2 = mc:protocol_state(Msg4), + Msg5 = mc:init(mc_amqpl, Content2, annotations()), + + %% We expect the list to be ordered by death recency. + ?assertEqual([Q, DLQ], mc:death_queue_names(Msg5)). + is_death_cycle(_Config) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, @@ -430,6 +484,10 @@ amqpl_amqp_bin_amqpl(_Config) -> ok. amqpl_cc_amqp_bin_amqpl(_Config) -> + Mod = rabbit_feature_flags, + meck:new(Mod, [no_link]), + meck:expect(Mod, is_enabled, fun (message_containers_deaths_v2) -> true end), + Headers = [{<<"CC">>, array, [{longstr, <<"q1">>}, {longstr, <<"q2">>}]}], Props = #'P_basic'{headers = Headers}, @@ -724,15 +782,10 @@ amqp_amqpl_amqp_bodies(_Config) -> [begin EncodedPayload = amqp10_encode_bin(Payload), - - Ex = #resource{virtual_host = <<"/">>, - kind = exchange, - name = <<"ex">>}, - {ok, LegacyMsg} = mc_amqpl:message(Ex, <<"rkey">>, - #content{payload_fragments_rev = - lists:reverse(EncodedPayload), - properties = Props}, - #{}, true), + LegacyMsg = mc:init(mc_amqpl, + #content{properties = Props, + payload_fragments_rev = lists:reverse(EncodedPayload)}, + annotations()), AmqpMsg = mc:convert(mc_amqp, LegacyMsg), %% drop any non body sections diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 00a51b415373..98990c8dc364 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -156,7 +156,7 @@ channel_consumers_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), {ok, Ch} = open_channel(Conn1), - Q = <<"Q">>, Tag = <<"Tag">>, + Q = <<"Q">>, {ok, _} = consume(Ch, Q, <<"Tag1">>), {ok, _} = consume(Ch, Q, <<"Tag2">>), diff --git a/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl index 8a61c6a3cc3d..51700c448974 100644 --- a/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl +++ b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl @@ -2,6 +2,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("rabbit/include/mc.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -65,8 +66,9 @@ append_to_acc(_Config) -> priority = 2}, payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes }, - ExName = rabbit_misc:r(<<>>, exchange, <<>>), - {ok, Msg} = mc_amqpl:message(ExName, <<>>, Content, #{id => 1}, true), + Msg = mc:init(mc_amqpl, Content, #{id => 1, + ?ANN_EXCHANGE => <<>>, + ?ANN_ROUTING_KEYS => [<<>>]}), BQDepth = 10, SyncThroughput_0 = 0, FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()}, diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index 981ecbdc5f2c..21e2f5d78937 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -11,7 +11,7 @@ -define(CONTENT_TYPE_AMQP, <<"message/vnd.rabbitmq.amqp">>). -export([ - init/1, + init/2, size/1, x_header/2, property/2, @@ -23,7 +23,7 @@ ]). init(Msg = #mqtt_msg{qos = Qos, - props = Props}) -> + props = Props}, _Env) -> Anns0 = #{?ANN_DURABLE => durable(Qos)}, Anns1 = case Props of #{'Message-Expiry-Interval' := Seconds} ->