Skip to content

Commit ecf3fa5

Browse files
committed
Support responder checking for existence of requester
Support responder checking for existence of requester via HTTP GET (over AMQP). This is especially useful for when the responder attaches to the anonymous target. This is in addition to attaching to the queues target with RabbitMQ refusing the link if the queue doesn't exist.
1 parent 422b575 commit ecf3fa5

File tree

5 files changed

+111
-15
lines changed

5 files changed

+111
-15
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,14 @@ handle_http_req(<<"GET">>,
8787
QName,
8888
fun(Q) ->
8989
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
90-
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
91-
{ok, {<<"200">>, RespPayload, PermCaches}}
90+
case rabbit_volatile_queue:is(QNameBin) andalso
91+
not rabbit_volatile_queue:exists(QName) of
92+
true ->
93+
{error, not_found};
94+
false ->
95+
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
96+
{ok, {<<"200">>, RespPayload, PermCaches}}
97+
end
9298
end) of
9399
{ok, Result} ->
94100
Result;

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ notify_decorators(_) ->
285285
ok.
286286

287287
stat(_) ->
288-
{ok, 0, 0}.
288+
{ok, 0, 1}.
289289

290290
format(_, _) ->
291291
[].

deps/rabbit/test/direct_reply_to_amqp_SUITE.erl

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ responder_attaches_queue_target(Config) ->
9696
RequestQueue = atom_to_binary(?FUNCTION_NAME),
9797
AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue),
9898

99-
{ConnResponder, SessionResponder, LinkPair} = init(Config),
100-
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}),
99+
{ConnResponder, SessionResponder, LinkPairResponder} = init(Config),
100+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPairResponder, RequestQueue, #{}),
101101

102102
OpnConfRequester0 = connection_config(Config),
103103
OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>,
@@ -143,6 +143,14 @@ responder_attaches_queue_target(Config) ->
143143
%% The metadata store should store only the request queue.
144144
?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])),
145145

146+
{ok, #{queue := ReplyQ}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
147+
?assertMatch({ok, #{vhost := <<"/">>,
148+
durable := false,
149+
type := <<"rabbit_volatile_queue">>,
150+
message_count := 0,
151+
consumer_count := 1}},
152+
rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQ)),
153+
146154
{ok, SenderResponder1} = amqp10_client:attach_sender_link_sync(
147155
SessionResponder, <<"sender responder unsettled">>,
148156
ReplyToAddr, unsettled),
@@ -247,12 +255,15 @@ responder_attaches_queue_target(Config) ->
247255
end,
248256

249257
%% When the requester detaches, the volatile queue is gone.
250-
%% Therefore, RabbitMQ should refuse attaching to the volatile queue target.
251258
ok = detach_link_sync(ReceiverRequester),
252259
flush(detached),
253260
?assertMatch(#{publishers := 3,
254261
consumers := 1},
255262
maps:get(#{protocol => amqp10}, get_global_counters(Config))),
263+
%% Therefore, HTTP GET on that queue should return 404.
264+
{error, Resp} = rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQ),
265+
?assertMatch(#{subject := <<"404">>}, amqp10_msg:properties(Resp)),
266+
%% Also, RabbitMQ should refuse attaching to the volatile queue target.
256267
{ok, SenderResponder3} = amqp10_client:attach_sender_link_sync(
257268
SessionResponder, <<"sender responder 3">>,
258269
ReplyToAddr),
@@ -278,8 +289,8 @@ responder_attaches_queue_target(Config) ->
278289
after 9000 -> ct:fail({missing_event, ?LINE})
279290
end,
280291

281-
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue),
282-
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
292+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPairResponder, RequestQueue),
293+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPairResponder),
283294

284295
ok = close_connection_sync(ConnResponder),
285296
ok = close_connection_sync(ConnRequester),

deps/rabbit/test/direct_reply_to_amqpl_SUITE.erl

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
-import(rabbit_ct_helpers,
1919
[eventually/1]).
2020
-import(amqp_utils,
21-
[connection_config/1,
21+
[init/1,
22+
close/1,
23+
connection_config/1,
2224
wait_for_credit/1,
2325
end_session_sync/1,
2426
close_connection_sync/1]).
@@ -300,16 +302,23 @@ amqpl_amqp_amqpl(Config) ->
300302
payload = RequestPayload}),
301303

302304
%% Receive the request via AMQP 1.0.
303-
OpnConf = connection_config(Config),
304-
{ok, Conn} = amqp10_client:open_connection(OpnConf),
305-
{ok, Session} = amqp10_client:begin_session_sync(Conn),
305+
{_, Session, LinkPair} = Init = init(Config),
306306
{ok, Receiver} = amqp10_client:attach_receiver_link(
307307
Session, <<"receiver">>, AddrRequestQ),
308308
{ok, RequestMsg} = amqp10_client:get_msg(Receiver, ?TIMEOUT),
309309
?assertEqual(RequestPayload, amqp10_msg:body_bin(RequestMsg)),
310310
#{message_id := Id,
311311
reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
312312

