diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 06a923763da9..9e3ac9a74aec 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -17,6 +17,8 @@ prepare/2 ]). +-export([init_from_stream/2]). + -import(rabbit_misc, [maps_put_truthy/3]). @@ -99,10 +101,26 @@ -export_type([state/0]). +%% API + +-spec init_from_stream(binary(), mc:annotations()) -> + mc:state(). +init_from_stream(Payload, #{} = Anns0) -> + Sections = amqp10_framing:decode_bin(Payload, [server_mode]), + Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}), + %% when initalising from stored stream data the recovered + %% annotations take precendence over the ones provided + Anns = maps:merge(Anns0, essential_properties(Msg, recover)), + mc:init(?MODULE, Msg, Anns). + +%% CALLBACKS + +init(#msg_body_encoded{} = Msg) -> + {Msg, #{}}; init(Payload) -> Sections = amqp10_framing:decode_bin(Payload, [server_mode]), Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}), - Anns = essential_properties(Msg), + Anns = essential_properties(Msg, new), {Msg, Anns}. convert_from(?MODULE, Sections, _Env) when is_list(Sections) -> @@ -622,16 +640,44 @@ encode_deaths(Deaths) -> {map, Map} end, Deaths). -essential_properties(Msg) -> +essential_properties(#msg_body_encoded{} = Msg, new) -> Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), Ttl = get_property(ttl, Msg), - Anns = #{?ANN_DURABLE => Durable}, - maps_put_truthy( - ?ANN_PRIORITY, Priority, - maps_put_truthy( - ?ANN_TIMESTAMP, Timestamp, - maps_put_truthy( - ttl, Ttl, - Anns))). + Anns0 = #{?ANN_DURABLE => Durable}, + Anns = maps_put_truthy( + ?ANN_PRIORITY, Priority, + maps_put_truthy( + ?ANN_TIMESTAMP, Timestamp, + maps_put_truthy( + ttl, Ttl, + Anns0))), + Anns; +essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) -> + Anns = essential_properties(Msg, new), + case MA of + [] -> + Anns; + _ -> + lists:foldl( + fun ({{symbol, <<"x-routing-key">>}, + {utf8, Key}}, Acc) -> + maps:update_with(?ANN_ROUTING_KEYS, + fun(L) -> [Key | L] end, + [Key], + Acc); + ({{symbol, <<"x-cc">>}, + {list, CCs0}}, Acc) -> + CCs = [CC || {_T, CC} <- CCs0], + maps:update_with(?ANN_ROUTING_KEYS, + fun(L) -> L ++ CCs end, + CCs, + Acc); + ({{symbol, <<"x-exchange">>}, + {utf8, Exchange}}, Acc) -> + Acc#{?ANN_EXCHANGE => Exchange}; + (_, Acc) -> + Acc + end, Anns, MA) + end. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 2e4cac1a2c59..7840ec213628 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1305,39 +1305,11 @@ parse_uncompressed_subbatch( parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName, Name, LocalPid, Filter, Acc). -entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) -> - Mc0 = mc:init(mc_amqp, Entry, #{}), - %% If exchange or routing keys annotation isn't present the entry most likely came - %% from the rabbitmq-stream plugin so we'll choose defaults that simulate use - %% of the direct exchange. - XHeaders = mc:x_headers(Mc0), - Exchange = case XHeaders of - #{<<"x-exchange">> := {utf8, X}} -> - X; - _ -> - <<>> - end, - RKeys0 = case XHeaders of - #{<<"x-cc">> := {list, CCs}} -> - [CC || {utf8, CC} <- CCs]; - _ -> - [] - end, - RKeys1 = case XHeaders of - #{<<"x-routing-key">> := {utf8, RK}} -> - [RK | RKeys0]; - _ -> - RKeys0 - end, - RKeys = case RKeys1 of - [] -> - [QName]; - _ -> - RKeys1 - end, - Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0), - Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), - Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), +entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, + Name, LocalPid, Filter) -> + Mc = mc_amqp:init_from_stream(Entry, #{?ANN_EXCHANGE => <<>>, + ?ANN_ROUTING_KEYS => [QName], + <<"x-stream-offset">> => Offset}), case rabbit_amqp_filtex:filter(Filter, Mc) of true -> {Name, LocalPid, Offset, false, Mc};