Skip to content

Commit 3b8d508

Browse files
Merge pull request #1118 from rabbitmq/rabbitmq-management-agent-25
Periodic garbage collection of stats
2 parents 1f1d872 + af3fe74 commit 3b8d508

File tree

4 files changed

+475
-1
lines changed

4 files changed

+475
-1
lines changed

src/rabbit.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@
194194
[background_gc]}},
195195
{enables, networking}]}).
196196

197+
-rabbit_boot_step({rabbit_core_metrics_gc,
198+
[{description, "background core metrics garbage collection"},
199+
{mfa, {rabbit_sup, start_restartable_child,
200+
[rabbit_core_metrics_gc]}},
201+
{enables, networking}]}).
202+
197203
%%---------------------------------------------------------------------------
198204

199205
-include("rabbit_framing.hrl").

src/rabbit_core_metrics_gc.erl

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
-module(rabbit_core_metrics_gc).
17+
18+
-record(state, {timer,
19+
interval
20+
}).
21+
22+
-spec start_link() -> rabbit_types:ok_pid_or_error().
23+
24+
-export([start_link/0]).
25+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
26+
code_change/3]).
27+
28+
start_link() ->
29+
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
30+
31+
init(_) ->
32+
Interval = rabbit_misc:get_env(rabbit, core_metrics_gc_interval, 120000),
33+
{ok, start_timer(#state{interval = Interval})}.
34+
35+
handle_call(test, _From, State) ->
36+
{reply, ok, State}.
37+
38+
handle_cast(_Request, State) ->
39+
{noreply, State}.
40+
41+
handle_info(start_gc, State) ->
42+
gc_connections(),
43+
gc_channels(),
44+
gc_queues(),
45+
gc_exchanges(),
46+
gc_nodes(),
47+
gc_gen_server2(),
48+
{noreply, start_timer(State)}.
49+
50+
terminate(_Reason, #state{timer = TRef}) ->
51+
erlang:cancel_timer(TRef),
52+
ok.
53+
54+
code_change(_OldVsn, State, _Extra) ->
55+
{ok, State}.
56+
57+
start_timer(#state{interval = Interval} = St) ->
58+
TRef = erlang:send_after(Interval, self(), start_gc),
59+
St#state{timer = TRef}.
60+
61+
gc_connections() ->
62+
gc_process(connection_created),
63+
gc_process(connection_metrics),
64+
gc_process(connection_coarse_metrics).
65+
66+
gc_channels() ->
67+
%% TODO channel stats
68+
gc_process(channel_created),
69+
gc_process(channel_metrics),
70+
gc_process(channel_process_metrics),
71+
ok.
72+
73+
gc_queues() ->
74+
Queues = rabbit_amqqueue:list_names(),
75+
GbSet = gb_sets:from_list(Queues),
76+
gc_entity(queue_metrics, GbSet),
77+
gc_entity(queue_coarse_metrics, GbSet),
78+
gc_process_and_entity(channel_queue_metrics, GbSet),
79+
gc_process_and_entity(consumer_created, GbSet),
80+
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
81+
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet).
82+
83+
gc_exchanges() ->
84+
Exchanges = rabbit_exchange:list_names(),
85+
GbSet = gb_sets:from_list(Exchanges),
86+
gc_process_and_entity(channel_exchange_metrics, GbSet).
87+
88+
gc_nodes() ->
89+
Nodes = rabbit_mnesia:cluster_nodes(all),
90+
GbSet = gb_sets:from_list(Nodes),
91+
gc_entity(node_node_metrics, GbSet).
92+
93+
gc_gen_server2() ->
94+
gc_process(gen_server2_metrics).
95+
96+
gc_process(Table) ->
97+
ets:foldl(fun({Pid = Key, _}, none) ->
98+
gc_process(Pid, Table, Key);
99+
({Pid = Key, _, _, _}, none) ->
100+
gc_process(Pid, Table, Key)
101+
end, none, Table).
102+
103+
gc_process(Pid, Table, Key) ->
104+
case erlang:is_process_alive(Pid) of
105+
true ->
106+
none;
107+
false ->
108+
%% TODO catch?
109+
ets:delete(Table, Key),
110+
none
111+
end.
112+
113+
gc_entity(Table, GbSet) ->
114+
ets:foldl(fun({{_, Id} = Key, _}, none) ->
115+
gc_entity(Id, Table, Key, GbSet);
116+
({Id = Key, _}, none) ->
117+
gc_entity(Id, Table, Key, GbSet);
118+
({Id = Key, _, _, _, _}, none) ->
119+
gc_entity(Id, Table, Key, GbSet)
120+
end, none, Table).
121+
122+
gc_entity(Id, Table, Key, GbSet) ->
123+
case gb_sets:is_member(Id, GbSet) of
124+
true ->
125+
none;
126+
false ->
127+
%% TODO catch?
128+
ets:delete(Table, Key),
129+
none
130+
end.
131+
132+
gc_process_and_entity(Table, GbSet) ->
133+
ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none)
134+
when Table == channel_queue_metrics ->
135+
gc_entity(Id, Table, Key, GbSet),
136+
gc_process(Pid, Table, Key);
137+
({{Pid, Id} = Key, _, _, _}, none)
138+
when Table == channel_exchange_metrics ->
139+
gc_entity(Id, Table, Key, GbSet),
140+
gc_process(Pid, Table, Key);
141+
({{Id, Pid, _} = Key, _, _, _, _}, none)
142+
when Table == consumer_created ->
143+
gc_entity(Id, Table, Key, GbSet),
144+
gc_process(Pid, Table, Key);
145+
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
146+
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
147+
end, none, Table).
148+
149+
gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
150+
case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of
151+
true ->
152+
none;
153+
false ->
154+
%% TODO catch?
155+
ets:delete(Table, Key),
156+
none
157+
end.
158+
159+
gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
160+
ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) ->
161+
gc_process(Pid, Table, Key),
162+
gc_entity(Q, Table, Key, QueueGbSet),
163+
gc_entity(X, Table, Key, ExchangeGbSet)
164+
end, none, Table).

src/rabbit_exchange.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
2424
update_scratch/3, update_decorators/1, immutable/1,
2525
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
26-
route/2, delete/2, validate_binding/2]).
26+
route/2, delete/2, validate_binding/2, list_names/0]).
2727
%% these must be run inside a mnesia tx
2828
-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
2929

@@ -61,6 +61,7 @@
6161
(name()) -> rabbit_types:exchange() |
6262
rabbit_types:channel_exit().
6363
-spec list() -> [rabbit_types:exchange()].
64+
-spec list_names() -> [rabbit_exchange:name()].
6465
-spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()].
6566
-spec lookup_scratch(name(), atom()) ->
6667
rabbit_types:ok(term()) |
@@ -258,6 +259,8 @@ lookup_or_die(Name) ->
258259

259260
list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
260261

262+
list_names() -> mnesia:dirty_all_keys(rabbit_exchange).
263+
261264
%% Not dirty_match_object since that would not be transactional when used in a
262265
%% tx context
263266
list(VHostPath) ->

0 commit comments

Comments
 (0)