313+
%% AMQP 1.0 responder checks whether the AMQP 0.9.1 requester is still there.
314+
{ok, #{queue := ReplyQ}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
315+
?assertMatch({ok, #{vhost := <<"/">>,
316+
durable := false,
317+
type := <<"rabbit_volatile_queue">>,
318+
message_count := 0,
319+
consumer_count := 1}},
320+
rabbitmq_amqp_client:get_queue(LinkPair, ReplyQ)),
321+
313322
%% Send the reply via AMQP 1.0.
314323
{ok, Sender} = amqp10_client:attach_sender_link_sync(
315324
Session, <<"sender">>, ReplyToAddr),
@@ -346,9 +355,19 @@ amqpl_amqp_amqpl(Config) ->
346355
after ?TIMEOUT -> ct:fail(missing_reply)
347356
end,
348357

349-
ok = end_session_sync(Session),
350-
ok = close_connection_sync(Conn),
358+
%% AMQP 0.9.1 requester cancels consumption.
359+
?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
360+
amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = CTag})),
361+
362+
%% This time, when the AMQP 1.0 responder checks whether the AMQP 0.9.1 requester
363+
%% is still there, an error should be returned.
364+
{error, Resp} = rabbitmq_amqp_client:get_queue(LinkPair, ReplyQ),
365+
?assertMatch(#{subject := <<"404">>}, amqp10_msg:properties(Resp)),
366+
?assertEqual(#'v1_0.amqp_value'{content = {utf8, <<"queue '", ReplyQ/binary, "' in vhost '/' not found">>}},
367+
amqp10_msg:body(Resp)),
368+
351369
#'queue.delete_ok'{} = amqp_channel:call(Chan, #'queue.delete'{queue = RequestQ}),
370+
ok = close(Init),
352371
ok = rabbit_ct_client_helpers:close_channel(Chan).
353372

354373
%% Test that Direct Reply-To works when the requester is an AMQP 1.0 client

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88

99
-export([exchange/1,
1010
exchange/2,
11-
queue/1]).
11+
queue/1,
12+
from_map/1,
13+
to_map/1]).
14+
15+
-type address_map() :: #{queue := unicode:unicode_binary()} |
16+
#{exchange := unicode:unicode_binary(),
17+
routing_key => unicode:unicode_binary()}.
1218

1319
-spec exchange(unicode:unicode_binary()) ->
1420
unicode:unicode_binary().
@@ -28,3 +34,57 @@ exchange(ExchangeName, RoutingKey) ->
2834
queue(QueueName) ->
2935
QueueNameQuoted = uri_string:quote(QueueName),
3036
<<"/queues/", QueueNameQuoted/binary>>.
37+
38+
-spec from_map(address_map()) ->
39+
unicode:unicode_binary().
40+
from_map(#{exchange := Exchange, routing_key := RoutingKey}) ->
41+
exchange(Exchange, RoutingKey);
42+
from_map(#{exchange := Exchange}) ->
43+
exchange(Exchange);
44+
from_map(#{queue := Queue}) ->
45+
queue(Queue).
46+
47+
-spec to_map(unicode:unicode_binary()) ->
48+
{ok, address_map()} | error.
49+
to_map(<<"/exchanges/", Rest/binary>>) ->
50+
case binary:split(Rest, <<"/">>, [global]) of
51+
[ExchangeQuoted]
52+
when ExchangeQuoted =/= <<>> ->
53+
Exchange = uri_string:unquote(ExchangeQuoted),
54+
{ok, #{exchange => Exchange}};
55+
[ExchangeQuoted, RoutingKeyQuoted]
56+
when ExchangeQuoted =/= <<>> ->
57+
Exchange = uri_string:unquote(ExchangeQuoted),
58+
RoutingKey = uri_string:unquote(RoutingKeyQuoted),
59+
{ok, #{exchange => Exchange,
60+
routing_key => RoutingKey}};
61+
_ ->
62+
error
63+
end;
64+
to_map(<<"/queues/">>) ->
65+
error;
66+
to_map(<<"/queues/", QueueQuoted/binary>>) ->
67+
Queue = uri_string:unquote(QueueQuoted),
68+
{ok, #{queue => Queue}};
69+
to_map(_) ->
70+
error.
71+
72+
-ifdef(TEST).
73+
-include_lib("eunit/include/eunit.hrl").
74+
address_test() ->
75+
M1 = #{queue => <<"my queue">>},
76+
M2 = #{queue => <<"🥕"/utf8>>},
77+
M3 = #{exchange => <<"my exchange">>},
78+
M4 = #{exchange => <<"🥕"/utf8>>},
79+
M5 = #{exchange => <<"my exchange">>,
80+
routing_key => <<"my routing key">>},
81+
M6 = #{exchange => <<"🥕"/utf8>>,
82+
routing_key => <<"🍰"/utf8>>},
83+
lists:foreach(fun(Map) ->
84+
{ok, Map} = to_map(from_map(Map))
85+
end, [M1, M2, M3, M4, M5, M6]),
86+
87+
error = to_map(<<"/queues/">>),
88+
error = to_map(<<"/exchanges/">>),
89+
error = to_map(<<"/exchanges//key">>).
90+
-endif.

0 commit comments

Comments
 (0)