From 5619f2e41fd79f33d93cf9f91134d15634762212 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 22 Oct 2024 09:51:34 +0200 Subject: [PATCH] Convert array from AMQP 1.0 to AMQP 0.9.1 Fix the following crash when an AMQP 0.9.1 client consumes an AMQP 1.0 encoded message that contains an array value in message annotations: ``` crasher: initial call: rabbit_channel:init/1 pid: <0.685.0> registered_name: [] exception exit: {function_clause, [{mc_amqpl,to_091, [<<"x-array">>, {array,utf8,[{utf8,<<"e1">>},{utf8,<<"e2">>}]}], [{file,"mc_amqpl.erl"},{line,737}]}, {mc_amqpl,'-convert_from/3-fun-3-',1, [{file,"mc_amqpl.erl"},{line,168}]}, {lists,filtermap_1,2, [{file,"lists.erl"},{line,2279}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,158}]}, {mc,convert,3,[{file,"mc.erl"},{line,332}]}, {rabbit_channel,handle_deliver0,4, [{file,"rabbit_channel.erl"},{line,2619}]}, {lists,foldl_1,3,[{file,"lists.erl"},{line,2151}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}]} ``` --- deps/amqp10_client/src/amqp10_msg.erl | 8 +++++--- deps/rabbit/src/mc_amqpl.erl | 9 +++++++-- deps/rabbit/test/amqp_client_SUITE.erl | 14 ++++++++++++++ deps/rabbit/test/mc_unit_SUITE.erl | 4 +++- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 0f60c9bb8c28..673617acc6a0 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -402,8 +402,8 @@ set_delivery_annotations( Anns1 = #'v1_0.delivery_annotations'{content = maps:to_list(Anns)}, Msg#amqp10_msg{delivery_annotations = Anns1}. --spec set_message_annotations(#{binary() => binary() | integer() | string()}, - amqp10_msg()) -> amqp10_msg(). +-spec set_message_annotations(#{binary() => binary() | number() | string() | tuple()}, + amqp10_msg()) -> amqp10_msg(). set_message_annotations(Props, #amqp10_msg{message_annotations = undefined} = Msg) -> @@ -436,7 +436,9 @@ wrap_ap_value(V) when is_integer(V) -> end; wrap_ap_value(V) when is_number(V) -> %% AMQP double and Erlang float are both 64-bit. - {double, V}. + {double, V}; +wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) -> + TaggedValue. %% LOCAL header_value(durable, undefined) -> false; diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 8de27294723a..723e60cd3f79 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -754,9 +754,14 @@ to_091(Key, false) -> {Key, bool, false}; to_091(Key, undefined) -> {Key, void, undefined}; to_091(Key, null) -> {Key, void, undefined}; to_091(Key, {list, L}) -> - {Key, array, [to_091(V) || V <- L]}; + to_091_array(Key, L); to_091(Key, {map, M}) -> - {Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}. + {Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}; +to_091(Key, {array, _T, L}) -> + to_091_array(Key, L). + +to_091_array(Key, L) -> + {Key, array, [to_091(V) || V <- L]}. to_091({utf8, V}) -> {longstr, V}; to_091({symbol, V}) -> {longstr, V}; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index dd641328601b..f192a0c309f8 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1339,6 +1339,13 @@ amqp_amqpl(QType, Config) -> message_format = {uint, 0}}, Body1, Footer])), + %% Send with an array value in message annotations. + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_message_annotations( + #{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>}, + {utf8, <<"e2">>}]}}, + amqp10_msg:new(<<>>, Body1, true))), ok = amqp10_client:detach_link(Sender), flush(detached), @@ -1418,6 +1425,13 @@ amqp_amqpl(QType, Config) -> ?assertEqual([Body1, Footer], amqp10_framing:decode_bin(Payload10)) after 5000 -> ct:fail({missing_deliver, ?LINE}) end, + receive {_, #amqp_msg{payload = Payload11, + props = #'P_basic'{headers = Headers11}}} -> + ?assertEqual([Body1], amqp10_framing:decode_bin(Payload11)), + ?assertEqual({array, [{longstr, <<"e1">>}, {longstr, <<"e2">>}]}, + rabbit_misc:table_lookup(Headers11, <<"x-array">>)) + after 5000 -> ct:fail({missing_deliver, ?LINE}) + end, ok = rabbit_ct_client_helpers:close_channel(Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 529ffe072c28..acc9ea69adfe 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -532,7 +532,8 @@ amqp_amqpl(_Config) -> MAC = [ {{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}}, thead2('x-list', list, [utf8(<<"l">>)]), - thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]) + thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]), + {{symbol, <<"x-array">>}, {array, utf8, [{utf8, <<"a">>}]}} ], M = #'v1_0.message_annotations'{content = MAC}, P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>}, @@ -598,6 +599,7 @@ amqp_amqpl(_Config) -> ?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)), ?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)), ?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)), + ?assertMatch({_, array, [{longstr, <<"a">>}]}, header(<<"x-array">>, HL)), ?assertMatch({_, long, 5}, header(<<"long">>, HL)), ?assertMatch({_, long, 5}, header(<<"ulong">>, HL)),