Skip to content

Commit dd947c6

Browse files
authored
Merge pull request #1839 from rabbitmq/rabbitmq-server-1838-active-field-for-consumers
Update active flag for consumers
2 parents a4b6025 + dcf663f commit dd947c6

8 files changed

+283
-93
lines changed

src/rabbit_amqqueue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@
224224

225225
-define(CONSUMER_INFO_KEYS,
226226
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
227-
single_active, arguments]).
227+
active, activity_status, arguments]).
228228

229229
warn_file_limit() ->
230230
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
958958
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
959959
[lists:zip(ConsumerInfoKeys,
960960
[Q#amqqueue.name, ChPid, CTag,
961-
AckRequired, Prefetch, SingleActive, Args]) ||
962-
{ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
961+
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
962+
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
963963

964964
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
965965
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).

src/rabbit_amqqueue_process.erl

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
364364
QName = qname(State),
365365
notify_decorators(shutdown, State),
366366
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
367-
{Ch, CTag, _, _, _, _, _} <-
367+
{Ch, CTag, _, _, _, _, _, _} <-
368368
rabbit_queue_consumers:all(Consumers)],
369369
State1#q{backing_queue_state = Fun(BQS)}
370370
end.
@@ -1211,7 +1211,7 @@ handle_call({info, Items}, _From, State) ->
12111211
handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
12121212
reply(rabbit_queue_consumers:all(Consumers), State);
12131213
handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
1214-
reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
1214+
reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer, true), State);
12151215

12161216
handle_call({notify_down, ChPid}, _From, State) ->
12171217
%% we want to do this synchronously, so that auto_deleted queues
@@ -1296,15 +1296,18 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
12961296
QName = qname(State1),
12971297
AckRequired = not NoAck,
12981298
TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
1299-
IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of
1300-
{true, TheConsumer} ->
1301-
true;
1302-
_ ->
1303-
false
1304-
end,
1299+
{ConsumerIsActive, ActivityStatus} =
1300+
case {SingleActiveConsumerOn, State1#q.active_consumer} of
1301+
{true, TheConsumer} ->
1302+
{true, single_active};
1303+
{true, _} ->
1304+
{false, waiting};
1305+
{false, _} ->
1306+
{true, up}
1307+
end,
13051308
rabbit_core_metrics:consumer_created(
13061309
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
1307-
PrefetchCount, IsSingleActiveConsumer, Args),
1310+
PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
13081311
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
13091312
AckRequired, QName, PrefetchCount,
13101313
Args, none, ActingUser),
@@ -1425,7 +1428,7 @@ maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _Pre
14251428
{Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer),
14261429
rabbit_core_metrics:consumer_updated(
14271430
ChPid, Tag, false, Ack, qname(State),
1428-
Prefetch, true, Args
1431+
Prefetch, true, single_active, Args
14291432
),
14301433
ok;
14311434
_ ->

src/rabbit_core_metrics_gc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) ->
160160
({{Pid, Id} = Key, _, _, _, _}, none)
161161
when Table == channel_exchange_metrics ->
162162
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
163-
({{Id, Pid, _} = Key, _, _, _, _, _}, none)
163+
({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)
164164
when Table == consumer_created ->
165165
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
166166
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->

0 commit comments

Comments
 (0)