Skip to content

Commit 9eaa79d

Browse files
committed
Add consumer activity status to metrics
[#163298456] Fixes #1838
1 parent 9f40da5 commit 9eaa79d

File tree

5 files changed

+38
-34
lines changed

5 files changed

+38
-34
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,17 +1296,18 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
12961296
QName = qname(State1),
12971297
AckRequired = not NoAck,
12981298
TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
1299-
ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of
1300-
{true, TheConsumer} ->
1301-
true;
1302-
{true, _} ->
1303-
false;
1304-
{false, _} ->
1305-
true
1306-
end,
1299+
{ConsumerIsActive, ActivityStatus} =
1300+
case {SingleActiveConsumerOn, State1#q.active_consumer} of
1301+
{true, TheConsumer} ->
1302+
{true, single_active};
1303+
{true, _} ->
1304+
{false, waiting};
1305+
{false, _} ->
1306+
{true, up}
1307+
end,
13071308
rabbit_core_metrics:consumer_created(
13081309
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
1309-
PrefetchCount, ConsumerIsActive, Args),
1310+
PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
13101311
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
13111312
AckRequired, QName, PrefetchCount,
13121313
Args, none, ActingUser),
@@ -1427,7 +1428,7 @@ maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _Pre
14271428
{Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer),
14281429
rabbit_core_metrics:consumer_updated(
14291430
ChPid, Tag, false, Ack, qname(State),
1430-
Prefetch, true, Args
1431+
Prefetch, true, single_active, Args
14311432
),
14321433
ok;
14331434
_ ->

src/rabbit_core_metrics_gc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) ->
160160
({{Pid, Id} = Key, _, _, _, _}, none)
161161
when Table == channel_exchange_metrics ->
162162
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
163-
({{Id, Pid, _} = Key, _, _, _, _, _}, none)
163+
({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)
164164
when Table == consumer_created ->
165165
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
166166
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->

src/rabbit_fifo.erl

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ apply(_, {down, ConsumerPid, noconnection},
464464
%% TODO: need to increment credit here
465465
%% with the size of the Checked map
466466
Credit = increase_credit(C, maps:size(Checked0)),
467-
Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff),
467+
Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
468468
{maps:put(K, C#consumer{suspected_down = true,
469469
credit = Credit,
470470
checked_out = #{}}, Co),
@@ -531,7 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
531531
{Cons1, SQ, Effects} =
532532
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
533533
when node(P) =:= Node ->
534-
EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc),
534+
EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
535535
update_or_remove_sub(
536536
ConsumerId, C#consumer{suspected_down = false},
537537
CAcc, SQAcc, EAcc1);
@@ -548,11 +548,11 @@ apply(_, #update_config{config = Conf}, State) ->
548548
{update_config(Conf, State), ok}.
549549

550550
consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
551-
fun(State, ConsumerId, Consumer, Active, Effects) ->
552-
consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects)
551+
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
552+
consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects)
553553
end;
554554
consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
555-
fun(_, _, _, _, Effects) ->
555+
fun(_, _, _, _, _, Effects) ->
556556
Effects
557557
end.
558558

@@ -869,7 +869,8 @@ cancel_consumer(ConsumerId,
869869
service_queue = ServiceQueue1,
870870
waiting_consumers = RemainingWaitingConsumers},
871871
Effects2 = consumer_update_active_effects(State, NewActiveConsumerId,
872-
NewActiveConsumer, true, Effects1),
872+
NewActiveConsumer, true,
873+
single_active, Effects1),
873874
{State, Effects2};
874875
error ->
875876
% The cancelled consumer is not the active one
@@ -883,15 +884,16 @@ cancel_consumer(ConsumerId,
883884
end.
884885

885886
consumer_update_active_effects(#state{queue_resource = QName },
886-
ConsumerId, #consumer{meta = Meta}, Active,
887+
ConsumerId, #consumer{meta = Meta},
888+
Active, ActivityStatus,
887889
Effects) ->
888890
Ack = maps:get(ack, Meta, undefined),
889891
Prefetch = maps:get(prefetch, Meta, undefined),
890892
Args = maps:get(args, Meta, []),
891893
[{mod_call,
892894
rabbit_quorum_queue,
893895
update_consumer_handler,
894-
[QName, ConsumerId, false, Ack, Prefetch, Active, Args]}
896+
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
895897
| Effects].
896898

897899
cancel_consumer0(ConsumerId,

src/rabbit_quorum_queue.erl

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
-export([dead_letter_publish/4]).
2727
-export([queue_name/1]).
2828
-export([cluster_state/1, status/2]).
29-
-export([update_consumer_handler/7, update_consumer/8]).
29+
-export([update_consumer_handler/8, update_consumer/9]).
3030
-export([cancel_consumer_handler/2, cancel_consumer/3]).
3131
-export([become_leader/2, update_metrics/2]).
3232
-export([rpc_delete_metrics/1]).
@@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
164164
_ -> false
165165
end.
166166

167-
update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) ->
167+
update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
168168
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
169-
[QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]).
169+
[QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
170170

171-
update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) ->
171+
update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
172172
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
173-
QName, Prefetch, Active, Args).
173+
QName, Prefetch, Active, ActivityStatus, Args).
174174

175175
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
176176
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
@@ -440,19 +440,20 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChP
440440
fun rabbit_fifo:query_single_active_consumer/1),
441441

442442
SingleActiveConsumerOn = single_active_consumer_on(Q),
443-
IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of
444-
{false, _} ->
445-
true;
446-
{true, {value, {ConsumerTag, ChPid}}} ->
447-
true;
448-
_ ->
449-
false
450-
end,
443+
{IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
444+
{false, _} ->
445+
{true, up};
446+
{true, {value, {ConsumerTag, ChPid}}} ->
447+
{true, single_active};
448+
_ ->
449+
{false, waiting}
450+
end,
451451

452452
%% TODO: emit as rabbit_fifo effect
453453
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
454454
not NoAck, QName,
455-
ConsumerPrefetchCount, IsSingleActiveConsumer, Args),
455+
ConsumerPrefetchCount, IsSingleActiveConsumer,
456+
ActivityStatus, Args),
456457
{ok, QState}.
457458

458459
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->

test/rabbit_core_metrics_gc_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ consumer_metrics(Config) ->
300300
CTag = <<"tag">>,
301301
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
302302
consumer_created, [DeadPid, CTag, true, true,
303-
QName, 1, false, []]),
303+
QName, 1, false, waiting, []]),
304304
Id = {QName, DeadPid, CTag},
305305
[_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]),
306306

0 commit comments

Comments
 (0)