Skip to content

Commit 9f40da5

Browse files
committed
Return active and activity status when listing consumers
[#163298456] Fixes #1838
1 parent dc9b575 commit 9f40da5

File tree

5 files changed

+138
-37
lines changed

5 files changed

+138
-37
lines changed

src/rabbit_amqqueue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@
224224

225225
-define(CONSUMER_INFO_KEYS,
226226
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
227-
single_active, arguments]).
227+
active, activity_status, arguments]).
228228

229229
warn_file_limit() ->
230230
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
958958
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
959959
[lists:zip(ConsumerInfoKeys,
960960
[Q#amqqueue.name, ChPid, CTag,
961-
AckRequired, Prefetch, SingleActive, Args]) ||
962-
{ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
961+
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
962+
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
963963

964964
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
965965
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).

src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
364364
QName = qname(State),
365365
notify_decorators(shutdown, State),
366366
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
367-
{Ch, CTag, _, _, _, _, _} <-
367+
{Ch, CTag, _, _, _, _, _, _} <-
368368
rabbit_queue_consumers:all(Consumers)],
369369
State1#q{backing_queue_state = Fun(BQS)}
370370
end.
@@ -1211,7 +1211,7 @@ handle_call({info, Items}, _From, State) ->
12111211
handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
12121212
reply(rabbit_queue_consumers:all(Consumers), State);
12131213
handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
1214-
reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
1214+
reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer, true), State);
12151215

12161216
handle_call({notify_down, ChPid}, _From, State) ->
12171217
%% we want to do this synchronously, so that auto_deleted queues

src/rabbit_fifo.erl

Lines changed: 81 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -727,31 +727,48 @@ query_consumer_count(#state{consumers = Consumers,
727727
maps:size(Consumers) + length(WaitingConsumers).
728728

729729
query_consumers(#state{consumers = Consumers,
730-
waiting_consumers = WaitingConsumers} = State) ->
731-
SingleActiveConsumer = query_single_active_consumer(State),
732-
IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
733-
case SingleActiveConsumer of
734-
{value, {Tag, Pid}} ->
735-
true;
736-
_ ->
737-
false
738-
end
739-
end,
740-
FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
730+
waiting_consumers = WaitingConsumers,
731+
consumer_strategy = ConsumerStrategy } = State) ->
732+
ActiveActivityStatusFun = case ConsumerStrategy of
733+
default ->
734+
fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) ->
735+
case SuspectedDown of
736+
true ->
737+
{false, suspected_down};
738+
false ->
739+
{true, up}
740+
end
741+
end;
742+
single_active ->
743+
SingleActiveConsumer = query_single_active_consumer(State),
744+
fun({Tag, Pid} = _Consumer, _) ->
745+
case SingleActiveConsumer of
746+
{value, {Tag, Pid}} ->
747+
{true, single_active};
748+
_ ->
749+
{false, waiting}
750+
end
751+
end
752+
end,
753+
FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) ->
754+
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
741755
{Pid, Tag,
742756
maps:get(ack, Meta, undefined),
743757
maps:get(prefetch, Meta, undefined),
744-
IsSingleActiveConsumerFun({Tag, Pid}),
758+
Active,
759+
ActivityStatus,
745760
maps:get(args, Meta, []),
746761
maps:get(username, Meta, undefined)}
747762
end, Consumers),
748763
FromWaitingConsumers =
749-
lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) ->
764+
lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
765+
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
750766
maps:put({Tag, Pid},
751767
{Pid, Tag,
752768
maps:get(ack, Meta, undefined),
753769
maps:get(prefetch, Meta, undefined),
754-
IsSingleActiveConsumerFun({Tag, Pid}),
770+
Active,
771+
ActivityStatus,
755772
maps:get(args, Meta, []),
756773
maps:get(username, Meta, undefined)},
757774
Acc)
@@ -761,6 +778,8 @@ query_consumers(#state{consumers = Consumers,
761778
query_single_active_consumer(#state{consumer_strategy = single_active,
762779
consumers = Consumers}) ->
763780
case maps:size(Consumers) of
781+
0 ->
782+
{error, no_value};
764783
1 ->
765784
{value, lists:nth(1, maps:keys(Consumers))};
766785
_
@@ -2341,6 +2360,44 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
23412360
?assertEqual(3, length(Effects)).
23422361

