Skip to content

Commit fb91185

Browse files
authored
Various message container fixes and improvements (#9278)
* AMQP encoded bodies should be converted to amqp correctly Fix for AMQP encoded amqpl payloads. Also removing some headers added during amqpl->amqpl conversions that duplicate information in the amqp header. * we should not need to prepre for read toset annotations * fix tagged_prop() type spec * tagged_prop() -> tagged_value()
1 parent 1962076 commit fb91185

File tree

4 files changed

+78
-57
lines changed

4 files changed

+78
-57
lines changed

deps/rabbit/src/mc.erl

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,18 @@
6464
integer() |
6565
float() |
6666
boolean().
67-
-type tagged_prop() :: {uuid, binary()} |
68-
{utf8, binary()} |
69-
{binary, binary()} |
70-
{boolean, boolean()} |
71-
{long, integer()} |
72-
{ulong, non_neg_integer() } |
73-
{list, [tagged_prop()]} |
74-
{map, [{tagged_prop(), tagged_prop()}]} |
75-
undefined.
67+
-type tagged_value() :: {uuid, binary()} |
68+
{utf8, binary()} |
69+
{binary, binary()} |
70+
{boolean, boolean()} |
71+
{double | float, float()} |
72+
{long | int | short | byte, integer()} |
73+
{ulong | uint | ushort | ubyte, non_neg_integer()} |
74+
{timestamp, non_neg_integer()} |
75+
{list, [tagged_value()]} |
76+
{map, [{tagged_value(), tagged_value()}]} |
77+
null |
78+
undefined.
7679

7780
%% behaviour callbacks for protocol specific implementation
7881

@@ -90,12 +93,12 @@
9093
%% retrieve and x- header from the protocol data
9194
%% the return value should be tagged with an AMQP 1.0 type
9295
-callback x_header(binary(), proto_state()) ->
93-
tagged_prop().
96+
tagged_value().
9497

9598
%% retrieve a property field from the protocol data
9699
%% e.g. message_id, correlation_id
97100
-callback property(atom(), proto_state()) ->
98-
tagged_prop().
101+
tagged_value().
99102

100103
%% return a map of header values used for message routing,
101104
%% optionally include x- headers and / or complex types (i.e. tables, arrays etc)
@@ -173,7 +176,7 @@ set_annotation(Key, Value, BasicMessage) ->
173176
mc_compat:set_annotation(Key, Value, BasicMessage).
174177

175178
-spec x_header(Key :: binary(), state()) ->
176-
tagged_prop().
179+
tagged_value().
177180
x_header(Key, #?MODULE{protocol = Proto,
178181
annotations = Anns,
179182
data = Data}) ->

deps/rabbit/src/mc_amqpl.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,15 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
282282
end,
283283
%% TODO: only add header section if at least one of the fields
284284
%% needs to be set
285+
Ttl = case Expiration of
286+
undefined ->
287+
undefined;
288+
_ ->
289+
binary_to_integer(Expiration)
290+
end,
291+
285292
H = #'v1_0.header'{durable = DelMode =:= 2,
293+
ttl = wrap(uint, Ttl),
286294
%% TODO: check Priority is a ubyte?
287295
priority = wrap(ubyte, Priority)},
288296
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
@@ -327,21 +335,23 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
327335
is_binary(K)
328336
],
329337

