Skip to content

Commit 1f6d6d4

Browse files
Merge branch 'stable' into rabbitmq-server-1122
2 parents bf74a92 + b1ce3f1 commit 1f6d6d4

10 files changed

+176
-66
lines changed

rabbitmq-components.mk

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
104104

105105
dep_cowboy_commit = 1.0.4
106106
dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2
107+
# Last commit of PropEr supporting Erlang R16B03.
108+
dep_proper_commit = 735d972758d8bd85b12483626fe1b66450d6a6fe
107109
dep_ranch_commit = 1.3.1
108110
dep_webmachine_commit = 1.10.8p2
109111

src/rabbit.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ start_apps(Apps) ->
475475
prompt ->
476476
IoDevice = get_input_iodevice(),
477477
io:setopts(IoDevice, [{echo, false}]),
478-
PP = lists:droplast(io:get_line(IoDevice,
478+
PP = rabbit_misc:lists_droplast(io:get_line(IoDevice,
479479
"\nPlease enter the passphrase to unlock encrypted "
480480
"configuration entries.\n\nPassphrase: ")),
481481
io:setopts(IoDevice, [{echo, true}]),

src/rabbit_alarm.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,12 @@ handle_call(_Request, State) ->
128128

129129
handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
130130
case is_node_alarmed(Source, Node, State) of
131-
true -> {ok, State};
132-
false -> handle_set_resource_alarm(Source, Node, State)
131+
true ->
132+
{ok, State};
133+
false ->
134+
rabbit_event:notify(alarm_set, [{source, Source},
135+
{node, Node}]),
136+
handle_set_resource_alarm(Source, Node, State)
133137
end;
134138
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
135139
case lists:member(Alarm, Alarms) of
@@ -141,6 +145,8 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
141145
handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
142146
case is_node_alarmed(Source, Node, State) of
143147
true ->
148+
rabbit_event:notify(alarm_cleared, [{source, Source},
149+
{node, Node}]),
144150
handle_clear_resource_alarm(Source, Node, State);
145151
false ->
146152
{ok, State}

src/rabbit_cli.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ mutually_exclusive_flags(CurrentOptionValues, Default, FlagsAndValues) ->
277277
{ok, Value};
278278
_ ->
279279
Names = [ [$', N, $'] || {N, _} <- PresentFlags ],
280-
CommaSeparated = string:join(lists:droplast(Names), ", "),
280+
CommaSeparated = string:join(rabbit_misc:lists_droplast(Names), ", "),
281281
AndOneMore = lists:last(Names),
282282
Msg = io_lib:format("Options ~s and ~s are mutually exclusive", [CommaSeparated, AndOneMore]),
283283
{error, lists:flatten(Msg)}

src/rabbit_core_metrics_gc.erl

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ gc_connections() ->
6464
gc_process(connection_coarse_metrics).
6565

6666
gc_channels() ->
67-
%% TODO channel stats
6867
gc_process(channel_created),
6968
gc_process(channel_metrics),
7069
gc_process(channel_process_metrics),
@@ -96,16 +95,17 @@ gc_gen_server2() ->
9695
gc_process(Table) ->
9796
ets:foldl(fun({Pid = Key, _}, none) ->
9897
gc_process(Pid, Table, Key);
98+
({Pid = Key, _, _, _, _}, none) ->
99+
gc_process(Pid, Table, Key);
99100
({Pid = Key, _, _, _}, none) ->
100101
gc_process(Pid, Table, Key)
101102
end, none, Table).
102103

103104
gc_process(Pid, Table, Key) ->
104-
case erlang:is_process_alive(Pid) of
105+
case rabbit_misc:is_process_alive(Pid) of
105106
true ->
106107
none;
107108
false ->
108-
%% TODO catch?
109109
ets:delete(Table, Key),
110110
none
111111
end.
@@ -115,6 +115,8 @@ gc_entity(Table, GbSet) ->
115115
gc_entity(Id, Table, Key, GbSet);
116116
({Id = Key, _}, none) ->
117117
gc_entity(Id, Table, Key, GbSet);
118+
({Id = Key, _, _}, none) ->
119+
gc_entity(Id, Table, Key, GbSet);
118120
({Id = Key, _, _, _, _}, none) ->
119121
gc_entity(Id, Table, Key, GbSet)
120122
end, none, Table).
@@ -124,40 +126,35 @@ gc_entity(Id, Table, Key, GbSet) ->
124126
true ->
125127
none;
126128
false ->
127-
%% TODO catch?
128129
ets:delete(Table, Key),
129130
none
130131
end.
131132

132133
gc_process_and_entity(Table, GbSet) ->
133-
ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none)
134+
ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _}, none)
134135
when Table == channel_queue_metrics ->
135-
gc_entity(Id, Table, Key, GbSet),
136-
gc_process(Pid, Table, Key);
137-
({{Pid, Id} = Key, _, _, _}, none)
136+
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
137+
({{Pid, Id} = Key, _, _, _, _}, none)
138138
when Table == channel_exchange_metrics ->
139-
gc_entity(Id, Table, Key, GbSet),
140-
gc_process(Pid, Table, Key);
139+
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
141140
({{Id, Pid, _} = Key, _, _, _, _}, none)
142141
when Table == consumer_created ->
143-
gc_entity(Id, Table, Key, GbSet),
144-
gc_process(Pid, Table, Key);
142+
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
145143
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
146144
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
147145
end, none, Table).
148146