23432362
query_consumers_test() ->
2363+
State0 = init(#{name => ?FUNCTION_NAME,
2364+
queue_resource => rabbit_misc:r("/", queue,
2365+
atom_to_binary(?FUNCTION_NAME, utf8)),
2366+
shadow_copy_interval => 0,
2367+
single_active_consumer_on => false}),
2368+
2369+
% adding some consumers
2370+
AddConsumer = fun(CTag, State) ->
2371+
{NewState, _, _} = apply(
2372+
#{},
2373+
#checkout{spec = {once, 1, simple_prefetch},
2374+
meta = #{},
2375+
consumer_id = {CTag, self()}},
2376+
State),
2377+
NewState
2378+
end,
2379+
State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
2380+
Consumers0 = State1#state.consumers,
2381+
Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
2382+
Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0),
2383+
State2 = State1#state{consumers = Consumers1},
2384+
2385+
?assertEqual(4, query_consumer_count(State2)),
2386+
Consumers2 = query_consumers(State2),
2387+
?assertEqual(4, maps:size(Consumers2)),
2388+
maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
2389+
?assertEqual(self(), Pid),
2390+
case Tag of
2391+
<<"ctag2">> ->
2392+
?assertNot(Active),
2393+
?assertEqual(suspected_down, ActivityStatus);
2394+
_ ->
2395+
?assert(Active),
2396+
?assertEqual(up, ActivityStatus)
2397+
end
2398+
end, [], Consumers2).
2399+
2400+
query_consumers_when_single_active_consumer_is_on_test() ->
23442401
State0 = init(#{name => ?FUNCTION_NAME,
23452402
queue_resource => rabbit_misc:r("/", queue,
23462403
atom_to_binary(?FUNCTION_NAME, utf8)),
@@ -2362,8 +2419,16 @@ query_consumers_test() ->
23622419
?assertEqual(4, query_consumer_count(State1)),
23632420
Consumers = query_consumers(State1),
23642421
?assertEqual(4, maps:size(Consumers)),
2365-
maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) ->
2366-
?assertEqual(self(), Pid)
2422+
maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
2423+
?assertEqual(self(), Pid),
2424+
case Tag of
2425+
<<"ctag1">> ->
2426+
?assert(Active),
2427+
?assertEqual(single_active, ActivityStatus);
2428+
_ ->
2429+
?assertNot(Active),
2430+
?assertEqual(waiting, ActivityStatus)
2431+
end
23672432
end, [], Consumers).
23682433

23692434
active_flag_updated_when_consumer_suspected_unsuspected_test() ->

src/rabbit_queue_consumers.erl

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
-module(rabbit_queue_consumers).
1818

19-
-export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0,
19+
-export([new/0, max_active_priority/1, inactive/1, all/1, all/3, count/0,
2020
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
2121
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
2222
possibly_unblock/3,
@@ -118,24 +118,32 @@ inactive(#state{consumers = Consumers}) ->
118118
priority_queue:is_empty(Consumers).
119119

120120
all(State) ->
121-
all(State, none).
122-
123-
all(#state{consumers = Consumers}, SingleActiveConsumer) ->
124-
lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end,
125-
consumers(Consumers, SingleActiveConsumer, []), all_ch_record()).
126-
127-
consumers(Consumers, SingleActiveConsumer, Acc) ->
121+
all(State, none, false).
122+
123+
all(#state{consumers = Consumers}, SingleActiveConsumer, SingleActiveConsumerOn) ->
124+
lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) end,
125+
consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, []), all_ch_record()).
126+
127+
consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) ->
128+
ActiveActivityStatusFun = case SingleActiveConsumerOn of
129+
true ->
130+
fun({ChPid, Consumer}) ->
131+
case SingleActiveConsumer of
132+
{ChPid, Consumer} ->
133+
{true, single_active};
134+
_ ->
135+
{false, waiting}
136+
end
137+
end;
138+
false ->
139+
fun(_) -> {true, up} end
140+
end,
128141
priority_queue:fold(
129142
fun ({ChPid, Consumer}, _P, Acc1) ->
130143
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
131144
args = Args, user = Username} = Consumer,
132-
IsSingleActive = case SingleActiveConsumer of
133-
{ChPid, Consumer} ->
134-
true;
135-
_ ->
136-
false
137-
end,
138-
[{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1]
145+
{Active, ActivityStatus} = ActiveActivityStatusFun({ChPid, Consumer}),
146+
[{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1]
139147
end, Acc, Consumers).
140148

141149
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).

test/unit_queue_consumers_SUITE.erl

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ all() ->
2525
[
2626
is_same,
2727
get_consumer,
28-
get
28+
get,
29+
list_consumers
2930
].
3031

3132
is_same(_Config) ->
@@ -79,6 +80,33 @@ get_consumer(_Config) ->
7980
),
8081
ok.
8182

83+
list_consumers(_Config) ->
84+
State = state(consumers([consumer(self(), <<"1">>), consumer(self(), <<"2">>), consumer(self(), <<"3">>)])),
85+
Consumer = rabbit_queue_consumers:get_consumer(State),
86+
{_Pid, ConsumerRecord} = Consumer,
87+
CTag = rabbit_queue_consumers:consumer_tag(ConsumerRecord),
88+
ConsumersWithSingleActive = rabbit_queue_consumers:all(State, Consumer, true),
89+
?assertEqual(3, length(ConsumersWithSingleActive)),
90+
lists:foldl(fun({Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
91+
?assertEqual(self(), Pid),
92+
case Tag of
93+
CTag ->
94+
?assert(Active),
95+
?assertEqual(single_active, ActivityStatus);
96+
_ ->
97+
?assertNot(Active),
98+
?assertEqual(waiting, ActivityStatus)
99+
end
100+
end, [], ConsumersWithSingleActive),
101+
ConsumersNoSingleActive = rabbit_queue_consumers:all(State, none, false),
102+
?assertEqual(3, length(ConsumersNoSingleActive)),
103+
lists:foldl(fun({Pid, _, _, _, Active, ActivityStatus, _, _}, _Acc) ->
104+
?assertEqual(self(), Pid),
105+
?assert(Active),
106+
?assertEqual(up, ActivityStatus)
107+
end, [], ConsumersNoSingleActive),
108+
ok.
109+
82110
consumers([]) ->
83111
priority_queue:new();
84112
consumers(Consumers) ->

0 commit comments

Comments
 (0)