Skip to content

Commit 2d02a10

Browse files
committed
Fix crash when consuming from unavailable quorum queue
Prior to this commit, when a client consumed from an unavailable quorum queue, the following crash occurred: ``` {badmatch,{error,noproc}} [{rabbit_quorum_queue,consume,3,[{file,\"rabbit_quorum_queue.erl\"},{line,993}]} ``` This commit fixes this bug by returning any error when registering a quorum queue consumer to rabbit_queue_type. This commit also refactors errors returned by rabbit_queue_type:consume/3 to simplify and ensure seperation of concerns. For example prior to this commit, the channel did error formatting specifically for consuming from streams. It's better if the channel is unaware of what queue type it consumes from and have each queue type implementation format their own errors.
1 parent 0e803de commit 2d02a10

File tree

10 files changed

+190
-125
lines changed

10 files changed

+190
-125
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,12 +1494,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
14941494
topic_permission_cache = TopicPermCache},
14951495
rabbit_global_counters:consumer_created(?PROTOCOL),
14961496
{ok, [A], State1};
1497-
{error, Reason} ->
1498-
protocol_error(
1499-
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
1500-
"Consuming from ~s failed: ~tp",
1501-
[rabbit_misc:rs(QName), Reason]);
1502-
{protocol_error, _Type, Reason, Args} ->
1497+
{error, _Type, Reason, Args} ->
15031498
protocol_error(
15041499
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
15051500
Reason, Args)

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,8 +1816,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
18161816
rabbit_framing:amqp_table(), any(), rabbit_types:username(),
18171817
rabbit_queue_type:state()) ->
18181818
{ok, rabbit_queue_type:state()} |
1819-
{error, term()} |
1820-
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
1819+
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.
18211820
basic_consume(Q, NoAck, ChPid, LimiterPid,
18221821
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
18231822
ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 29 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1354,39 +1354,23 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
13541354
CurrentConsumers = maps:size(ConsumerMapping),
13551355
case maps:find(ConsumerTag, ConsumerMapping) of
13561356
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
1357-
rabbit_misc:protocol_error(
1358-
not_allowed, "reached maximum (~B) of consumers per channel", [MaxConsumers]);
1357+
rabbit_misc:protocol_error(
1358+
not_allowed,
1359+
"reached maximum (~B) of consumers per channel",
1360+
[MaxConsumers]);
13591361
error ->
13601362
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
13611363
check_read_permitted(QueueName, User, AuthzContext),
1362-
ActualConsumerTag =
1363-
case ConsumerTag of
1364-
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
1365-
"amq.ctag");
1366-
Other -> Other
1367-
end,
1368-
case basic_consume(
1369-
QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
1370-
ExclusiveConsume, Args, NoWait, State) of
1371-
{ok, State1} ->
1372-
{noreply, State1};
1373-
{error, exclusive_consume_unavailable} ->
1374-
rabbit_misc:protocol_error(
1375-
access_refused, "~ts in exclusive use",
1376-
[rabbit_misc:rs(QueueName)]);
1377-
{error, global_qos_not_supported_for_queue_type} ->
1378-
rabbit_misc:protocol_error(
1379-
not_implemented, "~ts does not support global qos",
1380-
[rabbit_misc:rs(QueueName)]);
1381-
{error, timeout} ->
1382-
rabbit_misc:protocol_error(
1383-
internal_error, "~ts timeout occurred during consume operation",
1384-
[rabbit_misc:rs(QueueName)]);
1385-
{error, no_local_stream_replica_available} ->
1386-
rabbit_misc:protocol_error(
1387-
resource_error, "~ts does not have a running local replica",
1388-
[rabbit_misc:rs(QueueName)])
1389-
end;
1364+
ActualTag = case ConsumerTag of
1365+
<<>> ->
1366+
rabbit_guid:binary(
1367+
rabbit_guid:gen_secure(), "amq.ctag");
1368+
_ ->
1369+
ConsumerTag
1370+
end,
1371+
basic_consume(
1372+
QueueName, NoAck, ConsumerPrefetch, ActualTag,
1373+
ExclusiveConsume, Args, NoWait, State);
13901374
{ok, _} ->
13911375
%% Attempted reuse of consumer tag.
13921376
rabbit_misc:protocol_error(
@@ -1685,11 +1669,11 @@ handle_method(_MethodRecord, _Content, _State) ->
16851669
%% for why.
16861670
basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
16871671
ExclusiveConsume, Args, NoWait,
1688-
State = #ch{cfg = #conf{conn_pid = ConnPid,
1689-
user = #user{username = Username}},
1690-
limiter = Limiter,
1691-
consumer_mapping = ConsumerMapping,
1692-
queue_states = QueueStates0}) ->
1672+
State0 = #ch{cfg = #conf{conn_pid = ConnPid,
1673+
user = #user{username = Username}},
1674+
limiter = Limiter,
1675+
consumer_mapping = ConsumerMapping,
1676+
queue_states = QueueStates0}) ->
16931677
case rabbit_amqqueue:with_exclusive_access_or_die(
16941678
QueueName, ConnPid,
16951679
fun (Q) ->
@@ -1710,22 +1694,16 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
17101694
ActualConsumerTag,
17111695
{Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
17121696
ConsumerMapping),
1713-
1714-
State1 = State#ch{consumer_mapping = CM1,
1715-
queue_states = QueueStates},
1716-
{ok, case NoWait of
1717-
true -> consumer_monitor(ActualConsumerTag, State1);
1718-
false -> State1
1719-
end};
1720-
{{error, exclusive_consume_unavailable} = E, _Q} ->
1721-
E;
1722-
{{error, global_qos_not_supported_for_queue_type} = E, _Q} ->
1723-
E;
1724-
{{error, no_local_stream_replica_available} = E, _Q} ->
1725-
E;
1726-
{{error, timeout} = E, _Q} ->
1727-
E;
1728-
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
1697+
State1 = State0#ch{consumer_mapping = CM1,
1698+
queue_states = QueueStates},
1699+
State = case NoWait of
1700+
true ->
1701+
consumer_monitor(ActualConsumerTag, State1);
1702+
false ->
1703+
State1
1704+
end,
1705+
{noreply, State};
1706+
{{error, Type, Reason, ReasonArgs}, _Q} ->
17291707
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
17301708
end.
17311709

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,12 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
297297
%% TODO: track pids as they change
298298
State = ensure_monitor(QPid, QRef, State0),
299299
{ok, State#?STATE{pid = QPid}};
300-
Err ->
301-
Err
300+
{error, exclusive_consume_unavailable} ->
301+
{error, access_refused, "~ts in exclusive use",
302+
[rabbit_misc:rs(QRef)]};
303+
{error, Reason} ->
304+
{error, internal_error, "failed consuming from classic ~ts: ~tp",
305+
[rabbit_misc:rs(QRef), Reason]}
302306
end.
303307

304308
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,7 @@
211211
consume_spec(),
212212
queue_state()) ->
213213
{ok, queue_state(), actions()} |
214-
{error, term()} |
215-
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
214+
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.
216215