149147
gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
150-
case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of
148+
case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
151149
true ->
152150
none;
153151
false ->
154-
%% TODO catch?
155152
ets:delete(Table, Key),
156153
none
157154
end.
158155

159156
gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
160-
ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) ->
157+
ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
161158
gc_process(Pid, Table, Key),
162159
gc_entity(Q, Table, Key, QueueGbSet),
163160
gc_entity(X, Table, Key, ExchangeGbSet)

src/rabbit_queue_index.erl

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,40 @@
176176

177177
%%----------------------------------------------------------------------------
178178

179-
-record(qistate, {dir, segments, journal_handle, dirty_count,
180-
max_journal_entries, on_sync, on_sync_msg,
181-
unconfirmed, unconfirmed_msg,
182-
pre_publish_cache, delivered_cache}).
183-
184-
-record(segment, {num, path, journal_entries,
185-
entries_to_segment, unacked}).
179+
-record(qistate, {
180+
%% queue directory where segment and journal files are stored
181+
dir,
182+
%% map of #segment records
183+
segments,
184+
%% journal file handle obtained from/used by file_handle_cache
185+
journal_handle,
186+
%% how many not yet flushed entries are there
187+
dirty_count,
188+
%% this many not yet flushed journal entries will force a flush
189+
max_journal_entries,
190+
%% callback function invoked when a message is "handled"
191+
%% by the index and potentially can be confirmed to the publisher
192+
on_sync,
193+
on_sync_msg,
194+
%% set of IDs of unconfirmed [to publishers] messages
195+
unconfirmed,
196+
unconfirmed_msg,
197+
%% optimisation
198+
pre_publish_cache,
199+
%% optimisation
200+
delivered_cache}).
201+
202+
-record(segment, {
203+
%% segment ID (an integer)
204+
num,
205+
%% segment file path (see also ?SEGMENT_EXTENSION)
206+
path,
207+
%% index operation log entries in this segment
208+
journal_entries,
209+
entries_to_segment,
210+
%% counter of unacknowledged messages
211+
unacked
212+
}).
186213

187214
-include("rabbit.hrl").
188215

test/metrics_SUITE.erl

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
-include_lib("proper/include/proper.hrl").
2121
-include_lib("eunit/include/eunit.hrl").
2222
-include_lib("amqp_client/include/amqp_client.hrl").
23+
-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
2324

2425

2526
all() ->
@@ -75,6 +76,7 @@ end_per_group(_, Config) ->
7576
Config.
7677

7778
init_per_testcase(Testcase, Config) ->
79+
clean_core_metrics(Config),
7880
rabbit_ct_helpers:testcase_started(Config, Testcase).
7981

