diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index c1dc651d9ae2..7a098124180e 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -815,6 +815,9 @@ rabbitmq_suite( rabbitmq_suite( name = "mc_unit_SUITE", size = "small", + runtime_deps = [ + "@meck//:erlang_app", + ], deps = [ "//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 7c32070ba62c..b4e1cd7d03ba 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1721,6 +1721,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): testonly = True, srcs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl"], outs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.beam"], + hdrs = ["include/mc.hrl"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", deps = ["//deps/rabbit_common:erlang_app"], diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index abe38fb7c617..904302ce3282 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -14,6 +14,11 @@ -define(ANN_PRIORITY, p). -define(FF_MC_DEATHS_V2, message_containers_deaths_v2). +-define(MC_ENV, + case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of + true -> #{}; + false -> #{?FF_MC_DEATHS_V2 => false} + end). -type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. -type death_anns() :: #{%% timestamp of the first time this message diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 022d373d4fb4..ec1156343441 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -93,7 +93,7 @@ %% protocol specific init function %% returns a map of additional annotations to merge into the %% protocol generic annotations map, e.g. ttl, priority and durable --callback init(term()) -> +-callback init(term(), environment()) -> {proto_state(), annotations()}. %% the size of the payload and other meta data respectively @@ -147,7 +147,7 @@ init(Proto, Data, Anns0, Env) when is_atom(Proto) andalso is_map(Anns0) andalso is_map(Env) -> - {ProtoData, ProtoAnns} = Proto:init(Data), + {ProtoData, ProtoAnns} = Proto:init(Data, Env), Anns = case maps:size(Env) == 0 of true -> Anns0; @@ -389,9 +389,9 @@ record_death(Reason, SourceQueue, [{Key, NewDeath} | Deaths0] end end, - Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason), - <<"x-last-death-queue">> := SourceQueue, - <<"x-last-death-exchange">> := Exchange, + Anns0#{<<"x-last-death-reason">> => atom_to_binary(Reason), + <<"x-last-death-queue">> => SourceQueue, + <<"x-last-death-exchange">> => Exchange, deaths := Deaths}; _ -> Deaths = case Env of diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index f2c42f318329..28b3be127246 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -5,7 +5,7 @@ -include("mc.hrl"). -export([ - init/1, + init/2, size/1, x_header/2, property/2, @@ -59,18 +59,18 @@ ]). %% mc implementation -init(Sections) when is_list(Sections) -> +init(Sections, Env) when is_list(Sections) -> Msg = decode(Sections, #msg{}), - init(Msg); -init(#msg{} = Msg) -> + init(Msg, Env); +init(#msg{} = Msg, _Env) -> %% TODO: as the essential annotations, durable, priority, ttl and delivery_count %% is all we are interested in it isn't necessary to keep hold of the %% incoming AMQP header inside the state Anns = essential_properties(Msg), {Msg, Anns}. -convert_from(?MODULE, Sections, _Env) -> - element(1, init(Sections)); +convert_from(?MODULE, Sections, Env) -> + element(1, init(Sections, Env)); convert_from(_SourceProto, _, _Env) -> not_implemented. diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 7462edae7143..f5d13c4456f6 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -8,7 +8,7 @@ %% mc -export([ - init/1, + init/2, size/1, x_header/2, routing_headers/2, @@ -48,10 +48,10 @@ ]). %% mc implementation -init(#content{} = Content0) -> +init(#content{} = Content0, Env) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content0), %% project essential properties into annotations - Anns = essential_properties(Content1), + Anns = essential_properties(Content1, Env), Content = strip_header(Content1, ?DELETED_HEADER), {Content, Anns}. @@ -483,7 +483,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey, {ok, mc:init(?MODULE, Content, Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes], - ?ANN_EXCHANGE => ExchangeNameBin})} + ?ANN_EXCHANGE => ExchangeNameBin}, + ?MC_ENV)} end; message(#resource{} = XName, RoutingKey, #content{} = Content, Anns, false) -> @@ -514,75 +515,6 @@ from_basic_message(#basic_message{content = Content, %% Internal -deaths_to_headers(Deaths, Headers0) -> - Infos = case Deaths of - #deaths{records = Records} -> - %% sort records by the last timestamp - List = lists:sort( - fun({_, #death{anns = #{last_time := L1}}}, - {_, #death{anns = #{last_time := L2}}}) -> - L1 =< L2 - end, maps:to_list(Records)), - lists:foldl(fun(Record, Acc) -> - Table = death_table(Record), - [Table | Acc] - end, [], List); - _ -> - lists:map(fun death_table/1, Deaths) - end, - rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). - -convert_from_amqp_deaths({array, map, Maps}) -> - L = lists:map( - fun({map, KvList}) -> - {Ttl, KvList1} = case KvList of - [{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] -> - {Ttl0, Tail}; - _ -> - {undefined, KvList} - end, - [ - {{symbol, <<"queue">>}, {utf8, Queue}}, - {{symbol, <<"reason">>}, {symbol, Reason}}, - {{symbol, <<"count">>}, {ulong, Count}}, - {{symbol, <<"first-time">>}, {timestamp, FirstTime}}, - {{symbol, <<"last-time">>}, {timestamp, _LastTime}}, - {{symbol, <<"exchange">>}, {utf8, Exchange}}, - {{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}} - ] = KvList1, - RKeys = [Key || {utf8, Key} <- RKeys0], - death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl) - end, Maps), - {true, {<<"x-death">>, array, L}}; -convert_from_amqp_deaths(_IgnoreUnknownValue) -> - false. - -death_table({{QName, Reason}, - #death{exchange = Exchange, - routing_keys = RoutingKeys, - count = Count, - anns = DeathAnns = #{first_time := FirstTime}}}) -> - death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, - maps:get(ttl, DeathAnns, undefined)). - -death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) -> - L0 = [ - {<<"count">>, long, Count}, - {<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, FirstTime div 1000}, - {<<"exchange">>, longstr, Exchange}, - {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]} - ], - L = case Ttl of - undefined -> - L0; - _ -> - Expiration = integer_to_binary(Ttl), - [{<<"original-expiration">>, longstr, Expiration} | L0] - end, - {table, L}. - strip_header(#content{properties = #'P_basic'{headers = undefined}} = DecodedContent, _Key) -> DecodedContent; @@ -732,11 +664,11 @@ message_id({utf8, S}, HKey, H0) -> message_id(undefined, _HKey, H) -> {H, undefined}. -essential_properties(#content{} = C) -> +essential_properties(#content{properties = Props}, Env) -> #'P_basic'{delivery_mode = Mode, priority = Priority, timestamp = TimestampRaw, - headers = Headers} = Props = C#content.properties, + headers = Headers} = Props, {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), Timestamp = case TimestampRaw of undefined -> @@ -752,6 +684,8 @@ essential_properties(#content{} = C) -> _ -> undefined end, + Deaths = headers_to_deaths(Headers, Env), + maps_put_truthy( ?ANN_PRIORITY, Priority, maps_put_truthy( @@ -762,7 +696,125 @@ essential_properties(#content{} = C) -> ?ANN_DURABLE, Durable, maps_put_truthy( bcc, BccKeys, - #{}))))). + maps_put_truthy( + deaths, Deaths, + #{})))))). + +headers_to_deaths(_, #{?FF_MC_DEATHS_V2 := false}) -> + undefined; +headers_to_deaths(undefined, _) -> + undefined; +headers_to_deaths(Headers, _) -> + case lists:keymember(<<"x-death">>, 1, Headers) of + true -> + case rabbit_misc:amqp_table(Headers) of + #{<<"x-death">> := XDeathList} + when is_list(XDeathList) -> + recover_deaths(XDeathList, []); + _ -> + undefined + end; + false -> + undefined + end. + +recover_deaths([], Acc) -> + lists:reverse(Acc); +recover_deaths([Map = #{<<"exchange">> := Exchange, + <<"queue">> := Queue, + <<"routing-keys">> := RKeys, + <<"reason">> := ReasonBin, + <<"count">> := Count, + <<"time">> := Ts} | Rem], Acc0) -> + Reason = binary_to_existing_atom(ReasonBin), + DeathAnns0 = #{first_time => Ts, + %% Given that this timestamp is absent in the AMQP 0.9.1 + %% x-death header, the last_time we set here is incorrect + %% if the message was dead lettered more than one time. + last_time => Ts}, + DeathAnns = case Map of + #{<<"original-expiration">> := Exp} -> + DeathAnns0#{ttl => binary_to_integer(Exp)}; + _ -> + DeathAnns0 + end, + Acc = [{{Queue, Reason}, + #death{anns = DeathAnns, + exchange = Exchange, + count = Count, + routing_keys = RKeys}} | Acc0], + recover_deaths(Rem, Acc); +recover_deaths([_IgnoreInvalid | Rem], Acc) -> + recover_deaths(Rem, Acc). + +deaths_to_headers(Deaths, Headers0) -> + Infos = case Deaths of + #deaths{records = Records} -> + %% sort records by the last timestamp + List = lists:sort( + fun({_, #death{anns = #{last_time := L1}}}, + {_, #death{anns = #{last_time := L2}}}) -> + L1 =< L2 + end, maps:to_list(Records)), + lists:foldl(fun(Record, Acc) -> + Table = death_table(Record), + [Table | Acc] + end, [], List); + _ -> + lists:map(fun death_table/1, Deaths) + end, + rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). + +convert_from_amqp_deaths({array, map, Maps}) -> + L = lists:map( + fun({map, KvList}) -> + {Ttl, KvList1} = case KvList of + [{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] -> + {Ttl0, Tail}; + _ -> + {undefined, KvList} + end, + [ + {{symbol, <<"queue">>}, {utf8, Queue}}, + {{symbol, <<"reason">>}, {symbol, Reason}}, + {{symbol, <<"count">>}, {ulong, Count}}, + {{symbol, <<"first-time">>}, {timestamp, FirstTime}}, + {{symbol, <<"last-time">>}, {timestamp, _LastTime}}, + {{symbol, <<"exchange">>}, {utf8, Exchange}}, + {{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}} + ] = KvList1, + RKeys = [Key || {utf8, Key} <- RKeys0], + death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl) + end, Maps), + {true, {<<"x-death">>, array, L}}; +convert_from_amqp_deaths(_IgnoreUnknownValue) -> + false. + +death_table({{QName, Reason}, + #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = Count, + anns = DeathAnns = #{first_time := FirstTime}}}) -> + death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, + maps:get(ttl, DeathAnns, undefined)). + +death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) -> + L0 = [ + {<<"count">>, long, Count}, + {<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, FirstTime div 1000}, + {<<"exchange">>, longstr, Exchange}, + {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]} + ], + L = case Ttl of + undefined -> + L0; + _ -> + Expiration = integer_to_binary(Ttl), + [{<<"original-expiration">>, longstr, Expiration} | L0] + end, + {table, L}. %% headers that are added as annotations during conversions is_internal_header(<<"x-basic-", _/binary>>) -> diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index a8c6b4515eda..e42ba81a91bf 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -31,11 +31,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, _ -> [RK] end, - Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of - true -> #{}; - false -> #{?FF_MC_DEATHS_V2 => false} - end, - Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env), + Msg1 = mc:record_death(Reason, SourceQName, Msg0, ?MC_ENV), {Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1), Msg3 = mc:set_ttl(Ttl, Msg2), Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3), diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index f96d8de1e491..b2a7f46fc50d 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -324,11 +324,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, _ -> [RKey] end, - Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of - true -> #{}; - false -> #{?FF_MC_DEATHS_V2 => false} - end, - Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env), + Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, ?MC_ENV), Msg1 = mc:set_ttl(undefined, Msg0), Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1), Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2), diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 10622b9fd622..9fbee5fa8e1c 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -36,7 +36,7 @@ groups() -> dead_letter_missing_exchange, dead_letter_routing_key, dead_letter_headers_should_be_appended_for_each_event, - dead_letter_headers_should_not_be_appended_for_republish, + dead_letter_headers_should_be_appended_for_republish, dead_letter_routing_key_header_CC, dead_letter_routing_key_header_BCC, dead_letter_routing_key_cycle_max_length, @@ -56,7 +56,8 @@ groups() -> dead_letter_routing_key_cycle_ttl, dead_letter_headers_reason_expired, dead_letter_headers_reason_expired_per_message, - dead_letter_extra_bcc], + dead_letter_extra_bcc, + x_death_header_from_amqpl_client], DisabledMetricTests = [metric_maxlen, metric_rejected, metric_expired_queue_msg_ttl, @@ -200,11 +201,6 @@ init_per_testcase(T, Config) %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 Skip end; -init_per_testcase(dead_letter_headers_should_not_be_appended_for_republish = T, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers) of - ok -> init_per_testcase0(T, Config); - {skip, _} = Skip -> Skip - end; init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -1218,7 +1214,7 @@ dead_letter_headers_should_be_appended_for_each_event(Config) -> ?assertEqual({longstr, Dlx1Name}, rabbit_misc:table_lookup(DeathDlx, <<"queue">>)), ok = rabbit_ct_client_helpers:close_connection(Conn). -dead_letter_headers_should_not_be_appended_for_republish(Config) -> +dead_letter_headers_should_be_appended_for_republish(Config) -> %% here we (re-)publish a message with the DL headers already set {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Args = ?config(queue_args, Config), @@ -1263,7 +1259,7 @@ dead_letter_headers_should_not_be_appended_for_republish(Config) -> props = #'P_basic'{headers = Headers2}}} = amqp_channel:call(Ch, #'basic.get'{queue = DlxName}), - {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), + {array, [{table, Death2}, {table, _Death1}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)), ok = rabbit_ct_client_helpers:close_connection(Conn). @@ -1581,6 +1577,48 @@ dead_letter_extra_bcc(Config) -> [_] = consume(Ch, ExtraBCCQ, [P]), ok. +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/10709 +%% The Spring Cloud Stream RabbitMQ Binder Reference Guide recommends relying on the +%% count field of the x-death header to determine how often a message was dead lettered +%% in a loop involving RabbitMQ and an AMQP 0.9.1 client. +%% This test therefore asserts that RabbitMQ interprets the x-death header if an AMQP 0.9.1 +%% clients (re)publishes a message with the x-death header set. +%% This test case should only pass up to 3.13. +%% Starting with 4.0, RabbitMQ won't interpret x-headers sent from clients anymore in any +%% special way as x-headers "belong to the broker". +x_death_header_from_amqpl_client(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 0}]), + + Payload = <<"my payload">>, + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload}), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + { + #'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{headers = Headers1}} = Msg1 + } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, + no_ack = true}), + {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), + + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = QName}, + %% Re-publish the same message we received including the x-death header. + Msg1), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + { + #'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{headers = Headers2}} + } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, + no_ack = true}), + {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), + ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)). + set_queue_options(QName, Options) -> rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName), fun(Q) -> diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 4e7c91391c28..09580ed69d5c 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -31,6 +31,7 @@ all_tests() -> amqpl_table_x_header_array_of_tbls, amqpl_death_v1_records, amqpl_death_v2_records, + amqpl_parse_x_death, is_death_cycle, amqpl_amqp_bin_amqpl, amqpl_cc_amqp_bin_amqpl, @@ -239,6 +240,59 @@ amqpl_death_records(Env) -> ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)), ok. +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/10709 +%% with feature flag message_containers_deaths_v2 enabled. +amqpl_parse_x_death(_Config) -> + Q = <<"my queue">>, + DLQ = <<"my dead letter queue">>, + + Content0 = #content{class_id = 60, + properties = #'P_basic'{headers = [], + expiration = <<"9999">>}, + payload_fragments_rev = [<<"data">>]}, + Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content0, annotations())), + Msg1 = mc:record_death(rejected, Q, Msg0, #{}), + + %% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client. + Content1 = mc:protocol_state(Msg1), + Msg2 = mc:init(mc_amqpl, Content1, annotations()), + + ?assertEqual([Q], mc:death_queue_names(Msg2)), + ?assertEqual(false, mc:is_death_cycle(Q, Msg2)), + + + #content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg2), + ?assertMatch({_, longstr, Q}, header(<<"x-first-death-queue">>, H1)), + ?assertMatch({_, longstr, Q}, header(<<"x-last-death-queue">>, H1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)), + {_, array, [{table, T1}]} = header(<<"x-death">>, H1), + ?assertMatch({_, long, 1}, header(<<"count">>, T1)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)), + ?assertMatch({_, longstr, Q}, header(<<"queue">>, T1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)), + ?assertMatch({_, timestamp, _}, header(<<"time">>, T1)), + ?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)), + ?assertMatch({_, longstr, <<"9999">>}, header(<<"original-expiration">>, T1)), + + Msg3 = mc:record_death(expired, DLQ, Msg2, #{}), + + #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg3), + {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), + ?assertMatch({_, longstr, DLQ}, header(<<"queue">>, T2a)), + ?assertMatch({_, longstr, Q}, header(<<"queue">>, T2b)), + + Msg4 = mc:record_death(rejected, Q, Msg3, #{}), + + %% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client. + Content2 = mc:protocol_state(Msg4), + Msg5 = mc:init(mc_amqpl, Content2, annotations()), + + %% We expect the list to be ordered by death recency. + ?assertEqual([Q, DLQ], mc:death_queue_names(Msg5)). + is_death_cycle(_Config) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, @@ -430,6 +484,10 @@ amqpl_amqp_bin_amqpl(_Config) -> ok. amqpl_cc_amqp_bin_amqpl(_Config) -> + Mod = rabbit_feature_flags, + meck:new(Mod, [no_link]), + meck:expect(Mod, is_enabled, fun (message_containers_deaths_v2) -> true end), + Headers = [{<<"CC">>, array, [{longstr, <<"q1">>}, {longstr, <<"q2">>}]}], Props = #'P_basic'{headers = Headers}, @@ -724,15 +782,10 @@ amqp_amqpl_amqp_bodies(_Config) -> [begin EncodedPayload = amqp10_encode_bin(Payload), - - Ex = #resource{virtual_host = <<"/">>, - kind = exchange, - name = <<"ex">>}, - {ok, LegacyMsg} = mc_amqpl:message(Ex, <<"rkey">>, - #content{payload_fragments_rev = - lists:reverse(EncodedPayload), - properties = Props}, - #{}, true), + LegacyMsg = mc:init(mc_amqpl, + #content{properties = Props, + payload_fragments_rev = lists:reverse(EncodedPayload)}, + annotations()), AmqpMsg = mc:convert(mc_amqp, LegacyMsg), %% drop any non body sections diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 00a51b415373..98990c8dc364 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -156,7 +156,7 @@ channel_consumers_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), {ok, Ch} = open_channel(Conn1), - Q = <<"Q">>, Tag = <<"Tag">>, + Q = <<"Q">>, {ok, _} = consume(Ch, Q, <<"Tag1">>), {ok, _} = consume(Ch, Q, <<"Tag2">>), diff --git a/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl index 8a61c6a3cc3d..51700c448974 100644 --- a/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl +++ b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl @@ -2,6 +2,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("rabbit/include/mc.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -65,8 +66,9 @@ append_to_acc(_Config) -> priority = 2}, payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes }, - ExName = rabbit_misc:r(<<>>, exchange, <<>>), - {ok, Msg} = mc_amqpl:message(ExName, <<>>, Content, #{id => 1}, true), + Msg = mc:init(mc_amqpl, Content, #{id => 1, + ?ANN_EXCHANGE => <<>>, + ?ANN_ROUTING_KEYS => [<<>>]}), BQDepth = 10, SyncThroughput_0 = 0, FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()}, diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index 981ecbdc5f2c..21e2f5d78937 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -11,7 +11,7 @@ -define(CONTENT_TYPE_AMQP, <<"message/vnd.rabbitmq.amqp">>). -export([ - init/1, + init/2, size/1, x_header/2, property/2, @@ -23,7 +23,7 @@ ]). init(Msg = #mqtt_msg{qos = Qos, - props = Props}) -> + props = Props}, _Env) -> Anns0 = #{?ANN_DURABLE => durable(Qos)}, Anns1 = case Props of #{'Message-Expiry-Interval' := Seconds} ->