From eeeae55caa65783063bc88e7804f92a1b8b9f9fd Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 9 Jul 2025 15:21:18 +0200 Subject: [PATCH] Fix AMQP crashes for approximate numbers This commit fixes several crashes: 1. Serialising IEEE 754-2008 decimals as well as NaN and +-Inf for float and doubles crashed 2. Converting IEEE 754-2008 decimals as well as NaN and +-Inf for float and dobules from amqp to amqpl crashed The 2nd crash looks as follows: ``` exception exit: {function_clause, [{mc_amqpl,to_091, [<<"decimal-32">>,{as_is,116,<<124,0,0,0>>}], [{file,"mc_amqpl.erl"},{line,747}]}, {mc_amqpl,'-convert_from/3-lc$^2/1-2-',1, [{file,"mc_amqpl.erl"},{line,155}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,155}]}, {mc,convert,3,[{file,"mc.erl"},{line,358}]}, {rabbit_channel,outgoing_content,2, [{file,"rabbit_channel.erl"},{line,2649}]}, {rabbit_channel,handle_basic_get,7, [{file,"rabbit_channel.erl"},{line,2636}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,617}]}, {gen_server2,handle_msg,2, [{file,"gen_server2.erl"},{line,1056}]}]} ``` The 2nd crash is fixed by omitting any `{as_is, _TypeCode, _Binary}` values during AMQP 1.0 -> AMQP 0.9.1 conversion. This will be documented in the conversion table. In addition to fixing these crashes, this commit adds tests that RabbitMQ is able to store and forward IEEE 754-2008 decimals. IEEE 754-2008 decimals can be parsed and serialsed by RabbitMQ. However, RabbitMQ doesn't support interpreting this values. For example, they can't be used on the headers exchange or for AMQP filter expressions. (cherry picked from commit 5c318c8e38692906cfb5089538f169641cec05ab) --- .../src/amqp10_binary_generator.erl | 4 +- .../src/amqp10_binary_parser.erl | 40 +++++------ .../test/binary_generator_SUITE.erl | 22 +++++++ deps/rabbit/src/mc.erl | 1 + deps/rabbit/src/mc_amqpl.erl | 27 ++++++-- deps/rabbit/test/amqp_client_SUITE.erl | 66 ++++++++++++++++++- 6 files changed, 133 insertions(+), 27 deletions(-) diff --git a/deps/amqp10_common/src/amqp10_binary_generator.erl b/deps/amqp10_common/src/amqp10_binary_generator.erl index c23a40f856da..b628fcaaa152 100644 --- a/deps/amqp10_common/src/amqp10_binary_generator.erl +++ b/deps/amqp10_common/src/amqp10_binary_generator.erl @@ -177,8 +177,8 @@ generate1({array, Type, List}) -> [16#e0, S + 1, Count, Array] end; -generate1({as_is, TypeCode, Bin}) -> - <>. +generate1({as_is, TypeCode, Bin}) when is_binary(Bin) -> + [TypeCode, Bin]. constructor(symbol) -> 16#b3; constructor(ubyte) -> 16#50; diff --git a/deps/amqp10_common/src/amqp10_binary_parser.erl b/deps/amqp10_common/src/amqp10_binary_parser.erl index c8e07513db98..13f616ff57c3 100644 --- a/deps/amqp10_common/src/amqp10_binary_parser.erl +++ b/deps/amqp10_common/src/amqp10_binary_parser.erl @@ -101,17 +101,17 @@ parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) -> parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) -> {parse_array(32, CountAndV), B+5+S}; %% NaN or +-inf -parse(<<16#72, V:32, _/binary>>, B) -> - {{as_is, 16#72, <>}, B+5}; -parse(<<16#82, V:64, _/binary>>, B) -> - {{as_is, 16#82, <>}, B+9}; +parse(<<16#72, V:4/binary, _/binary>>, B) -> + {{as_is, 16#72, V}, B+5}; +parse(<<16#82, V:8/binary, _/binary>>, B) -> + {{as_is, 16#82, V}, B+9}; %% decimals -parse(<<16#74, V:32, _/binary>>, B) -> - {{as_is, 16#74, <>}, B+5}; -parse(<<16#84, V:64, _/binary>>, B) -> - {{as_is, 16#84, <>}, B+9}; -parse(<<16#94, V:128, _/binary>>, B) -> - {{as_is, 16#94, <>}, B+17}; +parse(<<16#74, V:4/binary, _/binary>>, B) -> + {{as_is, 16#74, V}, B+5}; +parse(<<16#84, V:8/binary, _/binary>>, B) -> + {{as_is, 16#84, V}, B+9}; +parse(<<16#94, V:16/binary, _/binary>>, B) -> + {{as_is, 16#94, V}, B+17}; parse(<>, B) -> throw({primitive_type_unsupported, Type, {position, B}}). @@ -317,17 +317,17 @@ pm(<<16#e0, S:8,CountAndV:S/binary,R/binary>>, O, B) -> pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) -> [parse_array(32, CountAndV) | pm(R, O, B+5+S)]; %% NaN or +-inf -pm(<<16#72, V:32, R/binary>>, O, B) -> - [{as_is, 16#72, <>} | pm(R, O, B+5)]; -pm(<<16#82, V:64, R/binary>>, O, B) -> - [{as_is, 16#82, <>} | pm(R, O, B+9)]; +pm(<<16#72, V:4/binary, R/binary>>, O, B) -> + [{as_is, 16#72, V} | pm(R, O, B+5)]; +pm(<<16#82, V:8/binary, R/binary>>, O, B) -> + [{as_is, 16#82, V} | pm(R, O, B+9)]; %% decimals -pm(<<16#74, V:32, R/binary>>, O, B) -> - [{as_is, 16#74, <>} | pm(R, O, B+5)]; -pm(<<16#84, V:64, R/binary>>, O, B) -> - [{as_is, 16#84, <>} | pm(R, O, B+9)]; -pm(<<16#94, V:128, R/binary>>, O, B) -> - [{as_is, 16#94, <>} | pm(R, O, B+17)]; +pm(<<16#74, V:4/binary, R/binary>>, O, B) -> + [{as_is, 16#74, V} | pm(R, O, B+5)]; +pm(<<16#84, V:8/binary, R/binary>>, O, B) -> + [{as_is, 16#84, V} | pm(R, O, B+9)]; +pm(<<16#94, V:16/binary, R/binary>>, O, B) -> + [{as_is, 16#94, V} | pm(R, O, B+17)]; pm(<>, _O, B) -> throw({primitive_type_unsupported, Type, {position, B}}). diff --git a/deps/amqp10_common/test/binary_generator_SUITE.erl b/deps/amqp10_common/test/binary_generator_SUITE.erl index ac63d1b7a661..ef50660d95ae 100644 --- a/deps/amqp10_common/test/binary_generator_SUITE.erl +++ b/deps/amqp10_common/test/binary_generator_SUITE.erl @@ -99,12 +99,34 @@ numerals(_Config) -> roundtrip({long, 0}), roundtrip({long, 16#7FFFFFFFFFFFFFFF}), roundtrip({long, -16#8000000000000000}), + roundtrip({float, 0.0}), roundtrip({float, 1.0}), roundtrip({float, -1.0}), roundtrip({double, 0.0}), roundtrip({double, 1.0}), roundtrip({double, -1.0}), + + %% float +Inf + roundtrip({as_is, 16#72, <<16#7F, 16#80, 16#00, 16#00>>}), + %% double +Inf + roundtrip({as_is, 16#82, <<16#7F, 16#F0, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00>>}), + + %% decimal32 + roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#00>>}), % 0 + roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#2A>>}), % 42 + roundtrip({as_is, 16#74, <<16#A2, 16#40, 16#00, 16#48>>}), % -123.45 + roundtrip({as_is, 16#74, <<16#78, 16#00, 16#00, 16#00>>}), % +Infinity + roundtrip({as_is, 16#74, <<16#7C, 16#00, 16#00, 16#00>>}), % NaN + %% decimal64 + roundtrip({as_is, 16#84, <<16#22, 16#34, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00>>}), % 0 + %% decimal128 + roundtrip({as_is, 16#94, <<16#22, 16#08, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00>>}), % 0 ok. utf8(_Config) -> diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 9dec628b7091..8f008ac8e702 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -90,6 +90,7 @@ {list, [tagged_value()]} | {map, [{tagged_value(), tagged_value()}]} | {array, atom(), [tagged_value()]} | + {as_is, TypeCode :: non_neg_integer(), binary()} | null | undefined. diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index cac190e2cb5e..349030c86039 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -152,8 +152,14 @@ convert_from(mc_amqp, Sections, Env) -> Type0 end, - Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP, - ?IS_SHORTSTR_LEN(K)], + Headers0 = lists:filtermap(fun({_K, {as_is, _, _}}) -> + false; + ({{utf8, K}, V}) + when ?IS_SHORTSTR_LEN(K) -> + {true, to_091(K, V)}; + (_) -> + false + end, AP), %% Add remaining x- message annotations as headers XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) -> {true, to_091(<<"CC">>, V)}; @@ -161,6 +167,8 @@ convert_from(mc_amqp, Sections, Env) -> {true, {<<"timestamp_in_ms">>, long, Ts}}; ({{symbol, <<"x-opt-deaths">>}, V}) -> convert_from_amqp_deaths(V); + ({_K, {as_is, _, _}}) -> + false; ({{symbol, <<"x-", _/binary>> = K}, V}) when ?IS_SHORTSTR_LEN(K) -> case is_internal_header(K) of @@ -766,12 +774,23 @@ to_091(Key, null) -> {Key, void, undefined}; to_091(Key, {list, L}) -> to_091_array(Key, L); to_091(Key, {map, M}) -> - {Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}; + T = lists:filtermap(fun({K, V}) when element(1, K) =:= as_is orelse + element(1, V) =:= as_is -> + false; + ({K, V}) -> + {true, to_091(unwrap(K), V)} + end, M), + {Key, table, T}; to_091(Key, {array, _T, L}) -> to_091_array(Key, L). to_091_array(Key, L) -> - {Key, array, [to_091(V) || V <- L]}. + A = lists:filtermap(fun({as_is, _, _}) -> + false; + (V) -> + {true, to_091(V)} + end, L), + {Key, array, A}. to_091({utf8, V}) -> {longstr, V}; to_091({symbol, V}) -> {longstr, V}; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index ebe49946cf1d..1699517f470d 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -172,7 +172,8 @@ groups() -> x_cc_annotation_exchange_routing_key_empty, x_cc_annotation_queue, x_cc_annotation_null, - bad_x_cc_annotation_exchange + bad_x_cc_annotation_exchange, + decimal_types ]}, {cluster_size_3, [shuffle], @@ -6589,6 +6590,69 @@ bad_x_cc_annotation_exchange(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection). +%% Test that RabbitMQ can store and forward AMQP decimal types. +decimal_types(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {_, Session, LinkPair} = Init = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, QName, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Decimal32Zero = <<16#22, 16#50, 0, 0>>, + Decimal64Zero = <<16#22, 16#34, 0, 0, 0, 0, 0, 0>>, + Decimal128Zero = <<16#22, 16#08, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>, + Decimal3242 = <<16#22, 16#50, 16#00, 16#2A>>, % 42 + Decimal32NaN = <<16#7C, 0, 0, 0>>, + Body = #'v1_0.amqp_value'{content = {list, [{as_is, 16#74, Decimal32Zero}, + {as_is, 16#84, Decimal64Zero}, + {as_is, 16#94, Decimal128Zero}]}}, + MsgAnns = #{<<"x-decimal-32">> => {as_is, 16#74, Decimal3242}, + <<"x-decimal-64">> => {as_is, 16#84, Decimal64Zero}, + <<"x-decimal-128">> => {as_is, 16#94, Decimal128Zero}, + <<"x-list">> => {list, [{as_is, 16#94, Decimal128Zero}]}, + <<"x-map">> => {map, [{{utf8, <<"key-1">>}, + {as_is, 16#94, Decimal128Zero}}]}}, + AppProps = #{<<"decimal-32">> => {as_is, 16#74, Decimal32NaN}}, + Msg0 = amqp10_msg:set_message_annotations( + MsgAnns, + amqp10_msg:set_application_properties( + AppProps, + amqp10_msg:new(<<"tag">>, Body))), + ok = amqp10_client:send_msg(Sender, Msg0), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:send_msg(Sender, Msg0), + ok = wait_for_accepted(<<"tag">>), + ok = detach_link_sync(Sender), + + %% Consume the first message via AMQP 1.0 + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(Body, amqp10_msg:body(Msg)), + ?assertMatch(#{<<"x-decimal-32">> := {as_is, 16#74, Decimal3242}, + <<"x-decimal-64">> := {as_is, 16#84, Decimal64Zero}, + <<"x-decimal-128">> := {as_is, 16#94, Decimal128Zero}, + <<"x-list">> := [{as_is, 16#94, Decimal128Zero}], + <<"x-map">> := [{{utf8, <<"key-1">>}, + {as_is, 16#94, Decimal128Zero}}]}, + amqp10_msg:message_annotations(Msg)), + ?assertEqual(AppProps, amqp10_msg:application_properties(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + ok = detach_link_sync(Receiver), + + %% Consume the second message via AMQP 0.9.1 + %% We expect to receive the message without any crashes. + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, + amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = close(Init). + %% Attach a receiver to an unavailable quorum queue. attach_to_down_quorum_queue(Config) -> QName = <<"q-down">>,