Skip to content

Commit 632629f

Browse files
committed
Mc: introduce new function in mc_amqp to recover mc from stream.
This includes all the routing key recovery / defaulting needed.
1 parent e16c472 commit 632629f

File tree

3 files changed

+31
-39
lines changed

3 files changed

+31
-39
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
prepare/2
1818
]).
1919

20+
-export([recover_from_stream/2]).
21+
2022
-import(rabbit_misc,
2123
[maps_put_truthy/3]).
2224

@@ -99,10 +101,24 @@
99101

100102
-export_type([state/0]).
101103

104+
%% API
105+
106+
-spec recover_from_stream(binary(), mc:annotations()) ->
107+
mc:state().
108+
recover_from_stream(Payload, Anns0) ->
109+
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
110+
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
111+
Anns = maps:merge(Anns0, essential_properties(Msg, recover)),
112+
mc:init(?MODULE, Msg, Anns).
113+
114+
%% CALLBACKS
115+
116+
init(#msg_body_encoded{} = Msg) ->
117+
{Msg, #{}};
102118
init(Payload) ->
103119
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
104120
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
105-
Anns = essential_properties(Msg),
121+
Anns = essential_properties(Msg, new),
106122
{Msg, Anns}.
107123

108124
convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
@@ -622,7 +638,7 @@ encode_deaths(Deaths) ->
622638
{map, Map}
623639
end, Deaths).
624640

625-
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
641+
essential_properties(#msg_body_encoded{} = Msg, new) ->
626642
Durable = get_property(durable, Msg),
627643
Priority = get_property(priority, Msg),
628644
Timestamp = get_property(timestamp, Msg),
@@ -635,6 +651,9 @@ essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
635651
maps_put_truthy(
636652
ttl, Ttl,
637653
Anns0))),
654+
Anns;
655+
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) ->
656+
Anns = essential_properties(Msg, new),
638657
case MA of
639658
[] ->
640659
Anns;

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,39 +1305,12 @@ parse_uncompressed_subbatch(
13051305
parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName,
13061306
Name, LocalPid, Filter, Acc).
13071307

1308-
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) ->
1309-
Mc0 = mc:init(mc_amqp, Entry, #{}),
1310-
%% If exchange or routing keys annotation isn't present the entry most likely came
1311-
%% from the rabbitmq-stream plugin so we'll choose defaults that simulate use
1312-
%% of the direct exchange.
1313-
XHeaders = mc:x_headers(Mc0),
1314-
Exchange = case XHeaders of
1315-
#{<<"x-exchange">> := {utf8, X}} ->
1316-
X;
1317-
_ ->
1318-
<<>>
1319-
end,
1320-
RKeys0 = case XHeaders of
1321-
#{<<"x-cc">> := {list, CCs}} ->
1322-
[CC || {utf8, CC} <- CCs];
1323-
_ ->
1324-
[]
1325-
end,
1326-
RKeys1 = case XHeaders of
1327-
#{<<"x-routing-key">> := {utf8, RK}} ->
1328-
[RK | RKeys0];
1329-
_ ->
1330-
RKeys0
1331-
end,
1332-
RKeys = case RKeys1 of
1333-
[] ->
1334-
[QName];
1335-
_ ->
1336-
RKeys1
1337-
end,
1338-
Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0),
1339-
Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
1340-
Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2),
1308+
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName},
1309+
Name, LocalPid, Filter) ->
1310+
Mc0 = mc_amqp:recover_from_stream(Entry, #{?ANN_EXCHANGE => <<>>,
1311+
?ANN_ROUTING_KEYS => [QName],
1312+
<<"x-stream-offset">> => Offset}),
1313+
Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc0),
13411314
case rabbit_amqp_filtex:filter(Filter, Mc) of
13421315
true ->
13431316
{Name, LocalPid, Offset, false, Mc};

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -460,11 +460,11 @@ amqpl_cc_amqp_bin_amqpl(_Config) ->
460460
{utf8, <<"q2">>}]}},
461461
mc:x_headers(Msg10)),
462462

463-
%% routing keys and exchange annotations should be restored
464-
?assertEqual(RoutingKeys, mc:routing_keys(Msg10)),
465-
?assertEqual(<<"exch">>, mc:exchange(Msg10)),
463+
%% Here, we simulate what rabbit_stream_queue does:
464+
Msg10b = mc:set_annotation(?ANN_EXCHANGE, <<"exch">>, Msg10),
465+
Msg10c = mc:set_annotation(?ANN_ROUTING_KEYS, [<<"apple">>, <<"q1">>, <<"q2">>], Msg10b),
466466

467-
MsgL2 = mc:convert(mc_amqpl, Msg10),
467+
MsgL2 = mc:convert(mc_amqpl, Msg10c),
468468
?assertEqual(RoutingKeys, mc:routing_keys(MsgL2)),
469469
?assertMatch(#content{properties = #'P_basic'{headers = Headers}},
470470
mc:protocol_state(MsgL2)).

0 commit comments

Comments
 (0)