Skip to content

Commit 45b9ad5

Browse files
committed
Return queue-prefix and topic-prefix in open properties
`queue-prefix` and `topic-prefix` properties of the `open` frame are not part of the AMQP spec. However, let's include these properties because they are also returned by ActiveMQ and Solace and understood by some client libs (e.g. ActiveMQ NMS.AMQP Client). Closes #12531
1 parent 3eba29c commit 45b9ad5

File tree

2 files changed

+56
-3
lines changed

2 files changed

+56
-3
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,15 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
136136
server_properties() ->
137137
Props0 = rabbit_reader:server_properties(amqp_1_0),
138138
Props1 = [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Props0],
139-
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
139+
Props = [
140+
{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}},
141+
%% queue-prefix and topic-prefix are not part of the AMQP spec.
142+
%% However, we include these properties because they are also returned by
143+
%% ActiveMQ and Solace and understood by some client libs (e.g. ActiveMQ NMS.AMQP Client).
144+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12531
145+
{{symbol, <<"queue-prefix">>}, {utf8, <<"/queues/">>}},
146+
{{symbol, <<"topic-prefix">>}, {utf8, <<"/exchanges/amq.topic/">>}}
147+
] ++ Props1,
140148
{map, Props}.
141149

142150
%%--------------------------------------------------------------------------

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ groups() ->
156156
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
157157
session_max_per_connection,
158158
link_max_per_session,
159-
reserved_annotation
159+
reserved_annotation,
160+
open_properties_target_address_prefix
160161
]},
161162

162163
{cluster_size_3, [shuffle],
@@ -4762,7 +4763,7 @@ dead_letter_headers_exchange(Config) ->
47624763
#{arguments => #{<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>},
47634764
<<"x-message-ttl">> => {ulong, 0}}}),
47644765
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
4765-
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
4766+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
47664767
#{<<"my key">> => {uint, 5},
47674768
<<"x-my key">> => {uint, 6},
47684769
<<"x-match">> => {utf8, <<"all-with-x">>}}),
@@ -5944,6 +5945,50 @@ reserved_annotation(Config) ->
59445945
end,
59455946
ok = close_connection_sync(Connection).
59465947

5948+
%% Test case for
5949+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12531
5950+
%% We pretend here to be unaware of RabbitMQ's target address format.
5951+
%% We learn the address format from the properties field in the open frame.
5952+
open_properties_target_address_prefix(Config) ->
5953+
QName = atom_to_binary(?FUNCTION_NAME),
5954+
Topic = <<"topic1">>,
5955+
5956+
OpnConf0 = connection_config(Config),
5957+
OpnConf = OpnConf0#{notify_with_performative => true},
5958+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
5959+
{QueuePrefix, TopicPrefix} =
5960+
receive {amqp10_event, {connection, Connection, {opened, #'v1_0.open'{properties = {map, KVList}}}}} ->
5961+
{_, {utf8, QPref}} = proplists:lookup({symbol, <<"queue-prefix">>}, KVList),
5962+
{_, {utf8, TPref}} = proplists:lookup({symbol, <<"topic-prefix">>}, KVList),
5963+
{QPref, TPref}
5964+
after 5000 -> ct:fail({missing_event, ?LINE})
5965+
end,
5966+
5967+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
5968+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
5969+
5970+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
5971+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, Topic, #{}),
5972+
5973+
{ok, QSender} = amqp10_client:attach_sender_link(
5974+
Session, <<"queue sender">>, <<QueuePrefix/binary, QName/binary>>),
5975+
{ok, TSender} = amqp10_client:attach_sender_link(
5976+
Session, <<"topic sender">>, <<TopicPrefix/binary, Topic/binary>>),
5977+
wait_for_credit(QSender),
5978+
wait_for_credit(TSender),
5979+
ok = amqp10_client:send_msg(QSender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
5980+
ok = amqp10_client:send_msg(TSender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
5981+
ok = wait_for_accepted(<<"t1">>),
5982+
ok = wait_for_accepted(<<"t2">>),
5983+
5984+
ok = amqp10_client:detach_link(QSender),
5985+
ok = amqp10_client:detach_link(TSender),
5986+
?assertMatch({ok, #{message_count := 2}},
5987+
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
5988+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
5989+
ok = end_session_sync(Session),
5990+
ok = amqp10_client:close_connection(Connection).
5991+
59475992
%% internal
59485993
%%
59495994

0 commit comments

Comments
 (0)