Skip to content

Commit 5cf0afb

Browse files
ansdmergify[bot]
authored andcommitted
Convert array from AMQP 1.0 to AMQP 0.9.1
Fix the following crash when an AMQP 0.9.1 client consumes an AMQP 1.0 encoded message that contains an array value in message annotations: ``` crasher: initial call: rabbit_channel:init/1 pid: <0.685.0> registered_name: [] exception exit: {function_clause, [{mc_amqpl,to_091, [<<"x-array">>, {array,utf8,[{utf8,<<"e1">>},{utf8,<<"e2">>}]}], [{file,"mc_amqpl.erl"},{line,737}]}, {mc_amqpl,'-convert_from/3-fun-3-',1, [{file,"mc_amqpl.erl"},{line,168}]}, {lists,filtermap_1,2, [{file,"lists.erl"},{line,2279}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,158}]}, {mc,convert,3,[{file,"mc.erl"},{line,332}]}, {rabbit_channel,handle_deliver0,4, [{file,"rabbit_channel.erl"},{line,2619}]}, {lists,foldl_1,3,[{file,"lists.erl"},{line,2151}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}]} ``` (cherry picked from commit 814d44d) # Conflicts: # deps/amqp10_client/src/amqp10_msg.erl
1 parent 1c271d7 commit 5cf0afb

File tree

4 files changed

+35
-5
lines changed

4 files changed

+35
-5
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ set_delivery_annotations(
402402
Anns1 = #'v1_0.delivery_annotations'{content = maps:to_list(Anns)},
403403
Msg#amqp10_msg{delivery_annotations = Anns1}.
404404

405-
-spec set_message_annotations(#{binary() => binary() | integer() | string()},
406-
amqp10_msg()) -> amqp10_msg().
405+
-spec set_message_annotations(#{binary() => binary() | number() | string() | tuple()},
406+
amqp10_msg()) -> amqp10_msg().
407407
set_message_annotations(Props,
408408
#amqp10_msg{message_annotations = undefined} =
409409
Msg) ->
@@ -433,7 +433,16 @@ wrap_ap_value(V) when is_integer(V) ->
433433
case V < 0 of
434434
true -> {int, V};
435435
false -> {uint, V}
436+
<<<<<<< HEAD
436437
end.
438+
=======
439+
end;
440+
wrap_ap_value(V) when is_number(V) ->
441+
%% AMQP double and Erlang float are both 64-bit.
442+
{double, V};
443+
wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) ->
444+
TaggedValue.
445+
>>>>>>> 814d44dd82 (Convert array from AMQP 1.0 to AMQP 0.9.1)
437446

438447
%% LOCAL
439448
header_value(durable, undefined) -> false;

deps/rabbit/src/mc_amqpl.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,9 +754,14 @@ to_091(Key, false) -> {Key, bool, false};
754754
to_091(Key, undefined) -> {Key, void, undefined};
755755
to_091(Key, null) -> {Key, void, undefined};
756756
to_091(Key, {list, L}) ->
757-
{Key, array, [to_091(V) || V <- L]};
757+
to_091_array(Key, L);
758758
to_091(Key, {map, M}) ->
759-
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}.
759+
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
760+
to_091(Key, {array, _T, L}) ->
761+
to_091_array(Key, L).
762+
763+
to_091_array(Key, L) ->
764+
{Key, array, [to_091(V) || V <- L]}.
760765

761766
to_091({utf8, V}) -> {longstr, V};
762767
to_091({symbol, V}) -> {longstr, V};

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,13 @@ amqp_amqpl(QType, Config) ->
13281328
message_format = {uint, 0}},
13291329
Body1,
13301330
Footer])),
1331+
%% Send with an array value in message annotations.
1332+
ok = amqp10_client:send_msg(
1333+
Sender,
1334+
amqp10_msg:set_message_annotations(
1335+
#{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>},
1336+
{utf8, <<"e2">>}]}},
1337+
amqp10_msg:new(<<>>, Body1, true))),
13311338

13321339
ok = amqp10_client:detach_link(Sender),
13331340
flush(detached),
@@ -1407,6 +1414,13 @@ amqp_amqpl(QType, Config) ->
14071414
?assertEqual([Body1, Footer], amqp10_framing:decode_bin(Payload10))
14081415
after 5000 -> ct:fail({missing_deliver, ?LINE})
14091416
end,
1417+
receive {_, #amqp_msg{payload = Payload11,
1418+
props = #'P_basic'{headers = Headers11}}} ->
1419+
?assertEqual([Body1], amqp10_framing:decode_bin(Payload11)),
1420+
?assertEqual({array, [{longstr, <<"e1">>}, {longstr, <<"e2">>}]},
1421+
rabbit_misc:table_lookup(Headers11, <<"x-array">>))
1422+
after 5000 -> ct:fail({missing_deliver, ?LINE})
1423+
end,
14101424

14111425
ok = rabbit_ct_client_helpers:close_channel(Ch),
14121426
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,8 @@ amqp_amqpl(_Config) ->
532532
MAC = [
533533
{{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}},
534534
thead2('x-list', list, [utf8(<<"l">>)]),
535-
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}])
535+
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]),
536+
{{symbol, <<"x-array">>}, {array, utf8, [{utf8, <<"a">>}]}}
536537
],
537538
M = #'v1_0.message_annotations'{content = MAC},
538539
P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>},
@@ -598,6 +599,7 @@ amqp_amqpl(_Config) ->
598599
?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)),
599600
?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)),
600601
?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)),
602+
?assertMatch({_, array, [{longstr, <<"a">>}]}, header(<<"x-array">>, HL)),
601603

602604
?assertMatch({_, long, 5}, header(<<"long">>, HL)),
603605
?assertMatch({_, long, 5}, header(<<"ulong">>, HL)),

0 commit comments

Comments
 (0)