Skip to content

Commit df276c6

Browse files
Merge pull request #197 from cloudamqp/publish_out_stats
Bump exchange publish out metric when routing delayed messages
2 parents baa407d + a38ee58 commit df276c6

File tree

2 files changed

+107
-9
lines changed

2 files changed

+107
-9
lines changed

src/rabbit_delayed_message.erl

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
code_change/3]).
2929
-export([messages_delayed/1]).
3030

31+
%% For testing, debugging and manual use
32+
-export([refresh_config/0]).
33+
3134
-import(rabbit_delayed_message_utils, [swap_delay_header/1]).
3235

3336
-type t_reference() :: reference().
@@ -49,7 +52,8 @@
4952
-define(TABLE_NAME, append_to_atom(?MODULE, node())).
5053
-define(INDEX_TABLE_NAME, append_to_atom(?TABLE_NAME, "_index")).
5154

52-
-record(state, {timer}).
55+
-record(state, {timer,
56+
stats_state}).
5357

5458
-record(delay_key,
5559
{ timestamp, %% timestamp delay
@@ -104,6 +108,9 @@ messages_delayed(Exchange) ->
104108
Delays = mnesia:dirty_select(?TABLE_NAME, [{MatchHead, [], ['$_']}]),
105109
length(Delays).
106110

111+
%% Synchronously asks the registered ?MODULE gen_server to rebuild its
%% stats state. The server's `refresh_config' call clause replies `ok'.
%% Exported for testing, debugging and manual use.
refresh_config() ->
    Request = refresh_config,
    gen_server:call(?MODULE, Request).
113+
107114
%%--------------------------------------------------------------------
108115

109116
init([]) ->
@@ -120,11 +127,14 @@ handle_call({delay_message, Exchange, Delivery, Delay},
120127
State
121128
end,
122129
{reply, Reply, State2};
130+
handle_call(refresh_config, _From, State) ->
131+
{reply, ok, refresh_config(State)};
123132
handle_call(_Req, _From, State) ->
124133
{reply, unknown_request, State}.
125134

126135
%% `go': refresh the stats configuration first, then store whatever
%% maybe_delay_first/0 returns in the `timer' field.
%% Any other cast is ignored and leaves the state untouched.
handle_cast(go, State) ->
    Refreshed = refresh_config(State),
    Timer = maybe_delay_first(),
    {noreply, Refreshed#state{timer = Timer}};
handle_cast(_Other, State) ->
    {noreply, State}.
130140

@@ -133,7 +143,7 @@ handle_info({timeout, _TimerRef, {deliver, Key}}, State) ->
133143
[] ->
134144
ok;
135145
Deliveries ->
136-
route(Key, Deliveries),
146+
route(Key, Deliveries, State),
137147
mnesia:dirty_delete(?TABLE_NAME, Key),
138148
mnesia:dirty_delete(?INDEX_TABLE_NAME, Key)
139149
end,
@@ -160,12 +170,14 @@ maybe_delay_first() ->
160170
not_set
161171
end.
162172

163-
route(#delay_key{exchange = Ex}, Deliveries) ->
173+
%% Routes every delayed entry stored under one delay key.
%%
%% For each entry the delay header is swapped on the delivery, the
%% delivery is routed through the key's exchange, the destination
%% queues are looked up and delivered to, and the routed/publish-out
%% stats are bumped for the exchange. Returns the list of per-entry
%% results of bump_routed_stats/3.
route(#delay_key{exchange = Ex}, Deliveries, State) ->
    ExName = Ex#exchange.name,
    RouteOne =
        fun (#delay_entry{delivery = Delivery0}) ->
                Delivery = swap_delay_header(Delivery0),
                Queues = rabbit_amqqueue:lookup(
                           rabbit_exchange:route(Ex, Delivery)),
                rabbit_amqqueue:deliver(Queues, Delivery),
                bump_routed_stats(ExName, Queues, State)
        end,
    lists:map(RouteOne, Deliveries).
170182

171183
internal_delay_message(CurrTimer, Exchange, Delivery, Delay) ->
@@ -260,3 +272,45 @@ recover_exchange_and_bindings(#exchange{name = XName} = X) ->
260272
"recovered bindings for ~s",
261273
[rabbit_misc:rs(XName)])
262274
end).
275+
276+
%% These metrics are normally bumped from a channel process via which
277+
%% the publish actually happened. In the special case of delayed
278+
%% message delivery, the singleton delayed_message gen_server does
279+
%% this.
280+
%%
281+
%% Difference from delivering from a channel:
282+
%%
283+
%% The channel process keeps track of the state and monitors each
284+
%% queue it routed to. When the channel is notified of a queue DOWN,
285+
%% it marks all core metrics for that channel + queue as deleted.
286+
%% Monitoring all queues would be overkill for the delayed message
287+
%% gen_server, so this delete marking does not happen in this
288+
%% case. Still `rabbit_core_metrics_gc' will periodically scan all the
289+
%% core metrics and eventually delete entries for non-existing queues
290+
%% so there won't be any metrics leak. `rabbit_core_metrics_gc' will
291+
%% also delete the entries when this process is not alive, i.e., when the
292+
%% plugin is disabled.
293+
bump_routed_stats(ExName, Qs, State) ->
    %% The global routed counter is bumped regardless of the stats level.
    rabbit_global_counters:messages_routed(amqp091, length(Qs)),
    case rabbit_event:stats_level(State, #state.stats_state) of
        fine ->
            %% Channel PID is just an identifier in the metrics
            %% DB. However core metrics GC will delete entries
            %% with a not-alive PID, and by the time the delayed
            %% message gets delivered the original channel
            %% process might be long gone, hence we need a live
            %% PID in the key.
            FakeChannelId = self(),
            BumpOne =
                fun (Q) ->
                        Key = {FakeChannelId, {amqqueue:get_name(Q), ExName}},
                        rabbit_core_metrics:channel_stats(
                          queue_exchange_stats, publish, Key, 1)
                end,
            %% One publish-out increment per destination queue;
            %% lists:foreach/2 returns `ok'.
            lists:foreach(BumpOne, Qs);
        _ ->
            ok
    end.
314+
315+
%% Returns the state with its `stats_state' field re-initialised by
%% rabbit_event:init_stats_timer/2.
%% NOTE(review): presumably this picks up the current
%% `collect_statistics' setting - confirm against rabbit_event.
refresh_config(State) ->
    StatsField = #state.stats_state,
    rabbit_event:init_stats_timer(State, StatsField).

test/plugin_SUITE.erl

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77

88
-module(plugin_SUITE).
99

10-
-compile(export_all).
10+
-compile([export_all, nowarn_export_all]).
1111

12-
-include_lib("common_test/include/ct.hrl").
1312
-include_lib("eunit/include/eunit.hrl").
1413
-include_lib("amqp_client/include/amqp_client.hrl").
1514

1615
%% Common Test entry point: the groups to run, in order.
all() ->
    [{group, non_parallel_tests},
     {group, fine_stats}].
2020

2121
groups() ->
@@ -33,7 +33,11 @@ groups() ->
3333
node_restart_before_delay_expires,
3434
node_restart_after_delay_expires,
3535
string_delay_header
36-
]}
36+
]},
37+
{fine_stats, [], [
38+
e2e_nodelay,
39+
e2e_delay
40+
]}
3741
].
3842

3943

@@ -55,16 +59,27 @@ end_per_suite(Config) ->
5559
rabbit_ct_client_helpers:teardown_steps() ++
5660
rabbit_ct_broker_helpers:teardown_steps()).
5761

62+
%% For the `fine_stats' group: remember the broker's current
%% `collect_statistics' value, switch it to `fine', make the delayed
%% message server reload its config, and record both the active and the
%% original value in the CT config. Other groups need no setup.
init_per_group(fine_stats, Config) ->
    OrigLevel = get_collect_stats(Config),
    set_collect_stats(Config, fine),
    refresh_config(Config),
    Extra = [{collect_statistics, fine},
             {collect_statistics_orig, OrigLevel}],
    Extra ++ Config;
init_per_group(_, Config) ->
    Config.
6069

70+
%% For the `fine_stats' group: restore the `collect_statistics' value
%% stored under `collect_statistics_orig' in the CT config and make the
%% delayed message server reload its config. Other groups need no
%% teardown.
end_per_group(fine_stats, Config) ->
    SavedLevel = rabbit_ct_helpers:get_config(Config, collect_statistics_orig),
    set_collect_stats(Config, SavedLevel),
    refresh_config(Config),
    Config;
end_per_group(_, Config) ->
    Config.
6377

6478
%% Per-testcase setup: derive a resource base name from the testcase
%% name (every "/" replaced by "-"), store it under `test_resource_name',
%% clear the publish-out metrics table on the broker, then mark the
%% testcase as started.
init_per_testcase(Testcase, Config) ->
    BaseName = re:replace(
                 rabbit_ct_helpers:config_to_testcase_name(Config, Testcase),
                 "/", "-", [global, {return, list}]),
    Config1 = rabbit_ct_helpers:set_config(Config, {test_resource_name, BaseName}),
    reset_publish_out_stats(Config),
    rabbit_ct_helpers:testcase_started(Config1, Testcase).
6984

7085
end_per_testcase(Testcase, Config) ->
@@ -169,13 +184,22 @@ e2e_test0(Config, Msgs) ->
169184
destination = Ex2
170185
}),
171186

187+
[] = get_publish_out_stat(Config),
172188

173189
publish_messages(Chan, Ex, Msgs),
174190

175191
{ok, Result} = consume(Chan, Q, Msgs),
176192
Sorted = lists:sort(Msgs),
177193
?assertEqual(Sorted, Result),
178194

195+
PublishOutCount = length(Msgs),
196+
case rabbit_ct_helpers:get_config(Config, collect_statistics, none) of
197+
fine ->
198+
?assertMatch([{_, PublishOutCount, _}], get_publish_out_stat(Config));
199+
_ ->
200+
?assertMatch([], get_publish_out_stat(Config))
201+
end,
202+
179203
amqp_channel:call(Chan, #'exchange.delete' { exchange = Ex }),
180204
amqp_channel:call(Chan, #'exchange.delete' { exchange = Ex2 }),
181205
amqp_channel:call(Chan, #'queue.delete' { queue = Q }),
@@ -434,3 +458,23 @@ make_queue_name(Config, Suffix) ->
434458
%% Builds the binary policy name "p-<test_resource_name>-<Suffix>" from
%% the `test_resource_name' stored in the CT config.
make_policy_name(Config, Suffix) ->
    BaseName = rabbit_ct_helpers:get_config(Config, test_resource_name),
    Name = "p-" ++ BaseName ++ "-" ++ Suffix,
    erlang:list_to_binary(Name).
461+
462+
%% Returns every row of the `channel_queue_exchange_metrics' ETS table
%% on broker node 0.
get_publish_out_stat(Config) ->
    Table = channel_queue_exchange_metrics,
    rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [Table]).
464+
465+
%% Empties the `channel_queue_exchange_metrics' ETS table on broker
%% node 0 so each testcase starts with no publish-out rows.
reset_publish_out_stats(Config) ->
    Table = channel_queue_exchange_metrics,
    rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table]).
467+
468+
%% Reads the `collect_statistics' env value of the `rabbit' application
%% on broker node 0; yields `undefined' when it is not set.
get_collect_stats(Config) ->
    Args = [rabbit, collect_statistics, undefined],
    rabbit_ct_broker_helpers:rpc(Config, 0, application, get_env, Args).
471+
472+
%% Sets the `collect_statistics' env value of the `rabbit' application
%% on broker node 0. `undefined' means "not configured", so the key is
%% unset instead of being stored. Crashes (badmatch) unless the remote
%% call returns `ok'.
set_collect_stats(Config, CollectStats) ->
    {Fun, Args} =
        case CollectStats of
            undefined -> {unset_env, [rabbit, collect_statistics]};
            _ -> {set_env, [rabbit, collect_statistics, CollectStats]}
        end,
    ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, Fun, Args).
478+
479+
%% Calls rabbit_delayed_message:refresh_config/0 on broker node 0 and
%% asserts that it returns `ok'.
refresh_config(Config) ->
    Result = rabbit_ct_broker_helpers:rpc(
               Config, 0, rabbit_delayed_message, refresh_config, []),
    ok = Result.

0 commit comments

Comments
 (0)