Skip to content

Commit 95dd670

Browse files
ansdmergify[bot]
authored andcommitted
Handle mc_amqp 3.13 msg record in 4.x
The `msg` record was used in 3.13. This commit makes 4.x understand this record for backward compatibility, specifically for the rare case where: 1. a 3.13 node internally parsed a message from a stream via ``` Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{}) ``` 2. published this Message to a queue 3. RabbitMQ got upgraded to 4.x (This commit can be reverted in some future RabbitMQ version once it's safe to assume that these upgraded messages have been consumed.) The changes were manually tested as described in Jira RMQ-1525. (cherry picked from commit 91f5ce2) (cherry picked from commit 75cffc9)
1 parent 63e6f4a commit 95dd670

File tree

1 file changed

+76
-6
lines changed

1 file changed

+76
-6
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,29 @@
4747
Val :: term()}].
4848
-type opt(T) :: T | undefined.
4949

50+
%% This representation was used in v3.13.7. 4.x understands this record for
51+
%% backward compatibility, specifically for the rare case where:
52+
%% 1. a 3.13 node internally parsed a message from a stream via
53+
%% ```
54+
%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
55+
%% ```
56+
%% 2. published this Message to a queue
57+
%% 3. RabbitMQ got upgraded to 4.x
58+
%%
59+
%% This record along with all its conversions in this module can therefore
60+
%% be deleted in some future RabbitMQ version once it's safe to assume that
61+
%% these upgraded messages have all been consumed.
62+
-record(msg,
63+
{
64+
header :: opt(#'v1_0.header'{}),
65+
delivery_annotations = []:: list(),
66+
message_annotations = [] :: list(),
67+
properties :: opt(#'v1_0.properties'{}),
68+
application_properties = [] :: list(),
69+
data = [] :: amqp10_data(),
70+
footer = [] :: list()
71+
}).
72+
5073
%% This representation is used when the message was originally sent with
5174
%% a protocol other than AMQP and the message was not read from a stream.
5275
-record(msg_body_decoded,
@@ -94,7 +117,7 @@
94117
body_code :: body_descriptor_code()
95118
}).
96119

97-
-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
120+
-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
98121

99122
-export_type([state/0]).
100123

@@ -109,6 +132,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
109132
convert_from(_SourceProto, _, _Env) ->
110133
not_implemented.
111134

135+
convert_to(?MODULE, Msg = #msg{}, _Env) ->
136+
convert_from_3_13_msg(Msg);
112137
convert_to(?MODULE, Msg, _Env) ->
113138
Msg;
114139
convert_to(TargetProto, Msg, Env) ->
@@ -120,7 +145,22 @@ size(#v1{message_annotations = MA,
120145
[] -> 0;
121146
_ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE
122147
end,
123-
{MetaSize, byte_size(Body)}.
148+
{MetaSize, byte_size(Body)};
149+
%% Copied from v3.13.7.
150+
%% This might be called in rabbit_fifo_v3 and must therefore not be modified
151+
%% to ensure determinism of quorum queues version 3.
152+
size(#msg{data = Body}) ->
153+
BodySize = if is_list(Body) ->
154+
lists:foldl(
155+
fun(#'v1_0.data'{content = Data}, Acc) ->
156+
iolist_size(Data) + Acc;
157+
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
158+
Acc
159+
end, 0, Body);
160+
is_record(Body, 'v1_0.amqp_value') ->
161+
0
162+
end,
163+
{_MetaSize = 0, BodySize}.
124164

125165
x_header(Key, Msg) ->
126166
message_annotation(Key, Msg, undefined).
@@ -129,6 +169,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
129169
undefined;
130170
property(Prop, #msg_body_encoded{properties = Props}) ->
131171
property0(Prop, Props);
172+
property(_Prop, #msg{properties = undefined}) ->
173+
undefined;
174+
property(Prop, #msg{properties = Props}) ->
175+
property0(Prop, Props);
132176
property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) ->
133177
undefined;
134178
property(Prop, #v1{bare_and_footer = Bin,
@@ -260,7 +304,9 @@ protocol_state(#v1{message_annotations = MA0,
260304
ttl = Ttl}, Anns),
261305
MA = protocol_state_message_annotations(MA0, Anns),
262306
Sections = to_sections(Header, MA, []),
263-
[encode(Sections), BareAndFooter].
307+
[encode(Sections), BareAndFooter];
308+
protocol_state(#msg{} = Msg, Anns) ->
309+
protocol_state(convert_from_3_13_msg(Msg), Anns).
264310

265311
prepare(read, Msg) ->
266312
Msg;
@@ -284,7 +330,9 @@ prepare(store, #msg_body_encoded{
284330
bare_and_footer_application_properties_pos = AppPropsPos,
285331
bare_and_footer_body_pos = BodyPos,
286332
body_code = BodyCode
287-
}.
333+
};
334+
prepare(store, Msg = #msg{}) ->
335+
Msg.
288336

289337
%% internal
290338

@@ -341,7 +389,9 @@ msg_to_sections(#v1{message_annotations = MAC,
341389
Sections = amqp10_framing:decode_bin(Bin),
342390
Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}]
343391
end,
344-
to_sections(undefined, MAC, Tail).
392+
to_sections(undefined, MAC, Tail);
393+
msg_to_sections(#msg{} = Msg) ->
394+
msg_to_sections(convert_from_3_13_msg(Msg)).
345395

346396
to_sections(H, MAC, P, APC, Tail) ->
347397
S0 = case APC of
@@ -372,6 +422,20 @@ to_sections(H, MAC, Tail) ->
372422
[H | S]
373423
end.
374424

425+
convert_from_3_13_msg(#msg{header = H,
426+
delivery_annotations = _,
427+
message_annotations = MAC,
428+
properties = P,
429+
application_properties = APC,
430+
data = Data,
431+
footer = FC}) ->
432+
#msg_body_decoded{header = H,
433+
message_annotations = MAC,
434+
properties = P,
435+
application_properties = APC,
436+
data = Data,
437+
footer = FC}.
438+
375439
-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
376440
amqp_annotations().
377441
protocol_state_message_annotations(MA, Anns) ->
@@ -444,11 +508,14 @@ message_annotation(Key, State, Default)
444508

445509
message_annotations(#msg_body_decoded{message_annotations = L}) -> L;
446510
message_annotations(#msg_body_encoded{message_annotations = L}) -> L;
447-
message_annotations(#v1{message_annotations = L}) -> L.
511+
message_annotations(#v1{message_annotations = L}) -> L;
512+
message_annotations(#msg{message_annotations = L}) -> L.
448513

449514
message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
450515
message_annotations_as_simple_map0(Content);
451516
message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
517+
message_annotations_as_simple_map0(Content);
518+
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
452519
message_annotations_as_simple_map0(Content).
453520

454521
message_annotations_as_simple_map0(Content) ->
@@ -463,6 +530,9 @@ message_annotations_as_simple_map0(Content) ->
463530
application_properties_as_simple_map(
464531
#msg_body_encoded{application_properties = Content}, L) ->
465532
application_properties_as_simple_map0(Content, L);
533+
application_properties_as_simple_map(
534+
#msg{application_properties = Content}, L) ->
535+
application_properties_as_simple_map0(Content, L);
466536
application_properties_as_simple_map(
467537
#v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) ->
468538
L;

0 commit comments

Comments
 (0)