Skip to content

Commit e16c472

Browse files
committed
Mc: restore recovery of routing_keys and echange annotations
When initialising from an AMQP packet.
1 parent 95b6df7 commit e16c472

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -622,16 +622,41 @@ encode_deaths(Deaths) ->
622622
{map, Map}
623623
end, Deaths).
624624

625-
essential_properties(Msg) ->
625+
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
626626
Durable = get_property(durable, Msg),
627627
Priority = get_property(priority, Msg),
628628
Timestamp = get_property(timestamp, Msg),
629629
Ttl = get_property(ttl, Msg),
630-
Anns = #{?ANN_DURABLE => Durable},
631-
maps_put_truthy(
632-
?ANN_PRIORITY, Priority,
633-
maps_put_truthy(
634-
?ANN_TIMESTAMP, Timestamp,
635-
maps_put_truthy(
636-
ttl, Ttl,
637-
Anns))).
630+
Anns0 = #{?ANN_DURABLE => Durable},
631+
Anns = maps_put_truthy(
632+
?ANN_PRIORITY, Priority,
633+
maps_put_truthy(
634+
?ANN_TIMESTAMP, Timestamp,
635+
maps_put_truthy(
636+
ttl, Ttl,
637+
Anns0))),
638+
case MA of
639+
[] ->
640+
Anns;
641+
_ ->
642+
lists:foldl(
643+
fun ({{symbol, <<"x-routing-key">>},
644+
{utf8, Key}}, Acc) ->
645+
maps:update_with(?ANN_ROUTING_KEYS,
646+
fun(L) -> [Key | L] end,
647+
[Key],
648+
Acc);
649+
({{symbol, <<"x-cc">>},
650+
{list, CCs0}}, Acc) ->
651+
CCs = [CC || {_T, CC} <- CCs0],
652+
maps:update_with(?ANN_ROUTING_KEYS,
653+
fun(L) -> L ++ CCs end,
654+
CCs,
655+
Acc);
656+
({{symbol, <<"x-exchange">>},
657+
{utf8, Exchange}}, Acc) ->
658+
Acc#{?ANN_EXCHANGE => Exchange};
659+
(_, Acc) ->
660+
Acc
661+
end, Anns, MA)
662+
end.

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -460,11 +460,11 @@ amqpl_cc_amqp_bin_amqpl(_Config) ->
460460
{utf8, <<"q2">>}]}},
461461
mc:x_headers(Msg10)),
462462

463-
%% Here, we simulate what rabbit_stream_queue does:
464-
Msg10b = mc:set_annotation(?ANN_EXCHANGE, <<"exch">>, Msg10),
465-
Msg10c = mc:set_annotation(?ANN_ROUTING_KEYS, [<<"apple">>, <<"q1">>, <<"q2">>], Msg10b),
463+
%% routing keys and exchange annotations should be restored
464+
?assertEqual(RoutingKeys, mc:routing_keys(Msg10)),
465+
?assertEqual(<<"exch">>, mc:exchange(Msg10)),
466466

467-
MsgL2 = mc:convert(mc_amqpl, Msg10c),
467+
MsgL2 = mc:convert(mc_amqpl, Msg10),
468468
?assertEqual(RoutingKeys, mc:routing_keys(MsgL2)),
469469
?assertMatch(#content{properties = #'P_basic'{headers = Headers}},
470470
mc:protocol_state(MsgL2)).

0 commit comments

Comments
 (0)