217216
-callback cancel(amqqueue:amqqueue(),
218217
cancel_spec(),
@@ -516,15 +515,14 @@ new(Q, State) when ?is_amqqueue(Q) ->
516515

517516
-spec consume(amqqueue:amqqueue(), consume_spec(), state()) ->
518517
{ok, state()} |
519-
{error, term()} |
520-
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
518+
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.
521519
consume(Q, Spec, State) ->
522520
#ctx{state = CtxState0} = Ctx = get_ctx(Q, State),
523521
Mod = amqqueue:get_type(Q),
524522
case Mod:consume(Q, Spec, CtxState0) of
525523
{ok, CtxState} ->
526524
{ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)};
527-
Err ->
525+
Err = {error, _Type, _Fmt, _FmtArgs} ->
528526
Err
529527
end.
530528

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -971,10 +971,12 @@ dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) ->
971971
rabbit_queue_type:consume_spec(),
972972
rabbit_fifo_client:state()) ->
973973
{ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
974-
{error, global_qos_not_supported_for_queue_type | timeout}.
974+
{error, atom(), Format :: string(), FormatArgs :: [term()]}.
975975
consume(Q, #{limiter_active := true}, _State)
976976
when ?amqqueue_is_quorum(Q) ->
977-
{error, global_qos_not_supported_for_queue_type};
977+
{error, not_implemented,
978+
"~ts does not support global qos",
979+
[rabbit_misc:rs(amqqueue:get_name(Q))]};
978980
consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
979981
#{no_ack := NoAck,
980982
channel_pid := ChPid,
@@ -1008,46 +1010,58 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
10081010
args => Args,
10091011
username => ActingUser,
10101012
priority => Priority},
1011-
{ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag,
1012-
Mode, ConsumerMeta,
1013-
QState0),
1014-
case single_active_consumer_on(Q) of
1015-
true ->
1016-
%% get the leader from state
1017-
case rabbit_fifo_client:query_single_active_consumer(QState) of
1018-
{ok, SacResult} ->
1019-
ActivityStatus = case SacResult of
1020-
{value, {ConsumerTag, ChPid}} ->
1021-
single_active;
1022-
_ ->
1023-
waiting
1024-
end,
1013+
case rabbit_fifo_client:checkout(
1014+
ConsumerTag, Mode, ConsumerMeta, QState0) of
1015+
{ok, _Infos, QState} ->
1016+
case single_active_consumer_on(Q) of
1017+
true ->
1018+
%% get the leader from state
1019+
case rabbit_fifo_client:query_single_active_consumer(QState) of
1020+
{ok, SacResult} ->
1021+
ActivityStatus = case SacResult of
1022+
{value, {ConsumerTag, ChPid}} ->
1023+
single_active;
1024+
_ ->
1025+
waiting
1026+
end,
1027+
rabbit_core_metrics:consumer_created(
1028+
ChPid, ConsumerTag, ExclusiveConsume,
1029+
AckRequired, QName,
1030+
Prefetch, ActivityStatus == single_active, %% Active
1031+
ActivityStatus, Args),
1032+
emit_consumer_created(
1033+
ChPid, ConsumerTag, ExclusiveConsume,
1034+
AckRequired, QName, Prefetch,
1035+
Args, none, ActingUser),
1036+
{ok, QState};
1037+
Err ->
1038+
consume_error(Err, QName)
1039+
end;
1040+
false ->
10251041
rabbit_core_metrics:consumer_created(
10261042
ChPid, ConsumerTag, ExclusiveConsume,
10271043
AckRequired, QName,
1028-
Prefetch, ActivityStatus == single_active, %% Active
1029-
ActivityStatus, Args),
1030-
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
1031-
AckRequired, QName, Prefetch,
1032-
Args, none, ActingUser),
1033-
{ok, QState};
1034-
{error, Error} ->
1035-
Error;
1036-
{timeout, _} ->
1037-
{error, timeout}
1044+
Prefetch, true, %% Active
1045+
up, Args),
1046+
emit_consumer_created(
1047+
ChPid, ConsumerTag, ExclusiveConsume,
1048+
AckRequired, QName, Prefetch,
1049+
Args, none, ActingUser),
1050+
{ok, QState}
10381051
end;
1039-
false ->
1040-
rabbit_core_metrics:consumer_created(
1041-
ChPid, ConsumerTag, ExclusiveConsume,
1042-
AckRequired, QName,
1043-
Prefetch, true, %% Active
1044-
up, Args),
1045-
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
1046-
AckRequired, QName, Prefetch,
1047-
Args, none, ActingUser),
1048-
{ok, QState}
1052+
Err ->
1053+
consume_error(Err, QName)
10491054
end.
10501055

