Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1494,12 +1494,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
topic_permission_cache = TopicPermCache},
rabbit_global_counters:consumer_created(?PROTOCOL),
{ok, [A], State1};
{error, Reason} ->
protocol_error(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Consuming from ~s failed: ~tp",
[rabbit_misc:rs(QName), Reason]);
{protocol_error, _Type, Reason, Args} ->
{error, _Type, Reason, Args} ->
protocol_error(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
Reason, Args)
Expand Down
3 changes: 1 addition & 2 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1816,8 +1816,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
rabbit_framing:amqp_table(), any(), rabbit_types:username(),
rabbit_queue_type:state()) ->
{ok, rabbit_queue_type:state()} |
{error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.
basic_consume(Q, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->
Expand Down
83 changes: 30 additions & 53 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1354,43 +1354,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(ConsumerTag, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~B) of consumers per channel", [MaxConsumers]);
rabbit_misc:protocol_error(not_allowed,
"reached maximum (~B) of consumers per channel",
[MaxConsumers]);
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User, AuthzContext),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
"amq.ctag");
Other -> Other
end,
case basic_consume(
QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ExclusiveConsume, Args, NoWait, State) of
{ok, State1} ->
{noreply, State1};
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~ts in exclusive use",
[rabbit_misc:rs(QueueName)]);
{error, global_qos_not_supported_for_queue_type} ->
rabbit_misc:protocol_error(
not_implemented, "~ts does not support global qos",
[rabbit_misc:rs(QueueName)]);
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error, "~ts timeout occurred during consume operation",
[rabbit_misc:rs(QueueName)]);
{error, no_local_stream_replica_available} ->
rabbit_misc:protocol_error(
resource_error, "~ts does not have a running local replica",
[rabbit_misc:rs(QueueName)])
end;
ActualTag = case ConsumerTag of
<<>> ->
rabbit_guid:binary(
rabbit_guid:gen_secure(), "amq.ctag");
_ ->
ConsumerTag
end,
basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualTag,
ExclusiveConsume, Args, NoWait, State);
{ok, _} ->
%% Attempted reuse of consumer tag.
rabbit_misc:protocol_error(
not_allowed, "attempt to reuse consumer tag '~ts'", [ConsumerTag])
rabbit_misc:protocol_error(not_allowed,
"attempt to reuse consumer tag '~ts'",
[ConsumerTag])
end;

handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
Expand Down Expand Up @@ -1685,11 +1668,11 @@ handle_method(_MethodRecord, _Content, _State) ->
%% for why.
basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ExclusiveConsume, Args, NoWait,
State = #ch{cfg = #conf{conn_pid = ConnPid,
user = #user{username = Username}},
limiter = Limiter,
consumer_mapping = ConsumerMapping,
queue_states = QueueStates0}) ->
State0 = #ch{cfg = #conf{conn_pid = ConnPid,
user = #user{username = Username}},
limiter = Limiter,
consumer_mapping = ConsumerMapping,
queue_states = QueueStates0}) ->
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
Expand All @@ -1710,22 +1693,16 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ActualConsumerTag,
{Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
ConsumerMapping),

State1 = State#ch{consumer_mapping = CM1,
queue_states = QueueStates},
{ok, case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
end};
{{error, exclusive_consume_unavailable} = E, _Q} ->
E;
{{error, global_qos_not_supported_for_queue_type} = E, _Q} ->
E;
{{error, no_local_stream_replica_available} = E, _Q} ->
E;
{{error, timeout} = E, _Q} ->
E;
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
State1 = State0#ch{consumer_mapping = CM1,
queue_states = QueueStates},
State = case NoWait of
true ->
consumer_monitor(ActualConsumerTag, State1);
false ->
State1
end,
{noreply, State};
{{error, Type, Reason, ReasonArgs}, _Q} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.

