Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 76 additions & 6 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@
Val :: term()}].
-type opt(T) :: T | undefined.

%% This representation was used in v3.13.7. 4.x understands this record for
%% backward compatibility, specifically for the rare case where:
%% 1. a 3.13 node internally parsed a message from a stream via
%% ```
%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
%% ```
%% 2. published this Message to a queue
%% 3. RabbitMQ got upgraded to 4.x
%%
%% This record along with all its conversions in this module can therefore
%% be deleted in some future RabbitMQ version once it's safe to assume that
%% these upgraded messages have all been consumed.
-record(msg,
{
header :: opt(#'v1_0.header'{}),
delivery_annotations = []:: list(),
message_annotations = [] :: list(),
properties :: opt(#'v1_0.properties'{}),
application_properties = [] :: list(),
data = [] :: amqp10_data(),
footer = [] :: list()
}).

%% This representation is used when the message was originally sent with
%% a protocol other than AMQP and the message was not read from a stream.
-record(msg_body_decoded,
Expand Down Expand Up @@ -97,7 +120,7 @@
body_code :: body_descriptor_code()
}).

-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.

-export_type([state/0]).

Expand Down Expand Up @@ -128,6 +151,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
convert_from(_SourceProto, _, _Env) ->
not_implemented.

convert_to(?MODULE, Msg = #msg{}, _Env) ->
convert_from_3_13_msg(Msg);
convert_to(?MODULE, Msg, _Env) ->
Msg;
convert_to(TargetProto, Msg, Env) ->
Expand All @@ -139,7 +164,22 @@ size(#v1{message_annotations = MA,
[] -> 0;
_ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE
end,
{MetaSize, byte_size(Body)}.
{MetaSize, byte_size(Body)};
%% Copied from v3.13.7.
%% This might be called in rabbit_fifo_v3 and must therefore not be modified
%% to ensure determinism of quorum queues version 3.
size(#msg{data = Body}) ->
BodySize = if is_list(Body) ->
lists:foldl(
fun(#'v1_0.data'{content = Data}, Acc) ->
iolist_size(Data) + Acc;
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
Acc
end, 0, Body);
is_record(Body, 'v1_0.amqp_value') ->
0
end,
{_MetaSize = 0, BodySize}.

x_header(Key, Msg) ->
message_annotation(Key, Msg, undefined).
Expand All @@ -151,6 +191,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
undefined;
property(Prop, #msg_body_encoded{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #msg{properties = undefined}) ->
undefined;
property(Prop, #msg{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) ->
undefined;
property(Prop, #v1{bare_and_footer = Bin,
Expand Down Expand Up @@ -298,7 +342,9 @@ protocol_state(#v1{message_annotations = MA0,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
[encode(Sections), BareAndFooter];
protocol_state(#msg{} = Msg, Anns) ->
protocol_state(convert_from_3_13_msg(Msg), Anns).

prepare(read, Msg) ->
Msg;
Expand All @@ -322,7 +368,9 @@ prepare(store, #msg_body_encoded{
bare_and_footer_application_properties_pos = AppPropsPos,
bare_and_footer_body_pos = BodyPos,
body_code = BodyCode
}.
};
prepare(store, Msg = #msg{}) ->
Msg.

%% internal

Expand Down Expand Up @@ -379,7 +427,9 @@ msg_to_sections(#v1{message_annotations = MAC,
Sections = amqp10_framing:decode_bin(Bin),
Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}]
end,
to_sections(undefined, MAC, Tail).
to_sections(undefined, MAC, Tail);
msg_to_sections(#msg{} = Msg) ->
msg_to_sections(convert_from_3_13_msg(Msg)).

to_sections(H, MAC, P, APC, Tail) ->
S0 = case APC of
Expand Down Expand Up @@ -410,6 +460,20 @@ to_sections(H, MAC, Tail) ->
[H | S]
end.

convert_from_3_13_msg(#msg{header = H,
delivery_annotations = _,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}) ->
#msg_body_decoded{header = H,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}.

-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
amqp_annotations().
protocol_state_message_annotations(MA, Anns) ->
Expand Down Expand Up @@ -482,11 +546,14 @@ message_annotation(Key, State, Default)

message_annotations(#msg_body_decoded{message_annotations = L}) -> L;
message_annotations(#msg_body_encoded{message_annotations = L}) -> L;
message_annotations(#v1{message_annotations = L}) -> L.
message_annotations(#v1{message_annotations = L}) -> L;
message_annotations(#msg{message_annotations = L}) -> L.

message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content).

message_annotations_as_simple_map0(Content) ->
Expand All @@ -501,6 +568,9 @@ message_annotations_as_simple_map0(Content) ->
application_properties_as_simple_map(
#msg_body_encoded{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#msg{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) ->
L;
Expand Down
Loading