1056+
consume_error({error, Reason}, QName) ->
1057+
{error, internal_error,
1058+
"failed consuming from quorum ~ts: ~tp",
1059+
[rabbit_misc:rs(QName), Reason]};
1060+
consume_error({timeout, RaServerId}, QName) ->
1061+
{error, internal_error,
1062+
"timed out consuming from quorum ~ts: ~tp",
1063+
[rabbit_misc:rs(QName), RaServerId]}.
1064+
10511065
cancel(_Q, #{consumer_tag := ConsumerTag} = Spec, State) ->
10521066
maybe_send_reply(self(), maps:get(ok_msg, Spec, undefined)),
10531067
Reason = maps:get(reason, Spec, cancel),

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -290,19 +290,23 @@ format(Q, Ctx) ->
290290

291291
consume(Q, #{mode := {simple_prefetch, 0}}, _)
292292
when ?amqqueue_is_stream(Q) ->
293-
{protocol_error, precondition_failed, "consumer prefetch count is not set for stream ~ts",
293+
{error, precondition_failed,
294+
"consumer prefetch count is not set for stream ~ts",
294295
[rabbit_misc:rs(amqqueue:get_name(Q))]};
295296
consume(Q, #{no_ack := true,
296297
mode := {simple_prefetch, _}}, _)
297298
when ?amqqueue_is_stream(Q) ->
298-
{protocol_error, not_implemented,
299+
{error, not_implemented,
299300
"automatic acknowledgement not supported by stream ~ts",
300301
[rabbit_misc:rs(amqqueue:get_name(Q))]};
301302
consume(Q, #{limiter_active := true}, _State)
302303
when ?amqqueue_is_stream(Q) ->
303-
{error, global_qos_not_supported_for_queue_type};
304+
{error, not_implemented,
305+
"~ts does not support global qos",
306+
[rabbit_misc:rs(amqqueue:get_name(Q))]};
304307
consume(Q, Spec, #stream_client{} = QState0)
305308
when ?amqqueue_is_stream(Q) ->
309+
QName = amqqueue:get_name(Q),
306310
%% Messages should include the offset as a custom header.
307311
case get_local_pid(QState0) of
308312
{LocalPid, QState} when is_pid(LocalPid) ->
@@ -314,13 +318,10 @@ consume(Q, Spec, #stream_client{} = QState0)
314318
args := Args,
315319
ok_msg := OkMsg,
316320
acting_user := ActingUser} = Spec,
317-
QName = amqqueue:get_name(Q),
318321
rabbit_log:debug("~s:~s Local pid resolved ~0p",
319322
[?MODULE, ?FUNCTION_NAME, LocalPid]),
320323
case parse_offset_arg(
321324
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
322-
{error, _} = Err ->
323-
Err;
324325
{ok, OffsetSpec} ->
325326
ConsumerPrefetchCount = case Mode of
326327
{simple_prefetch, C} -> C;
@@ -344,12 +345,17 @@ consume(Q, Spec, #stream_client{} = QState0)
344345
maybe_send_reply(ChPid, OkMsg),
345346
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
346347
Filter = maps:get(filter, Spec, []),
347-
begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, Filter, filter_spec(Args))
348+
begin_stream(QState, ConsumerTag, OffsetSpec, Mode,
349+
AckRequired, Filter, filter_spec(Args));
350+
{error, Reason} ->
351+
{error, precondition_failed,
352+
"failed consuming from stream ~ts: ~tp",
353+
[rabbit_misc:rs(QName), Reason]}
348354
end;
349355
{undefined, _} ->
350-
{protocol_error, precondition_failed,
356+
{error, precondition_failed,
351357
"stream ~ts does not have a running replica on the local node",
352-
[rabbit_misc:rs(amqqueue:get_name(Q))]}
358+
[rabbit_misc:rs(QName)]}
353359
end.
354360

355361
-spec parse_offset_arg(undefined |

0 commit comments

Comments
 (0)