330-
%% properties that _are_ potentially used by the broker
331-
%% are stored as message annotations
332-
%% an alternative woud be to store priority and delivery mode in
333-
%% the amqp (1.0) header section using the dura
334-
MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type,
335-
map_add(symbol, <<"x-basic-priority">>, ubyte, Priority,
336-
map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode,
337-
map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration,
338-
MAC0)))),
338+
%% `type' doesn't have a direct equivalent so adding as
339+
%% a message annotation here
340+
MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, MAC0),
339341
#'v1_0.message_annotations'{content = MAC};
340342
Section ->
341343
Section
342344
end,
343345

344-
Sections = [H, P, AP, MA, #'v1_0.data'{content = lists:reverse(Payload)}],
346+
BodySections = case Type of
347+
?AMQP10_TYPE ->
348+
amqp10_framing:decode_bin(
349+
iolist_to_binary(lists:reverse(Payload)));
350+
_ ->
351+
[#'v1_0.data'{content = lists:reverse(Payload)}]
352+
end,
353+
354+
Sections = [H, MA, P, AP | BodySections],
345355
mc_amqp:convert_from(mc_amqp, Sections);
346356
convert_to(_TargetProto, _Content) ->
347357
not_implemented.

deps/rabbit/src/mc_compat.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ set_annotation(routing_keys, Value, #basic_message{} = Msg) ->
6363
set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) ->
6464
Msg#basic_message{exchange_name = Ex#resource{name = Value}};
6565
set_annotation(<<"x-", _/binary>> = Key, Value,
66-
#basic_message{content =
67-
#content{properties =
68-
#'P_basic'{headers = H0} = B} = C0} = Msg) ->
66+
#basic_message{content = Content0} = Msg) ->
67+
#content{properties =
68+
#'P_basic'{headers = H0} = B} = C0 =
69+
rabbit_binary_parser:ensure_content_decoded(Content0),
6970
T = case Value of
7071
_ when is_integer(Value) ->
7172
long;

deps/rabbit/test/mc_SUITE.erl

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ all_tests() ->
2727
amqpl_death_records,
2828
amqpl_amqp_bin_amqpl,
2929
amqp_amqpl,
30-
amqp_to_amqpl_data_body
30+
amqp_to_amqpl_data_body,
31+
amqp_amqpl_amqp_bodies
3132
].
3233

3334
groups() ->
@@ -443,41 +444,47 @@ amqp_to_amqpl_data_body(_Config) ->
443444
iolist_to_binary(PayFrag))
444445
end, Cases).
445446

446-
amqp10_non_single_data_bodies(_Config) ->
447+
amqp_amqpl_amqp_bodies(_Config) ->
447448
Props = #'P_basic'{type = <<"amqp-1.0">>},
448-
Payloads = [
449-
[#'v1_0.data'{content = <<"hello">>},
450-
#'v1_0.data'{content = <<"brave">>},
451-
#'v1_0.data'{content = <<"new">>},
452-
#'v1_0.data'{content = <<"world">>}
453-
],
454-
#'v1_0.amqp_value'{content = {utf8, <<"hello world">>}},
455-
[#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>},
456-
{utf8, <<"blah">>}]},
457-
#'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]}
458-
]
459-
],
449+
Bodies = [
450+
#'v1_0.data'{content = <<"helo world">>},
451+
[#'v1_0.data'{content = <<"hello">>},
452+
#'v1_0.data'{content = <<"brave">>},
453+
#'v1_0.data'{content = <<"new">>},
454+
#'v1_0.data'{content = <<"world">>}
455+
],
456+
#'v1_0.amqp_value'{content = {utf8, <<"hello world">>}},
457+
[#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>},
458+
{utf8, <<"blah">>}]},
459+
#'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]}
460+
]
461+
],
460462

461463
[begin
462464
EncodedPayload = amqp10_encode_bin(Payload),
463465

464-
MsgRecord0 = rabbit_msg_record:from_amqp091(Props, EncodedPayload),
465-
MsgRecord = rabbit_msg_record:init(
466-
iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))),
467-
{PropsOut, PayloadEncodedOut} = rabbit_msg_record:to_amqp091(MsgRecord),
468-
PayloadOut = case amqp10_framing:decode_bin(iolist_to_binary(PayloadEncodedOut)) of
469-
L when length(L) =:= 1 ->
470-
lists:nth(1, L);
471-
L ->
472-
L
466+
Ex = #resource{virtual_host = <<"/">>,
467+
kind = exchange,
468+
name = <<"ex">>},
469+
LegacyMsg = mc_amqpl:message(Ex, <<"rkey">>,
470+
#content{payload_fragments_rev =
471+
lists:reverse(EncodedPayload),
472+
properties = Props},
473+
#{}, true),
474+
475+
AmqpMsg = mc:convert(mc_amqp, LegacyMsg),
476+
%% drop any non body sections
477+
BodySections = lists:nthtail(3, mc:protocol_state(AmqpMsg)),
478+
479+
AssertBody = case is_list(Payload) of
480+
true ->
481+
Payload;
482+
false ->
483+
[Payload]
473484
end,
474-
475-
?assertEqual(Props, PropsOut),
476-
?assertEqual(iolist_to_binary(EncodedPayload),
477-
iolist_to_binary(PayloadEncodedOut)),
478-
?assertEqual(Payload, PayloadOut)
479-
480-
end || Payload <- Payloads],
485+
% ct:pal("ProtoState ~p", [BodySections]),
486+
?assertEqual(AssertBody, BodySections)
487+
end || Payload <- Bodies],
481488
ok.
482489

483490
unsupported_091_header_is_dropped(_Config) ->
@@ -630,9 +637,9 @@ reuse_amqp10_binary_chunks(_Config) ->
630637
ok.
631638

632639
amqp10_encode_bin(L) when is_list(L) ->
633-
iolist_to_binary([amqp10_encode_bin(X) || X <- L]);
640+
[iolist_to_binary(amqp10_framing:encode_bin(X)) || X <- L];
634641
amqp10_encode_bin(X) ->
635-
iolist_to_binary(amqp10_framing:encode_bin(X)).
642+
[iolist_to_binary(amqp10_framing:encode_bin(X))].
636643

637644
%% Utility
638645

0 commit comments

Comments
 (0)