Skip to content

Commit eeeae55

Browse files
ansdmergify[bot]
authored andcommitted
Fix AMQP crashes for approximate numbers
This commit fixes several crashes: 1. Serialising IEEE 754-2008 decimals as well as NaN and +-Inf for float and doubles crashed 2. Converting IEEE 754-2008 decimals as well as NaN and +-Inf for float and dobules from amqp to amqpl crashed The 2nd crash looks as follows: ``` exception exit: {function_clause, [{mc_amqpl,to_091, [<<"decimal-32">>,{as_is,116,<<124,0,0,0>>}], [{file,"mc_amqpl.erl"},{line,747}]}, {mc_amqpl,'-convert_from/3-lc$^2/1-2-',1, [{file,"mc_amqpl.erl"},{line,155}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,155}]}, {mc,convert,3,[{file,"mc.erl"},{line,358}]}, {rabbit_channel,outgoing_content,2, [{file,"rabbit_channel.erl"},{line,2649}]}, {rabbit_channel,handle_basic_get,7, [{file,"rabbit_channel.erl"},{line,2636}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,617}]}, {gen_server2,handle_msg,2, [{file,"gen_server2.erl"},{line,1056}]}]} ``` The 2nd crash is fixed by omitting any `{as_is, _TypeCode, _Binary}` values during AMQP 1.0 -> AMQP 0.9.1 conversion. This will be documented in the conversion table. In addition to fixing these crashes, this commit adds tests that RabbitMQ is able to store and forward IEEE 754-2008 decimals. IEEE 754-2008 decimals can be parsed and serialsed by RabbitMQ. However, RabbitMQ doesn't support interpreting this values. For example, they can't be used on the headers exchange or for AMQP filter expressions. (cherry picked from commit 5c318c8)
1 parent 92a8454 commit eeeae55

File tree

6 files changed

+133
-27
lines changed

6 files changed

+133
-27
lines changed

