Skip to content

Commit ca13675

Browse files
committed
Use list instead of array for x-cc message annotation
1 parent 715d421 commit ca13675

File tree

8 files changed

+83
-98
lines changed

8 files changed

+83
-98
lines changed

deps/rabbit/src/mc_amqpl.erl

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -708,23 +708,10 @@ from_091(binary, V) -> {binary, V};
708708
from_091(timestamp, V) -> {timestamp, V * 1000};
709709
from_091(byte, V) -> {byte, V};
710710
from_091(void, _V) -> null;
711+
from_091(array, L) ->
712+
{list, [from_091(T, V) || {T, V} <- L]};
711713
from_091(table, L) ->
712-
{map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]};
713-
from_091(array, []) ->
714-
{list, []};
715-
from_091(array, L0 = [{T0, _} | _]) ->
716-
{L = [{T1, _} | _], {Monomorphic, _}} =
717-
lists:mapfoldl(fun({T, V}, {Mono0, PrevType}) ->
718-
Mono = case Mono0 of
719-
false -> false;
720-
true -> T =:= PrevType
721-
end,
722-
{from_091(T, V), {Mono, T}}
723-
end, {true, T0}, L0),
724-
case Monomorphic of
725-
true -> {array, T1, L};
726-
false -> {list, L}
727-
end.
714+
{map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}.
728715

729716
map_add(_T, _Key, _Type, undefined, Acc) ->
730717
Acc;
@@ -887,24 +874,3 @@ amqp10_section_header(Header, Headers) ->
887874

888875
amqp_encoded_binary(Section) ->
889876
iolist_to_binary(amqp10_framing:encode_bin(Section)).
890-
891-
-ifdef(TEST).
892-
-include_lib("eunit/include/eunit.hrl").
893-
894-
from_091_array_test() ->
895-
{list, []} = from_091(array, []),
896-
{array, utf8, [{utf8, <<"e1">>}]} = from_091(array, [{longstr, <<"e1">>}]),
897-
{array, utf8, [{utf8, <<"e1">>},
898-
{utf8, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>},
899-
{longstr, <<"e2">>}]),
900-
{list, [{utf8, <<"e1">>},
901-
{binary, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>},
902-
{binary, <<"e2">>}]),
903-
{list, [{utf8, <<"e1">>},
904-
{binary, <<"e2">>},
905-
{utf8, <<"e3">>},
906-
{utf8, <<"e4">>}]} = from_091(array, [{longstr, <<"e1">>},
907-
{binary, <<"e2">>},
908-
{longstr, <<"e3">>},
909-
{longstr, <<"e4">>}]).
910-
-endif.

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,21 +2495,27 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}},
24952495
RKeys = [RKey],
24962496
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
24972497
{ok, X, RKeys, Mc, PermCache};
2498-
{array, utf8, CCs0} ->
2499-
CCs = lists:map(fun({utf8, CC}) -> CC end, CCs0),
2500-
RKeys = [RKey | CCs],
2501-
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
2502-
{ok, X, RKeys, Mc, PermCache};
2498+
{list, CCs0} = L ->
2499+
try lists:map(fun({utf8, CC}) -> CC end, CCs0) of
2500+
CCs ->
2501+
RKeys = [RKey | CCs],
2502+
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
2503+
{ok, X, RKeys, Mc, PermCache}
2504+
catch error:function_clause ->
2505+
{error, {anonymous_terminus, AnonTerm}, bad_x_cc(L)}
2506+
end;
25032507
BadValue ->
2504-
Desc = unicode:characters_to_binary(
2505-
lists:flatten(
2506-
io_lib:format(
2507-
"bad value for 'x-cc' message-annotation: ~tp", [BadValue]))),
2508-
{error, {anonymous_terminus, AnonTerm},
2509-
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
2510-
description = {utf8, Desc}}}
2508+
{error, {anonymous_terminus, AnonTerm}, bad_x_cc(BadValue)}
25112509
end.
25122510

