diff --git a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl index 548788466bbc..6f2ffb0a0d5b 100644 --- a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl +++ b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl @@ -18,6 +18,9 @@ -define(TIMEOUT, 30_000). +%% This is the pseudo queue that is specially interpreted by RabbitMQ. +-define(REPLY_QUEUE, <<"amq.rabbitmq.reply-to">>). + all() -> [ {group, cluster_size_1}, @@ -28,7 +31,11 @@ groups() -> [ {cluster_size_1, [shuffle], [ - trace + trace, + failure_ack_mode, + failure_multiple_consumers, + failure_reuse_consumer_tag, + failure_publish ]}, {cluster_size_3, [shuffle], [ @@ -82,8 +89,6 @@ trace(Config) -> Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), TraceQueue = <<"tests.amqpl_direct_reply_to.trace.tracing">>, RequestQueue = <<"tests.amqpl_direct_reply_to.trace.requests">>, - %% This is the pseudo queue that is specially interpreted by RabbitMQ. - ReplyQueue = <<"amq.rabbitmq.reply-to">>, RequestPayload = <<"my request">>, ReplyPayload = <<"my reply">>, CorrelationId = <<"my correlation ID">>, @@ -102,7 +107,7 @@ trace(Config) -> %% There is no need to declare this pseudo queue first. amqp_channel:subscribe(RequesterCh, - #'basic.consume'{queue = ReplyQueue, + #'basic.consume'{queue = ?REPLY_QUEUE, no_ack = true}, self()), CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0 @@ -114,7 +119,7 @@ trace(Config) -> amqp_channel:cast( RequesterCh, #'basic.publish'{routing_key = RequestQueue}, - #amqp_msg{props = #'P_basic'{reply_to = ReplyQueue, + #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE, correlation_id = CorrelationId}, payload = RequestPayload}), receive #'basic.ack'{} -> ok @@ -182,6 +187,85 @@ trace(Config) -> [#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q0}) || Q0 <- Qs], {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]). +%% A consumer must consume in no-ack mode. +failure_ack_mode(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Consume = #'basic.consume'{queue = ?REPLY_QUEUE, + no_ack = false}, + try amqp_channel:subscribe(Ch, Consume, self()) of + _ -> + ct:fail("expected subscribe in ack mode to fail") + catch exit:Reason -> + ?assertMatch( + {{_, {_, _, <<"PRECONDITION_FAILED - reply consumer cannot acknowledge">>}}, _}, + Reason) + end, + ok = rabbit_ct_client_helpers:close_connection(Conn). + +%% In AMQP 0.9.1 there can be at most one reply consumer per channel. +failure_multiple_consumers(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Consume = #'basic.consume'{queue = ?REPLY_QUEUE, + no_ack = true}, + amqp_channel:subscribe(Ch, Consume, self()), + receive #'basic.consume_ok'{} -> ok + end, + + try amqp_channel:subscribe(Ch, Consume, self()) of + _ -> + ct:fail("expected second subscribe to fail") + catch exit:Reason -> + ?assertMatch( + {{_, {_, _, <<"PRECONDITION_FAILED - reply consumer already set">>}}, _}, + Reason) + end, + ok = rabbit_ct_client_helpers:close_connection(Conn). + +%% Reusing the same consumer tag should fail. +failure_reuse_consumer_tag(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Ctag = <<"my-tag">>, + + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + consumer_tag = Ctag}, self()), + receive #'basic.consume_ok'{} -> ok + end, + + try amqp_channel:subscribe(Ch, #'basic.consume'{queue = ?REPLY_QUEUE, + consumer_tag = Ctag, + no_ack = true}, self()) of + _ -> + ct:fail("expected reusing consumer tag to fail") + catch exit:Reason -> + ?assertMatch( + {{_, {connection_closing, + {_, _, <<"NOT_ALLOWED - attempt to reuse consumer tag 'my-tag'">>} + }}, _}, + Reason) + end. + +%% Publishing with reply_to header set but without consuming from the pseudo queue should fail. +failure_publish(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + + Ref = monitor(process, Ch), + amqp_channel:cast( + Ch, + #'basic.publish'{routing_key = <<"some request queue">>}, + #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE, + correlation_id = <<"some correlation ID">>}, + payload = <<"some payload">>}), + + receive {'DOWN', Ref, process, Ch, Reason} -> + ?assertMatch( + {_, {_, _, <<"PRECONDITION_FAILED - fast reply consumer does not exist">>}}, + Reason) + after ?TIMEOUT -> + ct:fail("expected channel error") + end, + ok = rabbit_ct_client_helpers:close_connection(Conn). + %% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests. rpc_new_to_old_node(Config) -> rpc(0, 1, Config). @@ -190,36 +274,40 @@ rpc_old_to_new_node(Config) -> rpc(1, 0, Config). rpc(RequesterNode, ResponderNode, Config) -> - RequestQueue = <<"tests.amqpl_direct_reply_to.rpc.requests">>, - %% This is the pseudo queue that is specially interpreted by RabbitMQ. - ReplyQueue = <<"amq.rabbitmq.reply-to">>, + RequestQueue = <<"request queue">>, RequestPayload = <<"my request">>, - ReplyPayload = <<"my reply">>, CorrelationId = <<"my correlation ID">>, RequesterCh = rabbit_ct_client_helpers:open_channel(Config, RequesterNode), ResponderCh = rabbit_ct_client_helpers:open_channel(Config, ResponderNode), %% There is no need to declare this pseudo queue first. amqp_channel:subscribe(RequesterCh, - #'basic.consume'{queue = ReplyQueue, + #'basic.consume'{queue = ?REPLY_QUEUE, no_ack = true}, self()), CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0 end, + + ?assertEqual(#'queue.declare_ok'{queue = ?REPLY_QUEUE, + message_count = 0, + consumer_count = 1}, + amqp_channel:call(RequesterCh, + #'queue.declare'{queue = ?REPLY_QUEUE})), + #'queue.declare_ok'{} = amqp_channel:call( RequesterCh, #'queue.declare'{queue = RequestQueue}), #'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}), amqp_channel:register_confirm_handler(RequesterCh, self()), + %% Send the request. amqp_channel:cast( RequesterCh, #'basic.publish'{routing_key = RequestQueue}, - #amqp_msg{props = #'P_basic'{reply_to = ReplyQueue, + #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE, correlation_id = CorrelationId}, payload = RequestPayload}), receive #'basic.ack'{} -> ok - after ?TIMEOUT -> ct:fail(confirm_timeout) end, ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config), @@ -229,20 +317,101 @@ rpc(RequesterNode, ResponderNode, Config) -> correlation_id = CorrelationId}, payload = RequestPayload} } = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}), + + %% Test what the docs state: + %% "If the RPC server is going to perform some expensive computation it might wish + %% to check if the client has gone away. To do this the server can declare the + %% generated reply name first on a disposable channel in order to determine whether + %% it still exists." + ?assertEqual(#'queue.declare_ok'{queue = ReplyTo, + message_count = 0, + consumer_count = 1}, + amqp_channel:call(ResponderCh, + #'queue.declare'{queue = ReplyTo})), + %% Send the reply. amqp_channel:cast( ResponderCh, #'basic.publish'{routing_key = ReplyTo}, #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, - payload = ReplyPayload}), + payload = <<"reply 1">>}), - %% Receive the reply. + %% Let's assume the RPC server sends multiple replies for a single request. + %% (This is a bit unusual but should work.) + amqp_channel:cast( + ResponderCh, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, + payload = <<"reply 2">>}), + + %% Receive the frst reply. + receive {#'basic.deliver'{consumer_tag = CTag, + redelivered = false, + exchange = <<>>, + routing_key = ReplyTo}, + #amqp_msg{payload = P1, + props = #'P_basic'{correlation_id = CorrelationId}}} -> + ?assertEqual(<<"reply 1">>, P1) + after ?TIMEOUT -> ct:fail({missing_reply, ?LINE}) + end, + + %% Receive the second reply. receive {#'basic.deliver'{consumer_tag = CTag}, - #amqp_msg{payload = ReplyPayload, + #amqp_msg{payload = P2, props = #'P_basic'{correlation_id = CorrelationId}}} -> - ok - after ?TIMEOUT -> ct:fail(missing_reply) - end. + ?assertEqual(<<"reply 2">>, P2) + after ?TIMEOUT -> ct:fail({missing_reply, ?LINE}) + end, + + %% The requester sends a reply to itself. + %% (Really odd, but should work.) + amqp_channel:cast( + RequesterCh, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, + payload = <<"reply 3">>}), + + receive {#'basic.deliver'{consumer_tag = CTag}, + #amqp_msg{payload = P3, + props = #'P_basic'{correlation_id = CorrelationId}}} -> + ?assertEqual(<<"reply 3">>, P3) + after ?TIMEOUT -> ct:fail({missing_reply, ?LINE}) + end, + + %% Requester cancels consumption. + ?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag}, + amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})), + + %% Send a final reply. + amqp_channel:cast( + ResponderCh, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, + payload = <<"reply 4">>}), + + %% The final reply shouldn't be delivered since the requester cancelled consumption. + receive {#'basic.deliver'{}, #amqp_msg{}} -> + ct:fail("did not expect delivery after cancellation") + after 100 -> ok + end, + + %% Responder checks again if the requester is still there. + %% This time, the requester and its queue should be gone. + try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of + _ -> + ct:fail("expected queue.declare to fail") + catch exit:Reason -> + ?assertMatch( + {{_, {_, _, <<"NOT_FOUND - no queue '", + ReplyTo:(byte_size(ReplyTo))/binary, + "' in vhost '/'">>}}, _}, + Reason) + end, + + %% Clean up. + #'queue.delete_ok'{} = amqp_channel:call(RequesterCh, + #'queue.delete'{queue = RequestQueue}), + ok = rabbit_ct_client_helpers:close_channel(RequesterCh). wait_for_queue_declared(Queue, Node, Config) -> eventually(