deps/amqp10_common/src/amqp10_binary_generator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ generate1({array, Type, List}) ->
177177
[16#e0, S + 1, Count, Array]
178178
end;
179179

180-
generate1({as_is, TypeCode, Bin}) ->
181-
<<TypeCode, Bin>>.
180+
generate1({as_is, TypeCode, Bin}) when is_binary(Bin) ->
181+
[TypeCode, Bin].
182182

183183
constructor(symbol) -> 16#b3;
184184
constructor(ubyte) -> 16#50;

deps/amqp10_common/src/amqp10_binary_parser.erl

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,17 @@ parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) ->
101101
parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) ->
102102
{parse_array(32, CountAndV), B+5+S};
103103
%% NaN or +-inf
104-
parse(<<16#72, V:32, _/binary>>, B) ->
105-
{{as_is, 16#72, <<V:32>>}, B+5};
106-
parse(<<16#82, V:64, _/binary>>, B) ->
107-
{{as_is, 16#82, <<V:64>>}, B+9};
104+
parse(<<16#72, V:4/binary, _/binary>>, B) ->
105+
{{as_is, 16#72, V}, B+5};
106+
parse(<<16#82, V:8/binary, _/binary>>, B) ->
107+
{{as_is, 16#82, V}, B+9};
108108
%% decimals
109-
parse(<<16#74, V:32, _/binary>>, B) ->
110-
{{as_is, 16#74, <<V:32>>}, B+5};
111-
parse(<<16#84, V:64, _/binary>>, B) ->
112-
{{as_is, 16#84, <<V:64>>}, B+9};
113-
parse(<<16#94, V:128, _/binary>>, B) ->
114-
{{as_is, 16#94, <<V:128>>}, B+17};
109+
parse(<<16#74, V:4/binary, _/binary>>, B) ->
110+
{{as_is, 16#74, V}, B+5};
111+
parse(<<16#84, V:8/binary, _/binary>>, B) ->
112+
{{as_is, 16#84, V}, B+9};
113+
parse(<<16#94, V:16/binary, _/binary>>, B) ->
114+
{{as_is, 16#94, V}, B+17};
115115
parse(<<Type, _/binary>>, B) ->
116116
throw({primitive_type_unsupported, Type, {position, B}}).
117117

@@ -317,17 +317,17 @@ pm(<<16#e0, S:8,CountAndV:S/binary,R/binary>>, O, B) ->
317317
pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) ->
318318
[parse_array(32, CountAndV) | pm(R, O, B+5+S)];
319319
%% NaN or +-inf
320-
pm(<<16#72, V:32, R/binary>>, O, B) ->
321-
[{as_is, 16#72, <<V:32>>} | pm(R, O, B+5)];
322-
pm(<<16#82, V:64, R/binary>>, O, B) ->
323-
[{as_is, 16#82, <<V:64>>} | pm(R, O, B+9)];
320+
pm(<<16#72, V:4/binary, R/binary>>, O, B) ->
321+
[{as_is, 16#72, V} | pm(R, O, B+5)];
322+
pm(<<16#82, V:8/binary, R/binary>>, O, B) ->
323+
[{as_is, 16#82, V} | pm(R, O, B+9)];
324324
%% decimals
325-
pm(<<16#74, V:32, R/binary>>, O, B) ->
326-
[{as_is, 16#74, <<V:32>>} | pm(R, O, B+5)];
327-
pm(<<16#84, V:64, R/binary>>, O, B) ->
328-
[{as_is, 16#84, <<V:64>>} | pm(R, O, B+9)];
329-
pm(<<16#94, V:128, R/binary>>, O, B) ->
330-
[{as_is, 16#94, <<V:128>>} | pm(R, O, B+17)];
325+
pm(<<16#74, V:4/binary, R/binary>>, O, B) ->
326+
[{as_is, 16#74, V} | pm(R, O, B+5)];
327+
pm(<<16#84, V:8/binary, R/binary>>, O, B) ->
328+
[{as_is, 16#84, V} | pm(R, O, B+9)];
329+
pm(<<16#94, V:16/binary, R/binary>>, O, B) ->
330+
[{as_is, 16#94, V} | pm(R, O, B+17)];
331331
pm(<<Type, _Bin/binary>>, _O, B) ->
332332
throw({primitive_type_unsupported, Type, {position, B}}).
333333

deps/amqp10_common/test/binary_generator_SUITE.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,34 @@ numerals(_Config) ->
9999
roundtrip({long, 0}),
100100
roundtrip({long, 16#7FFFFFFFFFFFFFFF}),
101101
roundtrip({long, -16#8000000000000000}),
102+
102103
roundtrip({float, 0.0}),
103104
roundtrip({float, 1.0}),
104105
roundtrip({float, -1.0}),
105106
roundtrip({double, 0.0}),
106107
roundtrip({double, 1.0}),
107108
roundtrip({double, -1.0}),
109+
110+
%% float +Inf
111+
roundtrip({as_is, 16#72, <<16#7F, 16#80, 16#00, 16#00>>}),
112+
%% double +Inf
113+
roundtrip({as_is, 16#82, <<16#7F, 16#F0, 16#00, 16#00,
114+
16#00, 16#00, 16#00, 16#00>>}),
115+
116+
%% decimal32
117+
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#00>>}), % 0
118+
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#2A>>}), % 42
119+
roundtrip({as_is, 16#74, <<16#A2, 16#40, 16#00, 16#48>>}), % -123.45
120+
roundtrip({as_is, 16#74, <<16#78, 16#00, 16#00, 16#00>>}), % +Infinity
121+
roundtrip({as_is, 16#74, <<16#7C, 16#00, 16#00, 16#00>>}), % NaN
122+
%% decimal64
123+
roundtrip({as_is, 16#84, <<16#22, 16#34, 16#00, 16#00,
124+
16#00, 16#00, 16#00, 16#00>>}), % 0
125+
%% decimal128
126+
roundtrip({as_is, 16#94, <<16#22, 16#08, 16#00, 16#00,
127+
16#00, 16#00, 16#00, 16#00,
128+
16#00, 16#00, 16#00, 16#00,
129+
16#00, 16#00, 16#00, 16#00>>}), % 0
108130
ok.
109131

110132
utf8(_Config) ->

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
{list, [tagged_value()]} |
9191
{map, [{tagged_value(), tagged_value()}]} |
9292
{array, atom(), [tagged_value()]} |
93+
{as_is, TypeCode :: non_neg_integer(), binary()} |
9394
null |
9495
undefined.
9596

deps/rabbit/src/mc_amqpl.erl

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,23 @@ convert_from(mc_amqp, Sections, Env) ->
152152
Type0
153153
end,
154154

155-
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
156-
?IS_SHORTSTR_LEN(K)],
155+
Headers0 = lists:filtermap(fun({_K, {as_is, _, _}}) ->
156+
false;
157+
({{utf8, K}, V})
158+
when ?IS_SHORTSTR_LEN(K) ->
159+
{true, to_091(K, V)};
160+
(_) ->
161+
false
162+
end, AP),
157163
%% Add remaining x- message annotations as headers
158164
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
159165
{true, to_091(<<"CC">>, V)};
160166
({{symbol, <<"x-opt-rabbitmq-received-time">>}, {timestamp, Ts}}) ->
161167
{true, {<<"timestamp_in_ms">>, long, Ts}};
162168
({{symbol, <<"x-opt-deaths">>}, V}) ->
163169
convert_from_amqp_deaths(V);
170+
({_K, {as_is, _, _}}) ->
171+
false;
164172
({{symbol, <<"x-", _/binary>> = K}, V})
165173
when ?IS_SHORTSTR_LEN(K) ->
166174
case is_internal_header(K) of
@@ -766,12 +774,23 @@ to_091(Key, null) -> {Key, void, undefined};
766774
to_091(Key, {list, L}) ->
767775
to_091_array(Key, L);
768776
to_091(Key, {map, M}) ->
769-
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
777+
T = lists:filtermap(fun({K, V}) when element(1, K) =:= as_is orelse
778+
element(1, V) =:= as_is ->
779+
false;
780+
({K, V}) ->
781+
{true, to_091(unwrap(K), V)}
782+
end, M),
783+
{Key, table, T};
770784
to_091(Key, {array, _T, L}) ->
771785
to_091_array(Key, L).
772786

773787
to_091_array(Key, L) ->
774-
{Key, array, [to_091(V) || V <- L]}.
788+
A = lists:filtermap(fun({as_is, _, _}) ->
789+
false;
790+
(V) ->
791+
{true, to_091(V)}
792+
end, L),
793+
{Key, array, A}.
775794

776795
to_091({utf8, V}) -> {longstr, V};
777796
to_091({symbol, V}) -> {longstr, V};

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ groups() ->
172172
x_cc_annotation_exchange_routing_key_empty,
173173
x_cc_annotation_queue,
174174
x_cc_annotation_null,
175-
bad_x_cc_annotation_exchange
175+
bad_x_cc_annotation_exchange,
176+
decimal_types
176177
]},
177178

178179
{cluster_size_3, [shuffle],
@@ -6589,6 +6590,69 @@ bad_x_cc_annotation_exchange(Config) ->
65896590
ok = end_session_sync(Session),
65906591
ok = close_connection_sync(Connection).
65916592

6593+
%% Test that RabbitMQ can store and forward AMQP decimal types.
6594+
decimal_types(Config) ->
6595+
QName = atom_to_binary(?FUNCTION_NAME),
6596+
Address = rabbitmq_amqp_address:queue(QName),
6597+
{_, Session, LinkPair} = Init = init(Config),
6598+
{ok, _} = rabbitmq_amqp_client:declare_queue(
6599+
LinkPair, QName,
6600+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
6601+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
6602+
ok = wait_for_credit(Sender),
6603+
6604+
Decimal32Zero = <<16#22, 16#50, 0, 0>>,
6605+
Decimal64Zero = <<16#22, 16#34, 0, 0, 0, 0, 0, 0>>,
6606+
Decimal128Zero = <<16#22, 16#08, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>,
6607+
Decimal3242 = <<16#22, 16#50, 16#00, 16#2A>>, % 42
6608+
Decimal32NaN = <<16#7C, 0, 0, 0>>,
6609+
Body = #'v1_0.amqp_value'{content = {list, [{as_is, 16#74, Decimal32Zero},
6610+
{as_is, 16#84, Decimal64Zero},
6611+
{as_is, 16#94, Decimal128Zero}]}},
6612+
MsgAnns = #{<<"x-decimal-32">> => {as_is, 16#74, Decimal3242},
6613+
<<"x-decimal-64">> => {as_is, 16#84, Decimal64Zero},
6614+
<<"x-decimal-128">> => {as_is, 16#94, Decimal128Zero},
6615+
<<"x-list">> => {list, [{as_is, 16#94, Decimal128Zero}]},
6616+
<<"x-map">> => {map, [{{utf8, <<"key-1">>},
6617+
{as_is, 16#94, Decimal128Zero}}]}},
6618+
AppProps = #{<<"decimal-32">> => {as_is, 16#74, Decimal32NaN}},
6619+
Msg0 = amqp10_msg:set_message_annotations(
6620+
MsgAnns,
6621+
amqp10_msg:set_application_properties(
6622+
AppProps,
6623+
amqp10_msg:new(<<"tag">>, Body))),
6624+
ok = amqp10_client:send_msg(Sender, Msg0),
6625+
ok = wait_for_accepted(<<"tag">>),
6626+
ok = amqp10_client:send_msg(Sender, Msg0),
6627+
ok = wait_for_accepted(<<"tag">>),
6628+
ok = detach_link_sync(Sender),
6629+
6630+
%% Consume the first message via AMQP 1.0
6631+
{ok, Receiver} = amqp10_client:attach_receiver_link(
6632+
Session, <<"receiver">>, Address, unsettled),
6633+
{ok, Msg} = amqp10_client:get_msg(Receiver),
6634+
?assertEqual(Body, amqp10_msg:body(Msg)),
6635+
?assertMatch(#{<<"x-decimal-32">> := {as_is, 16#74, Decimal3242},
6636+
<<"x-decimal-64">> := {as_is, 16#84, Decimal64Zero},
6637+
<<"x-decimal-128">> := {as_is, 16#94, Decimal128Zero},
6638+
<<"x-list">> := [{as_is, 16#94, Decimal128Zero}],
6639+
<<"x-map">> := [{{utf8, <<"key-1">>},
6640+
{as_is, 16#94, Decimal128Zero}}]},
6641+
amqp10_msg:message_annotations(Msg)),
6642+
?assertEqual(AppProps, amqp10_msg:application_properties(Msg)),
6643+
ok = amqp10_client:accept_msg(Receiver, Msg),
6644+
ok = detach_link_sync(Receiver),
6645+
6646+
%% Consume the second message via AMQP 0.9.1
6647+
%% We expect to receive the message without any crashes.
6648+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
6649+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
6650+
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})),
6651+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
6652+
6653+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
6654+
ok = close(Init).
6655+
65926656
%% Attach a receiver to an unavailable quorum queue.
65936657
attach_to_down_quorum_queue(Config) ->
65946658
QName = <<"q-down">>,

0 commit comments

Comments
 (0)