2511+
bad_x_cc(Value) ->
2512+
Desc = unicode:characters_to_binary(
2513+
lists:flatten(
2514+
io_lib:format(
2515+
"bad value for 'x-cc' message-annotation: ~tp", [Value]))),
2516+
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
2517+
description = {utf8, Desc}}.
2518+
25132519
process_routing_confirm([], _SenderSettles = true, _, U) ->
25142520
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
25152521
{U, []};

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1283,7 +1283,7 @@ entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPi
12831283
<<>>
12841284
end,
12851285
RKeys0 = case XHeaders of
1286-
#{<<"x-cc">> := {array, utf8, CCs}} ->
1286+
#{<<"x-cc">> := {list, CCs}} ->
12871287
[CC || {utf8, CC} <- CCs];
12881288
_ ->
12891289
[]

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4449,7 +4449,7 @@ trace(Q, QType, Config) ->
44494449
ok = wait_for_credit(Sender),
44504450
Msg0 = amqp10_msg:new(<<"tag 1">>, Payload, true),
44514451
Msg = amqp10_msg:set_message_annotations(
4452-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"my CC key">>}]}},
4452+
#{<<"x-cc">> => {list, [{utf8, <<"my CC key">>}]}},
44534453
amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0)),
44544454
ok = amqp10_client:send_msg(Sender, Msg),
44554455
{ok, _} = amqp10_client:get_msg(Receiver),
@@ -5993,7 +5993,7 @@ x_cc_annotation_exchange(Config) ->
59935993

59945994
Payload = <<"my message">>,
59955995
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
5996-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"key 2">>}]}},
5996+
#{<<"x-cc">> => {list, [{utf8, <<"key 2">>}]}},
59975997
amqp10_msg:new(<<"tag">>, Payload))),
59985998
ok = wait_for_accepted(<<"tag">>),
59995999
ok = amqp10_client:detach_link(Sender),
@@ -6028,8 +6028,8 @@ x_cc_annotation_exchange_routing_key_empty(Config) ->
60286028

60296029
Payload = <<"my message">>,
60306030
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
6031-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"key 1">>},
6032-
{utf8, <<"key 2">>}]}},
6031+
#{<<"x-cc">> => {list, [{utf8, <<"key 1">>},
6032+
{utf8, <<"key 2">>}]}},
60336033
amqp10_msg:new(<<"tag">>, Payload))),
60346034
ok = wait_for_accepted(<<"tag">>),
60356035
ok = amqp10_client:detach_link(Sender),
@@ -6063,7 +6063,7 @@ x_cc_annotation_queue(Config) ->
60636063

60646064
Payload = <<"my message">>,
60656065
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
6066-
#{<<"x-cc">> => {array, utf8, [{utf8, QName2}]}},
6066+
#{<<"x-cc">> => {list, [{utf8, QName2}]}},
60676067
amqp10_msg:new(<<"tag">>, Payload))),
60686068
ok = wait_for_accepted(<<"tag">>),
60696069
ok = amqp10_client:detach_link(Sender),
@@ -6097,8 +6097,8 @@ x_cc_annotation_null(Config) ->
60976097
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled),
60986098

60996099
Msg1 = amqp10_msg:set_message_annotations(
6100-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"key-1">>},
6101-
{utf8, <<"key-3">>}]}},
6100+
#{<<"x-cc">> => {list, [{utf8, <<"key-1">>},
6101+
{utf8, <<"key-3">>}]}},
61026102
amqp10_msg:set_properties(
61036103
#{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)},
61046104
amqp10_msg:new(<<"t1">>, <<"m1">>))),
@@ -6110,8 +6110,8 @@ x_cc_annotation_null(Config) ->
61106110
?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)),
61116111

61126112
Msg2 = amqp10_msg:set_message_annotations(
6113-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>},
6114-
{utf8, <<"key-1">>}]}},
6113+
#{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>},
6114+
{utf8, <<"key-1">>}]}},
61156115
amqp10_msg:set_properties(
61166116
#{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)},
61176117
amqp10_msg:new(<<"t2">>, <<"m2">>))),
@@ -6123,7 +6123,7 @@ x_cc_annotation_null(Config) ->
61236123
?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)),
61246124