8082
end_per_testcase(Testcase, Config) ->
@@ -144,7 +146,7 @@ connection_metric_idemp(Config, {N, R}) ->
144146
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
145147
% referesh stats 'R' times
146148
[[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
147-
timer:sleep(100),
149+
force_metric_gc(Config),
148150
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
149151
TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
150152
[rabbit_ct_client_helpers:close_connection(Conn) || Conn <- Conns],
@@ -158,14 +160,15 @@ channel_metric_idemp(Config, {N, R}) ->
158160
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
159161
% referesh stats 'R' times
160162
[[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
161-
timer:sleep(100),
163+
force_metric_gc(Config),
162164
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
163165
TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
164166
rabbit_ct_client_helpers:close_connection(Conn),
165167
(Table2 == TableAfter2) and (Table == TableAfter) and
166168
(N == length(Table)) and (N == length(TableAfter)).
167169

168170
queue_metric_idemp(Config, {N, R}) ->
171+
clean_core_metrics(Config),
169172
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
170173
{ok, Chan} = amqp_connection:open_channel(Conn),
171174
Queues =
@@ -175,14 +178,15 @@ queue_metric_idemp(Config, {N, R}) ->
175178
ensure_channel_queue_metrics_populated(Chan, Queue),
176179
Queue
177180
end || _ <- lists:seq(1, N)],
178-
Table = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
179-
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
181+
182+
Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
183+
Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
180184
% referesh stats 'R' times
181185
ChanTable = read_table_rpc(Config, channel_created),
182-
[[Pid ! emit_stats || {Pid, _} <- ChanTable ] || _ <- lists:seq(1, R)],
183-
timer:sleep(100),
184-
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
185-
TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
186+
[[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)],
187+
force_metric_gc(Config),
188+
TableAfter = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
189+
TableAfter2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
186190
[ delete_queue(Chan, Q) || Q <- Queues],
187191
rabbit_ct_client_helpers:close_connection(Conn),
188192
(Table2 == TableAfter2) and (Table == TableAfter) and
@@ -191,7 +195,9 @@ queue_metric_idemp(Config, {N, R}) ->
191195
connection_metric_count(Config, Ops) ->
192196
add_rem_counter(Config, Ops,
193197
{fun rabbit_ct_client_helpers:open_unmanaged_connection/1,
194-
fun rabbit_ct_client_helpers:close_connection/1},
198+
fun(Cfg) ->
199+
rabbit_ct_client_helpers:close_connection(Cfg)
200+
end},
195201
[ connection_created,
196202
connection_metrics,
197203
connection_coarse_metrics ]).
@@ -222,9 +228,10 @@ queue_metric_count(Config, Ops) ->
222228
end,
223229
Result = add_rem_counter(Config, Ops,
224230
{AddFun,
225-
fun (Q) -> delete_queue(Chan, Q) end},
226-
[ channel_queue_metrics,
227-
channel_queue_exchange_metrics ]),
231+
fun (Q) -> delete_queue(Chan, Q),
232+
force_metric_gc(Config)
233+
end}, [channel_queue_metrics,
234+
channel_queue_exchange_metrics ]),
228235
ok = rabbit_ct_client_helpers:close_connection(Conn),
229236
Result.
230237

@@ -240,7 +247,10 @@ queue_metric_count_channel_per_queue(Config, Ops) ->
240247
end,
241248
Result = add_rem_counter(Config, Ops,
242249
{AddFun,
243-
fun ({Chan, Q}) -> delete_queue(Chan, Q) end},
250+
fun ({Chan, Q}) ->
251+
delete_queue(Chan, Q),
252+
force_metric_gc(Config)
253+
end},
244254
[ channel_queue_metrics,
245255
channel_queue_exchange_metrics ]),
246256
ok = rabbit_ct_client_helpers:close_connection(Conn),
@@ -258,8 +268,10 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) ->
258268
(_, S) -> S end,
259269
{Initial, Things},
260270
Ops),
271+
force_metric_gc(Config),
261272
TabLens = lists:map(fun(T) ->
262-
length(read_table_rpc(Config, T)) end, Tables),
273+
length(read_table_rpc(Config, T))
274+
end, Tables),
263275
[RemFun(Thing) || Thing <- Things1],
264276
[FinalLen] == lists:usort(TabLens).
265277

