From 8f78d644942e8544784a3858bf83c3337d349734 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 13 Feb 2025 08:15:58 +0000 Subject: [PATCH] Mc: introduce new function in mc_amqp to init mc from stream. Initialising a message container from data stored in a stream is a special case where we need to recover exchange and routing key information from the following message annatations: * x-exchange * x-routing-keys * x-cc We do not want to do this when initialising a message container from AMQP data just received from a publisher. This commit introduces a new function `mc_amqp:init_from_stream/2` that is to be used when needing a message container from a stream message. (cherry picked from commit 32615bf5f063b9767091b3472d8f55343aac7c9c) (cherry picked from commit 91e3180a5b18b989455b336699b33c12d7891421) # Conflicts: # deps/rabbit/src/mc_amqp.erl # deps/rabbit/src/rabbit_stream_queue.erl --- deps/rabbit/src/mc_amqp.erl | 30 ++++++++++++++++++++++++- deps/rabbit/src/rabbit_stream_queue.erl | 14 ++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index be63597c3f96..5d8b4ffb408f 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -16,6 +16,8 @@ prepare/2 ]). +-export([init_from_stream/2]). + -import(rabbit_misc, [maps_put_truthy/3]). @@ -98,10 +100,26 @@ -export_type([state/0]). +%% API + +-spec init_from_stream(binary(), mc:annotations()) -> + mc:state(). +init_from_stream(Payload, #{} = Anns0) -> + Sections = amqp10_framing:decode_bin(Payload, [server_mode]), + Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}), + %% when initalising from stored stream data the recovered + %% annotations take precendence over the ones provided + Anns = maps:merge(Anns0, essential_properties(Msg, recover)), + mc:init(?MODULE, Msg, Anns). + +%% CALLBACKS + +init(#msg_body_encoded{} = Msg) -> + {Msg, #{}}; init(Payload) -> Sections = amqp10_framing:decode_bin(Payload, [server_mode]), Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}), - Anns = essential_properties(Msg), + Anns = essential_properties(Msg, new), {Msg, Anns}. convert_from(?MODULE, Sections, _Env) when is_list(Sections) -> @@ -602,7 +620,11 @@ encode_deaths(Deaths) -> {map, Map} end, Deaths). +<<<<<<< HEAD essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> +======= +essential_properties(#msg_body_encoded{} = Msg, new) -> +>>>>>>> 91e3180a5 (Mc: introduce new function in mc_amqp to init mc from stream.) Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), @@ -615,6 +637,12 @@ essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> maps_put_truthy( ttl, Ttl, Anns0))), +<<<<<<< HEAD +======= + Anns; +essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) -> + Anns = essential_properties(Msg, new), +>>>>>>> 91e3180a5 (Mc: introduce new function in mc_amqp to init mc from stream.) case MA of [] -> Anns; diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 7b6eb0510a1f..7262e3275e6f 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1288,6 +1288,7 @@ parse_uncompressed_subbatch( end, parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, Name, LocalPid, Acc). +<<<<<<< HEAD entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid) -> Mc0 = mc:init(mc_amqp, Entry, #{}), @@ -1304,6 +1305,19 @@ entry_to_msg(Entry, Offset, #resource{kind = queue, end, Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), {Name, LocalPid, Offset, false, Mc}. +======= +entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, + Name, LocalPid, Filter) -> + Mc = mc_amqp:init_from_stream(Entry, #{?ANN_EXCHANGE => <<>>, + ?ANN_ROUTING_KEYS => [QName], + <<"x-stream-offset">> => Offset}), + case rabbit_amqp_filtex:filter(Filter, Mc) of + true -> + {Name, LocalPid, Offset, false, Mc}; + false -> + none + end. +>>>>>>> 91e3180a5 (Mc: introduce new function in mc_amqp to init mc from stream.) capabilities() -> #{unsupported_policies => [%% Classic policies