Skip to content

Commit 8f78d64

Browse files
kjnilssonmergify[bot]
authored andcommitted
Mc: introduce new function in mc_amqp to init mc from stream.
Initialising a message container from data stored in a stream is a special case where we need to recover exchange and routing key information from the following message annatations: * x-exchange * x-routing-keys * x-cc We do not want to do this when initialising a message container from AMQP data just received from a publisher. This commit introduces a new function `mc_amqp:init_from_stream/2` that is to be used when needing a message container from a stream message. (cherry picked from commit 32615bf) (cherry picked from commit 91e3180) # Conflicts: # deps/rabbit/src/mc_amqp.erl # deps/rabbit/src/rabbit_stream_queue.erl
1 parent fca6f1a commit 8f78d64

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
prepare/2
1717
]).
1818

19+
-export([init_from_stream/2]).
20+
1921
-import(rabbit_misc,
2022
[maps_put_truthy/3]).
2123

@@ -98,10 +100,26 @@
98100

99101
-export_type([state/0]).
100102

103+
%% API
104+
105+
-spec init_from_stream(binary(), mc:annotations()) ->
106+
mc:state().
107+
init_from_stream(Payload, #{} = Anns0) ->
108+
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
109+
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
110+
%% when initalising from stored stream data the recovered
111+
%% annotations take precendence over the ones provided
112+
Anns = maps:merge(Anns0, essential_properties(Msg, recover)),
113+
mc:init(?MODULE, Msg, Anns).
114+
115+
%% CALLBACKS
116+
117+
init(#msg_body_encoded{} = Msg) ->
118+
{Msg, #{}};
101119
init(Payload) ->
102120
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
103121
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
104-
Anns = essential_properties(Msg),
122+
Anns = essential_properties(Msg, new),
105123
{Msg, Anns}.
106124

107125
convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
@@ -602,7 +620,11 @@ encode_deaths(Deaths) ->
602620
{map, Map}
603621
end, Deaths).
604622

623+
<<<<<<< HEAD
605624
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
625+
=======
626+
essential_properties(#msg_body_encoded{} = Msg, new) ->
627+
>>>>>>> 91e3180a5 (Mc: introduce new function in mc_amqp to init mc from stream.)
606628
Durable = get_property(durable, Msg),
607629
Priority = get_property(priority, Msg),
608630
Timestamp = get_property(timestamp, Msg),
@@ -615,6 +637,12 @@ essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
615637
maps_put_truthy(
616638
ttl, Ttl,
617639
Anns0))),
640+
<<<<<<< HEAD
641+
=======
642+
Anns;
643+
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) ->
644+
Anns = essential_properties(Msg, new),
645+
>>>>>>> 91e3180a5 (Mc: introduce new function in mc_amqp to init mc from stream.)
618646
case MA of
619647
[] ->
620648
Anns;

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,7 @@ parse_uncompressed_subbatch(
12881288
end,
12891289
parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, Name, LocalPid, Acc).
12901290

1291+
<<<<<<< HEAD
12911292
entry_to_msg(Entry, Offset, #resource{kind = queue,
12921293
name = QName}, Name, LocalPid) ->
12931294
Mc0 = mc:init(mc_amqp, Entry, #{}),
@@ -1304,6 +1305,19 @@ entry_to_msg(Entry, Offset, #resource{kind = queue,
13041305
end,
13051306
Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2),
13061307
{Name, LocalPid, Offset, false, Mc}.
1308+
=======
1309+
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName},
1310+
Name, LocalPid, Filter) ->
1311+
Mc = mc_amqp:init_from_stream(Entry, #{?ANN_EXCHANGE => <<>>,
1312+
?ANN_ROUTING_KEYS => [QName],
1313+
<<"x-stream-offset">> => Offset}),
1314+
case rabbit_amqp_filtex:filter(Filter, Mc) of
1315+
true ->
1316+
{Name, LocalPid, Offset, false, Mc};
1317+
false ->
1318+
none
1319+
end.
1320+
>>>>>>> 91e3180a5 (Mc: introduce new function in mc_amqp to init mc from stream.)
13071321

13081322
capabilities() ->
13091323
#{unsupported_policies => [%% Classic policies

0 commit comments

Comments
 (0)