Skip to content

Commit cfccb78

Browse files
committed
Return queue-prefix in open properties
The `queue-prefix` property of the `open` frame is not part of the AMQP spec. However, let's include this property because it is also returned by ActiveMQ and Solace and understood by some client libs, e.g. ActiveMQ NMS.AMQP and Qpid JMS. Note that we do not set `topic-prefix` here because `/exchanges/amq.topic/` is a valid target address prefix but not a valid source address prefix. Closes #12531
1 parent 73f118f commit cfccb78

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,15 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
136136
server_properties() ->
137137
Props0 = rabbit_reader:server_properties(amqp_1_0),
138138
Props1 = [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Props0],
139-
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
139+
Props = [
140+
{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}},
141+
%% queue-prefix is not part of the AMQP spec.
142+
%% However, we include this property because it is also returned by
143+
%% ActiveMQ and Solace and understood by some client libs,
144+
%% e.g. ActiveMQ NMS.AMQP and Qpid JMS.
145+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12531
146+
{{symbol, <<"queue-prefix">>}, {utf8, <<"/queues/">>}}
147+
] ++ Props1,
140148
{map, Props}.
141149

142150
%%--------------------------------------------------------------------------

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ groups() ->
156156
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
157157
session_max_per_connection,
158158
link_max_per_session,
159-
reserved_annotation
159+
reserved_annotation,
160+
open_properties_queue_prefix
160161
]},
161162

162163
{cluster_size_3, [shuffle],
@@ -4762,7 +4763,7 @@ dead_letter_headers_exchange(Config) ->
47624763
#{arguments => #{<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>},
47634764
<<"x-message-ttl">> => {ulong, 0}}}),
47644765
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
4765-
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
4766+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
47664767
#{<<"my key">> => {uint, 5},
47674768
<<"x-my key">> => {uint, 6},
47684769
<<"x-match">> => {utf8, <<"all-with-x">>}}),
@@ -5944,6 +5945,45 @@ reserved_annotation(Config) ->
59445945
end,
59455946
ok = close_connection_sync(Connection).
59465947

5948+
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/12531.
5949+
%% We pretend here to be unaware of RabbitMQ's target and source address format.
5950+
%% We learn the address format from the properties field in the open frame.
5951+
open_properties_queue_prefix(Config) ->
5952+
QName = atom_to_binary(?FUNCTION_NAME),
5953+
5954+
OpnConf0 = connection_config(Config),
5955+
OpnConf = OpnConf0#{notify_with_performative => true},
5956+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
5957+
QueuePrefix = receive {amqp10_event, {connection, Connection,
5958+
{opened, #'v1_0.open'{properties = {map, KVList}}}}} ->
5959+
{_, {utf8, QPref}} = proplists:lookup({symbol, <<"queue-prefix">>}, KVList),
5960+
QPref
5961+
after 5000 -> ct:fail({missing_event, ?LINE})
5962+
end,
5963+
5964+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
5965+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
5966+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
5967+
5968+
Address = <<QueuePrefix/binary, QName/binary>>,
5969+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, unsettled),
5970+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
5971+
wait_for_credit(Sender),
5972+
5973+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag">>, <<"msg">>)),
5974+
ok = wait_for_accepted(<<"tag">>),
5975+
5976+
{ok, Msg} = amqp10_client:get_msg(Receiver),
5977+
?assertEqual([<<"msg">>], amqp10_msg:body(Msg)),
5978+
ok = amqp10_client:accept_msg(Receiver, Msg),
5979+
5980+
ok = amqp10_client:detach_link(Sender),
5981+
ok = amqp10_client:detach_link(Receiver),
5982+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
5983+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
5984+
ok = end_session_sync(Session),
5985+
ok = amqp10_client:close_connection(Connection).
5986+
59475987
%% internal
59485988
%%
59495989

0 commit comments

Comments
 (0)