diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index b31093dcceb6..d72a9666fe4f 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1494,12 +1494,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, topic_permission_cache = TopicPermCache}, rabbit_global_counters:consumer_created(?PROTOCOL), {ok, [A], State1}; - {error, Reason} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Consuming from ~s failed: ~tp", - [rabbit_misc:rs(QName), Reason]); - {protocol_error, _Type, Reason, Args} -> + {error, _Type, Reason, Args} -> protocol_error( ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Reason, Args) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 93e9d5c2f0b1..b6e9ede763f7 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1816,8 +1816,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) -> rabbit_framing:amqp_table(), any(), rabbit_types:username(), rabbit_queue_type:state()) -> {ok, rabbit_queue_type:state()} | - {error, term()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. basic_consume(Q, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, QStates) -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 0b913c406287..86d71d7af902 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1354,43 +1354,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin, CurrentConsumers = maps:size(ConsumerMapping), case maps:find(ConsumerTag, ConsumerMapping) of error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' - rabbit_misc:protocol_error( - not_allowed, "reached maximum (~B) of consumers per channel", [MaxConsumers]); + rabbit_misc:protocol_error(not_allowed, + "reached maximum (~B) of consumers per channel", + [MaxConsumers]); error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User, AuthzContext), - ActualConsumerTag = - case ConsumerTag of - <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), - "amq.ctag"); - Other -> Other - end, - case basic_consume( - QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, - ExclusiveConsume, Args, NoWait, State) of - {ok, State1} -> - {noreply, State1}; - {error, exclusive_consume_unavailable} -> - rabbit_misc:protocol_error( - access_refused, "~ts in exclusive use", - [rabbit_misc:rs(QueueName)]); - {error, global_qos_not_supported_for_queue_type} -> - rabbit_misc:protocol_error( - not_implemented, "~ts does not support global qos", - [rabbit_misc:rs(QueueName)]); - {error, timeout} -> - rabbit_misc:protocol_error( - internal_error, "~ts timeout occurred during consume operation", - [rabbit_misc:rs(QueueName)]); - {error, no_local_stream_replica_available} -> - rabbit_misc:protocol_error( - resource_error, "~ts does not have a running local replica", - [rabbit_misc:rs(QueueName)]) - end; + ActualTag = case ConsumerTag of + <<>> -> + rabbit_guid:binary( + rabbit_guid:gen_secure(), "amq.ctag"); + _ -> + ConsumerTag + end, + basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualTag, + ExclusiveConsume, Args, NoWait, State); {ok, _} -> %% Attempted reuse of consumer tag. - rabbit_misc:protocol_error( - not_allowed, "attempt to reuse consumer tag '~ts'", [ConsumerTag]) + rabbit_misc:protocol_error(not_allowed, + "attempt to reuse consumer tag '~ts'", + [ConsumerTag]) end; handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, @@ -1685,11 +1668,11 @@ handle_method(_MethodRecord, _Content, _State) -> %% for why. basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ExclusiveConsume, Args, NoWait, - State = #ch{cfg = #conf{conn_pid = ConnPid, - user = #user{username = Username}}, - limiter = Limiter, - consumer_mapping = ConsumerMapping, - queue_states = QueueStates0}) -> + State0 = #ch{cfg = #conf{conn_pid = ConnPid, + user = #user{username = Username}}, + limiter = Limiter, + consumer_mapping = ConsumerMapping, + queue_states = QueueStates0}) -> case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> @@ -1710,22 +1693,16 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ActualConsumerTag, {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, ConsumerMapping), - - State1 = State#ch{consumer_mapping = CM1, - queue_states = QueueStates}, - {ok, case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end}; - {{error, exclusive_consume_unavailable} = E, _Q} -> - E; - {{error, global_qos_not_supported_for_queue_type} = E, _Q} -> - E; - {{error, no_local_stream_replica_available} = E, _Q} -> - E; - {{error, timeout} = E, _Q} -> - E; - {{protocol_error, Type, Reason, ReasonArgs}, _Q} -> + State1 = State0#ch{consumer_mapping = CM1, + queue_states = QueueStates}, + State = case NoWait of + true -> + consumer_monitor(ActualConsumerTag, State1); + false -> + State1 + end, + {noreply, State}; + {{error, Type, Reason, ReasonArgs}, _Q} -> rabbit_misc:protocol_error(Type, Reason, ReasonArgs) end. diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 2732e9819081..5c79b6804615 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -297,8 +297,12 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) -> %% TODO: track pids as they change State = ensure_monitor(QPid, QRef, State0), {ok, State#?STATE{pid = QPid}}; - Err -> - Err + {error, exclusive_consume_unavailable} -> + {error, access_refused, "~ts in exclusive use", + [rabbit_misc:rs(QRef)]}; + {error, Reason} -> + {error, internal_error, "failed consuming from classic ~ts: ~tp", + [rabbit_misc:rs(QRef), Reason]} end. %% Delete this function when feature flag rabbitmq_4.0.0 becomes required. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 498db95dc88d..4ddf31780538 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -211,8 +211,7 @@ consume_spec(), queue_state()) -> {ok, queue_state(), actions()} | - {error, term()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. -callback cancel(amqqueue:amqqueue(), cancel_spec(), @@ -516,15 +515,14 @@ new(Q, State) when ?is_amqqueue(Q) -> -spec consume(amqqueue:amqqueue(), consume_spec(), state()) -> {ok, state()} | - {error, term()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. consume(Q, Spec, State) -> #ctx{state = CtxState0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), case Mod:consume(Q, Spec, CtxState0) of {ok, CtxState} -> {ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)}; - Err -> + {error, _Type, _Fmt, _FmtArgs} = Err-> Err end. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 7056edab2485..0d99e9a8bd99 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -971,10 +971,12 @@ dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) -> rabbit_queue_type:consume_spec(), rabbit_fifo_client:state()) -> {ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} | - {error, global_qos_not_supported_for_queue_type | timeout}. + {error, atom(), Format :: string(), FormatArgs :: [term()]}. consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_quorum(Q) -> - {error, global_qos_not_supported_for_queue_type}; + {error, not_implemented, + "~ts does not support global qos", + [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> #{no_ack := NoAck, channel_pid := ChPid, @@ -1008,46 +1010,58 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> args => Args, username => ActingUser, priority => Priority}, - {ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag, - Mode, ConsumerMeta, - QState0), - case single_active_consumer_on(Q) of - true -> - %% get the leader from state - case rabbit_fifo_client:query_single_active_consumer(QState) of - {ok, SacResult} -> - ActivityStatus = case SacResult of - {value, {ConsumerTag, ChPid}} -> - single_active; - _ -> - waiting - end, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - Prefetch, ActivityStatus == single_active, %% Active - ActivityStatus, Args), + case rabbit_fifo_client:checkout(ConsumerTag, Mode, ConsumerMeta, QState0) of + {ok, _Infos, QState} -> + case single_active_consumer_on(Q) of + true -> + %% get the leader from state + case rabbit_fifo_client:query_single_active_consumer(QState) of + {ok, SacResult} -> + ActivityStatus = case SacResult of + {value, {ConsumerTag, ChPid}} -> + single_active; + _ -> + waiting + end, + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, + ExclusiveConsume, + AckRequired, QName, + Prefetch, + ActivityStatus == single_active, + ActivityStatus, Args), + emit_consumer_created(ChPid, ConsumerTag, + ExclusiveConsume, + AckRequired, QName, + Prefetch, Args, none, + ActingUser), + {ok, QState}; + Err -> + consume_error(Err, QName) + end; + false -> + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, + ExclusiveConsume, + AckRequired, QName, + Prefetch, true, + up, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, Prefetch, Args, none, ActingUser), - {ok, QState}; - {error, Error} -> - Error; - {timeout, _} -> - {error, timeout} + {ok, QState} end; - false -> - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - Prefetch, true, %% Active - up, Args), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), - {ok, QState} + Err -> + consume_error(Err, QName) end. +consume_error({error, Reason}, QName) -> + {error, internal_error, + "failed consuming from quorum ~ts: ~tp", + [rabbit_misc:rs(QName), Reason]}; +consume_error({timeout, RaServerId}, QName) -> + {error, internal_error, + "timed out consuming from quorum ~ts: ~tp", + [rabbit_misc:rs(QName), RaServerId]}. + cancel(_Q, #{consumer_tag := ConsumerTag} = Spec, State) -> maybe_send_reply(self(), maps:get(ok_msg, Spec, undefined)), Reason = maps:get(reason, Spec, cancel), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a52897437c66..0b7c1c0bbba9 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -290,19 +290,23 @@ format(Q, Ctx) -> consume(Q, #{mode := {simple_prefetch, 0}}, _) when ?amqqueue_is_stream(Q) -> - {protocol_error, precondition_failed, "consumer prefetch count is not set for stream ~ts", + {error, precondition_failed, + "consumer prefetch count is not set for stream ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, #{no_ack := true, mode := {simple_prefetch, _}}, _) when ?amqqueue_is_stream(Q) -> - {protocol_error, not_implemented, + {error, not_implemented, "automatic acknowledgement not supported by stream ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_stream(Q) -> - {error, global_qos_not_supported_for_queue_type}; + {error, not_implemented, + "~ts does not support global qos", + [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, Spec, #stream_client{} = QState0) when ?amqqueue_is_stream(Q) -> + QName = amqqueue:get_name(Q), %% Messages should include the offset as a custom header. case get_local_pid(QState0) of {LocalPid, QState} when is_pid(LocalPid) -> @@ -314,13 +318,10 @@ consume(Q, Spec, #stream_client{} = QState0) args := Args, ok_msg := OkMsg, acting_user := ActingUser} = Spec, - QName = amqqueue:get_name(Q), rabbit_log:debug("~s:~s Local pid resolved ~0p", [?MODULE, ?FUNCTION_NAME, LocalPid]), case parse_offset_arg( rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of - {error, _} = Err -> - Err; {ok, OffsetSpec} -> ConsumerPrefetchCount = case Mode of {simple_prefetch, C} -> C; @@ -344,12 +345,17 @@ consume(Q, Spec, #stream_client{} = QState0) maybe_send_reply(ChPid, OkMsg), _ = rabbit_stream_coordinator:register_local_member_listener(Q), Filter = maps:get(filter, Spec, []), - begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, Filter, filter_spec(Args)) + begin_stream(QState, ConsumerTag, OffsetSpec, Mode, + AckRequired, Filter, filter_spec(Args)); + {error, Reason} -> + {error, precondition_failed, + "failed consuming from stream ~ts: ~tp", + [rabbit_misc:rs(QName), Reason]} end; {undefined, _} -> - {protocol_error, precondition_failed, + {error, precondition_failed, "stream ~ts does not have a running replica on the local node", - [rabbit_misc:rs(amqqueue:get_name(Q))]} + [rabbit_misc:rs(QName)]} end. -spec parse_offset_arg(undefined | diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 35f7c9d5c198..1c1ebf3bc1e4 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -201,7 +201,8 @@ groups() -> leader_transfer_stream_send, list_connections, detach_requeues_two_connections_classic_queue, - detach_requeues_two_connections_quorum_queue + detach_requeues_two_connections_quorum_queue, + attach_to_down_quorum_queue ]}, {metrics, [shuffle], @@ -6596,8 +6597,55 @@ bad_x_cc_annotation_exchange(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection). +%% Attach a receiver to an unavailable quorum queue. +attach_to_down_quorum_queue(Config) -> + QName = <<"q-down">>, + Address = rabbitmq_amqp_address:queue(QName), + + %% Create quorum queue with single replica on node 2. + {_, _, LinkPair2} = Init2 = init(2, Config), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair2, + QName, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-quorum-initial-group-size">> => {ulong, 1} + }}), + ok = close(Init2), + + %% Make quorum queue unavailable. + ok = rabbit_ct_broker_helpers:stop_broker(Config, 2), + + OpnConf = connection_config(0, Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session0} = amqp10_client:begin_session_sync(Connection), + flush(attaching_receiver), + {ok, _Receiver} = amqp10_client:attach_receiver_link( + Session0, <<"receiver">>, Address), + receive + {amqp10_event, + {session, Session0, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + description = {utf8, Desc}}}}} -> + ?assertMatch( + <<"failed consuming from quorum queue 'q-down' in vhost '/'", _Reason/binary>>, + Desc) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + + ok = rabbit_ct_broker_helpers:start_broker(Config, 2), + + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( + Session, <<"my link pair">>), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = close({Connection, Session, LinkPair}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% internal -%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% receive_all_messages(Receiver, Accept) -> receive_all_messages0(Receiver, Accept, []). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 629361c9eb3e..56e5f4a710c8 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -80,6 +80,7 @@ groups() -> metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, consume_in_minority, + get_in_minority, reject_after_leader_transfer, shrink_all, rebalance, @@ -1030,25 +1031,48 @@ publish_and_restart(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). consume_in_minority(Config) -> - [Server0, Server1, Server2] = - rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), QQ = ?config(queue_name, Config), - RaName = binary_to_atom(<<"%2F_", QQ/binary>>, utf8), + RaName = binary_to_atom(<<"%2F_", QQ/binary>>), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - rabbit_quorum_queue:stop_server({RaName, Server1}), - rabbit_quorum_queue:stop_server({RaName, Server2}), + ok = rabbit_quorum_queue:stop_server({RaName, Server1}), + ok = rabbit_quorum_queue:stop_server({RaName, Server2}), + + ?assertExit( + {{shutdown, + {connection_closing, + {server_initiated_close, 541, + <<"INTERNAL_ERROR - failed consuming from quorum queue " + "'consume_in_minority' in vhost '/'", _Reason/binary>>}}}, _}, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ}, self())), + + ok = rabbit_quorum_queue:restart_server({RaName, Server1}), + ok = rabbit_quorum_queue:restart_server({RaName, Server2}). + +get_in_minority(Config) -> + [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + RaName = binary_to_atom(<<"%2F_", QQ/binary>>), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_quorum_queue:stop_server({RaName, Server1}), + ok = rabbit_quorum_queue:stop_server({RaName, Server2}), ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})), - rabbit_quorum_queue:restart_server({RaName, Server1}), - rabbit_quorum_queue:restart_server({RaName, Server2}), - ok. + ok = rabbit_quorum_queue:restart_server({RaName, Server1}), + ok = rabbit_quorum_queue:restart_server({RaName, Server2}). single_active_consumer_priority_take_over(Config) -> check_quorum_queues_v4_compat(Config), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index b14decb18971..7ae0893a13eb 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1506,10 +1506,9 @@ consume(Q, QoS, #state{ State1 = State0#state{queue_states = QStates}, State = maybe_set_queue_qos1(QoS, State1), {ok, State}; - {error, Reason} = Err -> - ?LOG_ERROR("Failed to consume from ~s: ~p", - [rabbit_misc:rs(QName), Reason]), - Err + {error, Type, Fmt, Args} -> + ?LOG_ERROR(Fmt, Args), + {error, Type} end end) end;