61256125
Msg3 = amqp10_msg:set_message_annotations(
6126-
#{<<"x-cc">> => {array, utf8, [{utf8, QName1}]}},
6126+
#{<<"x-cc">> => {list, [{utf8, QName1}]}},
61276127
amqp10_msg:set_properties(
61286128
#{to => rabbitmq_amqp_address:queue(QName2)},
61296129
amqp10_msg:new(<<"t3">>, <<"m3">>))),
@@ -6135,8 +6135,8 @@ x_cc_annotation_null(Config) ->
61356135
?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)),
61366136

61376137
Msg4 = amqp10_msg:set_message_annotations(
6138-
%% We send a symbol array instead of utf8 array.
6139-
#{<<"x-cc">> => {array, symbol, [{symbol, QName1}]}},
6138+
%% We send a symbol instead of utf8..
6139+
#{<<"x-cc">> => {list, [{symbol, QName1}]}},
61406140
amqp10_msg:set_properties(
61416141
#{to => rabbitmq_amqp_address:queue(QName2)},
61426142
amqp10_msg:new(<<"t4">>, <<"m4">>))),
@@ -6169,23 +6169,41 @@ bad_x_cc_annotation_exchange(Config) ->
61696169
{ok, Session} = amqp10_client:begin_session(Connection),
61706170

61716171
Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>),
6172-
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
6173-
ok = wait_for_credit(Sender),
6174-
6172+
{ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, Address),
6173+
ok = wait_for_credit(Sender1),
61756174
ok = amqp10_client:send_msg(
6176-
Sender,
6175+
Sender1,
61776176
amqp10_msg:set_message_annotations(
6178-
%% We send a list instead of an array.
6179-
#{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>}]}},
6180-
amqp10_msg:new(<<"tag">>, <<"msg">>))),
6181-
ok = wait_for_settlement(<<"tag">>, released),
6182-
receive {amqp10_event, {link, Sender, {detached, Error}}} ->
6177+
%% We send an array instead of a list.
6178+
#{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}]}},
6179+
amqp10_msg:new(<<"t1">>, <<"m1">>))),
6180+
ok = wait_for_settlement(<<"t1">>, released),
6181+
receive {amqp10_event, {link, Sender1, {detached, Error1}}} ->
61836182
?assertMatch(
61846183
#'v1_0.error'{
61856184
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
61866185
description = {utf8, <<"bad value for 'x-cc' message-annotation: "
6187-
"{list,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}},
6188-
Error)
6186+
"{array,utf8,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}},
6187+
Error1)
6188+
after 5000 -> ct:fail({missing_event, ?LINE})
6189+
end,
6190+
6191+
{ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, Address),
6192+
ok = wait_for_credit(Sender2),
6193+
ok = amqp10_client:send_msg(
6194+
Sender2,
6195+
amqp10_msg:set_message_annotations(
6196+
%% We include a non-utf8 type in the list.
6197+
#{<<"x-cc">> => {list, [{symbol, <<"key-3">>}]}},
6198+
amqp10_msg:new(<<"t2">>, <<"m2">>))),
6199+
ok = wait_for_settlement(<<"t2">>, released),
6200+
receive {amqp10_event, {link, Sender2, {detached, Error2}}} ->
6201+
?assertEqual(
6202+
#'v1_0.error'{
6203+
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
6204+
description = {utf8, <<"bad value for 'x-cc' message-annotation: "
6205+
"{list,[{symbol,<<\"key-3\">>}]}">>}},
6206+
Error2)
61896207
after 5000 -> ct:fail({missing_event, ?LINE})
61906208
end,
61916209

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -174,22 +174,14 @@ end_per_group(Group, Config) ->
174174
Config
175175
end.
176176

