Skip to content

Commit 56bc881

Browse files
committed
Return queue-prefix in open properties
The `queue-prefix` property of the `open` frame is not part of the AMQP spec. However, let's include this property because it is also returned by ActiveMQ and Solace and understood by some client libs, e.g. ActiveMQ NMS.AMQP and Qpid JMS. Note that we do not set `topic-prefix` here because `/exchanges/amq.topic/` is a valid target address prefix but not a valid source address prefix. Closes #12531
1 parent 985bc0e commit 56bc881

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,15 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
136136
server_properties() ->
137137
Props0 = rabbit_reader:server_properties(amqp_1_0),
138138
Props1 = [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Props0],
139-
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
139+
Props = [
140+
{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}},
141+
%% queue-prefix is not part of the AMQP spec.
142+
%% However, we include this property because it is also returned by
143+
%% ActiveMQ and Solace and understood by some client libs,
144+
%% e.g. ActiveMQ NMS.AMQP and Qpid JMS.
145+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12531
146+
{{symbol, <<"queue-prefix">>}, {utf8, <<"/queues/">>}}
147+
] ++ Props1,
140148
{map, Props}.
141149

142150
%%--------------------------------------------------------------------------

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ groups() ->
156156
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
157157
session_max_per_connection,
158158
link_max_per_session,
159-
reserved_annotation
159+
reserved_annotation,
160+
open_properties_queue_prefix
160161
]},
161162

162163
{cluster_size_3, [shuffle],
@@ -4760,7 +4761,7 @@ dead_letter_headers_exchange(Config) ->
47604761
#{arguments => #{<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>},
47614762
<<"x-message-ttl">> => {ulong, 0}}}),
47624763
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
4763-
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
4764+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
47644765
#{<<"my key">> => {uint, 5},
47654766
<<"x-my key">> => {uint, 6},
47664767
<<"x-match">> => {utf8, <<"all-with-x">>}}),
@@ -5942,6 +5943,45 @@ reserved_annotation(Config) ->
59425943
end,
59435944
ok = close_connection_sync(Connection).
59445945

5946+
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/12531.
5947+
%% We pretend here to be unaware of RabbitMQ's target and source address format.
5948+
%% We learn the address format from the properties field in the open frame.
5949+
open_properties_queue_prefix(Config) ->
5950+
QName = atom_to_binary(?FUNCTION_NAME),
5951+
5952+
OpnConf0 = connection_config(Config),
5953+
OpnConf = OpnConf0#{notify_with_performative => true},
5954+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
5955+
QueuePrefix = receive {amqp10_event, {connection, Connection,
5956+
{opened, #'v1_0.open'{properties = {map, KVList}}}}} ->
5957+
{_, {utf8, QPref}} = proplists:lookup({symbol, <<"queue-prefix">>}, KVList),
5958+
QPref
5959+
after 5000 -> ct:fail({missing_event, ?LINE})
5960+
end,
5961+
5962+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
5963+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
5964+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
5965+
5966+
Address = <<QueuePrefix/binary, QName/binary>>,
5967+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, unsettled),
5968+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
5969+
wait_for_credit(Sender),
5970+
5971+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag">>, <<"msg">>)),
5972+
ok = wait_for_accepted(<<"tag">>),
5973+
5974+
{ok, Msg} = amqp10_client:get_msg(Receiver),
5975+
?assertEqual([<<"msg">>], amqp10_msg:body(Msg)),
5976+
ok = amqp10_client:accept_msg(Receiver, Msg),
5977+
5978+
ok = amqp10_client:detach_link(Sender),
5979+
ok = amqp10_client:detach_link(Receiver),
5980+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
5981+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
5982+
ok = end_session_sync(Session),
5983+
ok = amqp10_client:close_connection(Connection).
5984+
59455985
%% internal
59465986
%%
59475987

0 commit comments

Comments
 (0)