@@ -270,9 +282,11 @@ connection(Config) ->
270282
[_] = read_table_rpc(Config, connection_metrics),
271283
[_] = read_table_rpc(Config, connection_coarse_metrics),
272284
ok = rabbit_ct_client_helpers:close_connection(Conn),
285+
force_metric_gc(Config),
273286
[] = read_table_rpc(Config, connection_created),
274287
[] = read_table_rpc(Config, connection_metrics),
275-
[] = read_table_rpc(Config, connection_coarse_metrics).
288+
[] = read_table_rpc(Config, connection_coarse_metrics),
289+
ok.
276290

277291
channel(Config) ->
278292
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -308,10 +322,12 @@ channel_queue_delete_queue(Config) ->
308322
[_] = read_table_rpc(Config, channel_queue_exchange_metrics),
309323

310324
delete_queue(Chan, Queue),
325+
force_metric_gc(Config),
311326
% ensure removal of queue cleans up channel_queue metrics
312327
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
313328
[] = read_table_rpc(Config, channel_queue_metrics),
314-
ok = rabbit_ct_client_helpers:close_connection(Conn).
329+
ok = rabbit_ct_client_helpers:close_connection(Conn),
330+
ok.
315331

316332
channel_queue_exchange_consumer_close_connection(Config) ->
317333
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -335,10 +351,13 @@ channel_queue_exchange_consumer_close_connection(Config) ->
335351

336352
ok = rabbit_ct_client_helpers:close_connection(Conn),
337353
% ensure cleanup happened
354+
force_metric_gc(Config),
338355
[] = read_table_rpc(Config, channel_exchange_metrics),
339356
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
340357
[] = read_table_rpc(Config, channel_queue_metrics),
341-
[] = read_table_rpc(Config, consumer_created).
358+
[] = read_table_rpc(Config, consumer_created),
359+
ok.
360+
342361

343362

344363
%% -------------------------------------------------------------------
@@ -371,6 +390,16 @@ force_channel_stats(Config) ->
371390
read_table_rpc(Config, Table) ->
372391
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]).
373392

393+
clean_core_metrics(Config) ->
394+
[ rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table])
395+
|| {Table, _} <- ?CORE_TABLES].
396+
374397
read_table(Table) ->
375398
ets:tab2list(Table).
376399

400+
force_metric_gc(Config) ->
401+
timer:sleep(300),
402+
rabbit_ct_broker_helpers:rpc(Config, 0, erlang, send,
403+
[rabbit_core_metrics_gc, start_gc]),
404+
rabbit_ct_broker_helpers:rpc(Config, 0, gen_server, call,
405+
[rabbit_core_metrics_gc, test]).

test/partitions_SUITE.erl

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
%% The Original Code is RabbitMQ.
1212
%%
1313
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14-
%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
14+
%% Copyright (c) 2011-2017 Pivotal Software, Inc. All rights reserved.
1515
%%
1616

1717
-module(partitions_SUITE).
@@ -21,10 +21,6 @@
2121

2222
-compile(export_all).
2323

24-
-import(rabbit_ct_broker_helpers, [enable_dist_proxy_manager/1,
25-
enable_dist_proxy/1,
26-
enable_dist_proxy_on_node/3]).
27-
2824
%% We set ticktime to 1s and setuptime is 7s so to make sure it
2925
%% passes...
3026
-define(DELAY, 8000).
@@ -33,6 +29,9 @@
3329
%% It's a lot, but still better than timetrap_timeout
3430
-define(AWAIT_TIMEOUT, 300000).
3531

32+
suite() ->
33+
[{timetrap, 5 * 60000}].
34+
3635
all() ->
3736
[
3837
{group, net_ticktime_1},
@@ -49,8 +48,8 @@ groups() ->
4948
{cluster_size_3, [], [
5049
autoheal,
5150
autoheal_after_pause_if_all_down,
52-
autoheal_multiple_partial_partitions,
53-
autoheal_unexpected_finish,
51+
autoheal_multiple_partial_partitions,
52+
autoheal_unexpected_finish,
5453
ignore,
5554
pause_if_all_down_on_blocked,
5655
pause_if_all_down_on_down,

0 commit comments

Comments
 (0)