177-
init_per_testcase(T = dead_letter_reject_expire_expire, Config) ->
178-
%% With feature flag message_containers_deaths_v2 disabled, this test is known to fail due to
179-
%% https://github.com/rabbitmq/rabbitmq-server/issues/11159
177+
init_per_testcase(T, Config)
178+
when T =:= dead_letter_reject_expire_expire orelse
179+
T =:= stream ->
180+
%% With feature flag message_containers_deaths_v2 disabled, test case:
181+
%% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159
182+
%% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173
180183
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2),
181184
init_per_testcase0(T, Config);
182-
init_per_testcase(T = stream, Config) ->
183-
%% With feature flag message_containers_deaths_v2 disabled, this test is known to fail due to
184-
%% https://github.com/rabbitmq/rabbitmq-server/issues/11173
185-
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2),
186-
case rabbit_ct_helpers:is_mixed_versions() of
187-
true ->
188-
{skip, "TODO unskip this test when the lower version got bumped to v4.0.3 "
189-
"which includes https://github.com/rabbitmq/rabbitmq-server/pull/12571"};
190-
false ->
191-
init_per_testcase0(T, Config)
192-
end;
193185
init_per_testcase(Testcase, Config) ->
194186
init_per_testcase0(Testcase, Config).
195187

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,14 @@ amqpl_table_x_header_array_of_tbls(_Config) ->
189189
Content = #content{properties = Props,
190190
payload_fragments_rev = Payload},
191191
Msg = mc:init(mc_amqpl, Content, annotations()),
192-
?assertMatch({array, map,
193-
[{map, [{{symbol, <<"type">>}, {utf8, <<"apple">>}},
194-
{{symbol, <<"count">>}, {long, 99}}]},
195-
{map, [{{symbol, <<"type">>}, {utf8, <<"orange">>}},
196-
{{symbol, <<"count">>}, {long, 45}}]}]},
192+
?assertMatch({list,
193+
[{map,
194+
[{{symbol, <<"type">>}, {utf8, <<"apple">>}},
195+
{{symbol, <<"count">>}, {long, 99}}]},
196+
{map,
197+
[{{symbol, <<"type">>}, {utf8, <<"orange">>}},
198+
{{symbol, <<"count">>}, {long, 45}}]}
199+
]},
197200
mc:x_header(<<"x-fruit">>, Msg)).
198201

199202
amqpl_death_v1_records(_Config) ->
@@ -447,8 +450,8 @@ amqpl_cc_amqp_bin_amqpl(_Config) ->
447450
Msg10 = mc:init(mc_amqp, Sections, #{}),
448451
?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>},
449452
<<"x-routing-key">> := {utf8, <<"apple">>},
450-
<<"x-cc">> := {array, utf8, [{utf8, <<"q1">>},
451-
{utf8, <<"q2">>}]}},
453+
<<"x-cc">> := {list, [{utf8, <<"q1">>},
454+
{utf8, <<"q2">>}]}},
452455
mc:x_headers(Msg10)),
453456

454457
%% Here, we simulate what rabbit_stream_queue does:

deps/rabbit/test/topic_permission_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ amqp_x_cc_annotation(Config) ->
8282
%% We have permissions to send to both topics.
8383
%% Therefore, m1 should be sent to both queues.
8484
ok = amqp10_client:send_msg(Sender1, amqp10_msg:set_message_annotations(
85-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"a.2">>}]}},
85+
#{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}},
8686
amqp10_msg:new(<<"t1">>, <<"m1">>, true))),
8787
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
8888
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
@@ -99,7 +99,7 @@ amqp_x_cc_annotation(Config) ->
9999
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"x.1">>)),
100100
ok = amqp_utils:wait_for_credit(Sender2),
101101
ok = amqp10_client:send_msg(Sender2, amqp10_msg:set_message_annotations(
102-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"a.2">>}]}},
102+
#{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}},
103103
amqp10_msg:new(<<"t2">>, <<"m2">>, true))),
104104
receive
105105
{amqp10_event,
@@ -122,7 +122,7 @@ amqp_x_cc_annotation(Config) ->
122122
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)),
123123
ok = amqp_utils:wait_for_credit(Sender3),
124124
ok = amqp10_client:send_msg(Sender3, amqp10_msg:set_message_annotations(
125-
#{<<"x-cc">> => {array, utf8, [{utf8, <<"x.2">>}]}},
125+
#{<<"x-cc">> => {list, [{utf8, <<"x.2">>}]}},
126126
amqp10_msg:new(<<"t3">>, <<"m3">>, true))),
127127
receive
128128
{amqp10_event,

release-notes/4.1.0.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1,
2222

2323
### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation
2424
[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation.
25-
This annotation allows publishers to specify an [array](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-array) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
25+
This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
2626

2727
## Potential incompatibilities
2828

0 commit comments

Comments
 (0)