Expand Down
8 changes: 6 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,12 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
%% TODO: track pids as they change
State = ensure_monitor(QPid, QRef, State0),
{ok, State#?STATE{pid = QPid}};
Err ->
Err
{error, exclusive_consume_unavailable} ->
{error, access_refused, "~ts in exclusive use",
[rabbit_misc:rs(QRef)]};
{error, Reason} ->
{error, internal_error, "failed consuming from classic ~ts: ~tp",
[rabbit_misc:rs(QRef), Reason]}
end.

%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
Expand Down
8 changes: 3 additions & 5 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@
consume_spec(),
queue_state()) ->
{ok, queue_state(), actions()} |
{error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.

-callback cancel(amqqueue:amqqueue(),
cancel_spec(),
Expand Down Expand Up @@ -516,15 +515,14 @@ new(Q, State) when ?is_amqqueue(Q) ->

-spec consume(amqqueue:amqqueue(), consume_spec(), state()) ->
{ok, state()} |
{error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.
consume(Q, Spec, State) ->
#ctx{state = CtxState0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
case Mod:consume(Q, Spec, CtxState0) of
{ok, CtxState} ->
{ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)};
Err ->
{error, _Type, _Fmt, _FmtArgs} = Err->
Err
end.

Expand Down
86 changes: 50 additions & 36 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -971,10 +971,12 @@ dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) ->
rabbit_queue_type:consume_spec(),
rabbit_fifo_client:state()) ->
{ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
{error, global_qos_not_supported_for_queue_type | timeout}.
{error, atom(), Format :: string(), FormatArgs :: [term()]}.
consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_quorum(Q) ->
{error, global_qos_not_supported_for_queue_type};
{error, not_implemented,
"~ts does not support global qos",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
#{no_ack := NoAck,
channel_pid := ChPid,
Expand Down Expand Up @@ -1008,46 +1010,58 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
args => Args,
username => ActingUser,
priority => Priority},
{ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag,
Mode, ConsumerMeta,
QState0),
case single_active_consumer_on(Q) of
true ->
%% get the leader from state
case rabbit_fifo_client:query_single_active_consumer(QState) of
{ok, SacResult} ->
ActivityStatus = case SacResult of
{value, {ConsumerTag, ChPid}} ->
single_active;
_ ->
waiting
end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName,
Prefetch, ActivityStatus == single_active, %% Active
ActivityStatus, Args),
case rabbit_fifo_client:checkout(ConsumerTag, Mode, ConsumerMeta, QState0) of
{ok, _Infos, QState} ->
case single_active_consumer_on(Q) of
true ->
%% get the leader from state
case rabbit_fifo_client:query_single_active_consumer(QState) of
{ok, SacResult} ->
ActivityStatus = case SacResult of
{value, {ConsumerTag, ChPid}} ->
single_active;
_ ->
waiting
end,
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag,
ExclusiveConsume,
AckRequired, QName,
Prefetch,
ActivityStatus == single_active,
ActivityStatus, Args),
emit_consumer_created(ChPid, ConsumerTag,
ExclusiveConsume,
AckRequired, QName,
Prefetch, Args, none,
ActingUser),
{ok, QState};
Err ->
consume_error(Err, QName)
end;
false ->
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag,
ExclusiveConsume,
AckRequired, QName,
Prefetch, true,
up, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, Prefetch,
Args, none, ActingUser),
{ok, QState};
{error, Error} ->
Error;
{timeout, _} ->
{error, timeout}
{ok, QState}
end;
false ->
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName,
Prefetch, true, %% Active
up, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, Prefetch,
Args, none, ActingUser),
{ok, QState}
Err ->
consume_error(Err, QName)
end.

consume_error({error, Reason}, QName) ->
{error, internal_error,
"failed consuming from quorum ~ts: ~tp",
[rabbit_misc:rs(QName), Reason]};
consume_error({timeout, RaServerId}, QName) ->
{error, internal_error,
"timed out consuming from quorum ~ts: ~tp",
[rabbit_misc:rs(QName), RaServerId]}.

cancel(_Q, #{consumer_tag := ConsumerTag} = Spec, State) ->
maybe_send_reply(self(), maps:get(ok_msg, Spec, undefined)),
Reason = maps:get(reason, Spec, cancel),
Expand Down
24 changes: 15 additions & 9 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -290,19 +290,23 @@ format(Q, Ctx) ->

consume(Q, #{mode := {simple_prefetch, 0}}, _)
when ?amqqueue_is_stream(Q) ->
{protocol_error, precondition_failed, "consumer prefetch count is not set for stream ~ts",
{error, precondition_failed,
"consumer prefetch count is not set for stream ~ts",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
consume(Q, #{no_ack := true,
mode := {simple_prefetch, _}}, _)
when ?amqqueue_is_stream(Q) ->
{protocol_error, not_implemented,
{error, not_implemented,
"automatic acknowledgement not supported by stream ~ts",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_stream(Q) ->
{error, global_qos_not_supported_for_queue_type};
{error, not_implemented,
"~ts does not support global qos",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
consume(Q, Spec, #stream_client{} = QState0)
when ?amqqueue_is_stream(Q) ->
QName = amqqueue:get_name(Q),
%% Messages should include the offset as a custom header.
case get_local_pid(QState0) of
{LocalPid, QState} when is_pid(LocalPid) ->
Expand All @@ -314,13 +318,10 @@ consume(Q, Spec, #stream_client{} = QState0)
args := Args,
ok_msg := OkMsg,
acting_user := ActingUser} = Spec,
QName = amqqueue:get_name(Q),
rabbit_log:debug("~s:~s Local pid resolved ~0p",
[?MODULE, ?FUNCTION_NAME, LocalPid]),
case parse_offset_arg(
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
{error, _} = Err ->
Err;
{ok, OffsetSpec} ->
ConsumerPrefetchCount = case Mode of
{simple_prefetch, C} -> C;
Expand All @@ -344,12 +345,17 @@ consume(Q, Spec, #stream_client{} = QState0)
maybe_send_reply(ChPid, OkMsg),
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
Filter = maps:get(filter, Spec, []),
begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, Filter, filter_spec(Args))
begin_stream(QState, ConsumerTag, OffsetSpec, Mode,
AckRequired, Filter, filter_spec(Args));
{error, Reason} ->
{error, precondition_failed,
"failed consuming from stream ~ts: ~tp",
[rabbit_misc:rs(QName), Reason]}
end;
{undefined, _} ->
{protocol_error, precondition_failed,
{error, precondition_failed,
"stream ~ts does not have a running replica on the local node",
[rabbit_misc:rs(amqqueue:get_name(Q))]}
[rabbit_misc:rs(QName)]}
end.

-spec parse_offset_arg(undefined |
Expand Down
Loading
Loading