5050 Val :: term ()}].
5151-type opt (T ) :: T | undefined .
5252
53+ % % This representation was used in v3.13.7. 4.x understands this record for
54+ % % backward compatibility, specifically for the rare case where:
55+ % % 1. a 3.13 node internally parsed a message from a stream via
56+ % % ```
57+ % % Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
58+ % % ```
59+ % % 2. published this Message to a queue
60+ % % 3. RabbitMQ got upgraded to 4.x
61+ % %
62+ % % This record along with all its conversions in this module can therefore
63+ % % be deleted in some future RabbitMQ version once it's safe to assume that
64+ % % these upgraded messages have all been consumed.
65+ -record (msg ,
66+ {
67+ header :: opt (# 'v1_0.header' {}),
68+ delivery_annotations = []:: list (),
69+ message_annotations = [] :: list (),
70+ properties :: opt (# 'v1_0.properties' {}),
71+ application_properties = [] :: list (),
72+ data = [] :: amqp10_data (),
73+ footer = [] :: list ()
74+ }).
75+
5376% % This representation is used when the message was originally sent with
5477% % a protocol other than AMQP and the message was not read from a stream.
5578-record (msg_body_decoded ,
97120 body_code :: body_descriptor_code ()
98121 }).
99122
100- -opaque state () :: # msg_body_decoded {} | # msg_body_encoded {} | # v1 {}.
123+ -opaque state () :: # msg {} | # msg_body_decoded {} | # msg_body_encoded {} | # v1 {}.
101124
102125-export_type ([state / 0 ]).
103126
@@ -128,6 +151,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
128151convert_from (_SourceProto , _ , _Env ) ->
129152 not_implemented .
130153
154+ convert_to (? MODULE , Msg = # msg {}, _Env ) ->
155+ convert_from_3_13_msg (Msg );
131156convert_to (? MODULE , Msg , _Env ) ->
132157 Msg ;
133158convert_to (TargetProto , Msg , Env ) ->
@@ -139,7 +164,22 @@ size(#v1{message_annotations = MA,
139164 [] -> 0 ;
140165 _ -> ? MESSAGE_ANNOTATIONS_GUESS_SIZE
141166 end ,
142- {MetaSize , byte_size (Body )}.
167+ {MetaSize , byte_size (Body )};
168+ % % Copied from v3.13.7.
169+ % % This might be called in rabbit_fifo_v3 and must therefore not be modified
170+ % % to ensure determinism of quorum queues version 3.
171+ size (# msg {data = Body }) ->
172+ BodySize = if is_list (Body ) ->
173+ lists :foldl (
174+ fun (# 'v1_0.data' {content = Data }, Acc ) ->
175+ iolist_size (Data ) + Acc ;
176+ (# 'v1_0.amqp_sequence' {content = _ }, Acc ) ->
177+ Acc
178+ end , 0 , Body );
179+ is_record (Body , 'v1_0.amqp_value' ) ->
180+ 0
181+ end ,
182+ {_MetaSize = 0 , BodySize }.
143183
144184x_header (Key , Msg ) ->
145185 message_annotation (Key , Msg , undefined ).
@@ -151,6 +191,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
151191 undefined ;
152192property (Prop , # msg_body_encoded {properties = Props }) ->
153193 property0 (Prop , Props );
194+ property (_Prop , # msg {properties = undefined }) ->
195+ undefined ;
196+ property (Prop , # msg {properties = Props }) ->
197+ property0 (Prop , Props );
154198property (_Prop , # v1 {bare_and_footer_properties_pos = ? OMITTED_SECTION }) ->
155199 undefined ;
156200property (Prop , # v1 {bare_and_footer = Bin ,
@@ -298,7 +342,9 @@ protocol_state(#v1{message_annotations = MA0,
298342 ttl = Ttl }, Anns ),
299343 MA = protocol_state_message_annotations (MA0 , Anns ),
300344 Sections = to_sections (Header , MA , []),
301- [encode (Sections ), BareAndFooter ].
345+ [encode (Sections ), BareAndFooter ];
346+ protocol_state (# msg {} = Msg , Anns ) ->
347+ protocol_state (convert_from_3_13_msg (Msg ), Anns ).
302348
303349prepare (read , Msg ) ->
304350 Msg ;
@@ -322,7 +368,9 @@ prepare(store, #msg_body_encoded{
322368 bare_and_footer_application_properties_pos = AppPropsPos ,
323369 bare_and_footer_body_pos = BodyPos ,
324370 body_code = BodyCode
325- }.
371+ };
372+ prepare (store , Msg = # msg {}) ->
373+ Msg .
326374
327375% % internal
328376
@@ -379,7 +427,9 @@ msg_to_sections(#v1{message_annotations = MAC,
379427 Sections = amqp10_framing :decode_bin (Bin ),
380428 Sections ++ [{amqp_encoded_body_and_footer , BodyAndFooterBin }]
381429 end ,
382- to_sections (undefined , MAC , Tail ).
430+ to_sections (undefined , MAC , Tail );
431+ msg_to_sections (# msg {} = Msg ) ->
432+ msg_to_sections (convert_from_3_13_msg (Msg )).
383433
384434to_sections (H , MAC , P , APC , Tail ) ->
385435 S0 = case APC of
@@ -410,6 +460,20 @@ to_sections(H, MAC, Tail) ->
410460 [H | S ]
411461 end .
412462
463+ convert_from_3_13_msg (# msg {header = H ,
464+ delivery_annotations = _ ,
465+ message_annotations = MAC ,
466+ properties = P ,
467+ application_properties = APC ,
468+ data = Data ,
469+ footer = FC }) ->
470+ # msg_body_decoded {header = H ,
471+ message_annotations = MAC ,
472+ properties = P ,
473+ application_properties = APC ,
474+ data = Data ,
475+ footer = FC }.
476+
413477-spec protocol_state_message_annotations (amqp_annotations (), mc :annotations ()) ->
414478 amqp_annotations ().
415479protocol_state_message_annotations (MA , Anns ) ->
@@ -482,11 +546,14 @@ message_annotation(Key, State, Default)
482546
483547message_annotations (# msg_body_decoded {message_annotations = L }) -> L ;
484548message_annotations (# msg_body_encoded {message_annotations = L }) -> L ;
485- message_annotations (# v1 {message_annotations = L }) -> L .
549+ message_annotations (# v1 {message_annotations = L }) -> L ;
550+ message_annotations (# msg {message_annotations = L }) -> L .
486551
487552message_annotations_as_simple_map (# msg_body_encoded {message_annotations = Content }) ->
488553 message_annotations_as_simple_map0 (Content );
489554message_annotations_as_simple_map (# v1 {message_annotations = Content }) ->
555+ message_annotations_as_simple_map0 (Content );
556+ message_annotations_as_simple_map (# msg {message_annotations = Content }) ->
490557 message_annotations_as_simple_map0 (Content ).
491558
492559message_annotations_as_simple_map0 (Content ) ->
@@ -501,6 +568,9 @@ message_annotations_as_simple_map0(Content) ->
501568application_properties_as_simple_map (
502569 # msg_body_encoded {application_properties = Content }, L ) ->
503570 application_properties_as_simple_map0 (Content , L );
571+ application_properties_as_simple_map (
572+ # msg {application_properties = Content }, L ) ->
573+ application_properties_as_simple_map0 (Content , L );
504574application_properties_as_simple_map (
505575 # v1 {bare_and_footer_application_properties_pos = ? OMITTED_SECTION }, L ) ->
506576 L ;
0 commit comments