Skip to content

Commit f22376e

Browse files
Merge branch 'master' into rabbitmq-server-1143
2 parents 8ce0f08 + c4519dd commit f22376e

File tree

5 files changed

+112
-51
lines changed

5 files changed

+112
-51
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ define PROJECT_ENV
115115
{background_gc_enabled, false},
116116
{background_gc_target_interval, 60000},
117117
%% rabbitmq-server-589
118-
{proxy_protocol, false}
118+
{proxy_protocol, false},
119+
{disk_monitor_failure_retries, 10},
120+
{disk_monitor_failure_retry_interval, 120000}
119121
]
120122
endef
121123

src/rabbit.erl

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -485,16 +485,21 @@ stop_and_halt() ->
485485
rabbit_log:error("Error trying to stop RabbitMQ: ~p:~p", [Type, Reason]),
486486
error({Type, Reason})
487487
after
488-
AppsLeft = [ A || {A, _, _} <- application:which_applications() ],
489-
rabbit_log:info(
490-
lists:flatten(["Halting Erlang VM with the following applications:~n",
491-
[" ~p~n" || _ <- AppsLeft]]),
492-
AppsLeft),
493-
%% Also duplicate this information to stderr, so console where
494-
%% foreground broker was running (or systemd journal) will
495-
%% contain information about graceful termination.
496-
io:format(standard_error, "Gracefully halting Erlang VM~n", []),
497-
init:stop()
488+
%% Enclose all the logging in the try block.
489+
%% init:stop() will be called regardless of any errors.
490+
try
491+
AppsLeft = [ A || {A, _, _} <- application:which_applications() ],
492+
rabbit_log:info(
493+
lists:flatten(["Halting Erlang VM with the following applications:~n",
494+
[" ~p~n" || _ <- AppsLeft]]),
495+
AppsLeft),
496+
%% Also duplicate this information to stderr, so console where
497+
%% foreground broker was running (or systemd journal) will
498+
%% contain information about graceful termination.
499+
io:format(standard_error, "Gracefully halting Erlang VM~n", [])
500+
after
501+
init:stop()
502+
end
498503
end,
499504
ok.
500505

src/rabbit_disk_monitor.erl

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,12 @@
6565
alarmed,
6666
%% is monitoring enabled? false on unsupported
6767
%% platforms
68-
enabled
68+
enabled,
69+
%% number of retries to enable monitoring if it fails
70+
%% on start-up
71+
retries,
72+
%% Interval between retries
73+
interval
6974
}).
7075

7176
%%----------------------------------------------------------------------------
@@ -114,20 +119,17 @@ start_link(Args) ->
114119

115120
init([Limit]) ->
116121
Dir = dir(),
122+
{ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
123+
{ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
117124
State = #state{dir = Dir,
118125
min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
119126
max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
120127
alarmed = false,
121-
enabled = true},
122-
case {catch get_disk_free(Dir),
123-
vm_memory_monitor:get_total_memory()} of
124-
{N1, N2} when is_integer(N1), is_integer(N2) ->
125-
{ok, start_timer(set_disk_limits(State, Limit))};
126-
Err ->
127-
rabbit_log:info("Disabling disk free space monitoring "
128-
"on unsupported platform:~n~p~n", [Err]),
129-
{ok, State#state{enabled = false}}
130-
end.
128+
enabled = true,
129+
limit = Limit,
130+
retries = Retries,
131+
interval = Interval},
132+
{ok, enable(State)}.
131133

132134
handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
133135
{reply, Limit, State};
@@ -161,6 +163,8 @@ handle_call(_Request, _From, State) ->
161163
handle_cast(_Request, State) ->
162164
{noreply, State}.
163165

166+
handle_info(try_enable, #state{retries = Retries} = State) ->
167+
{noreply, enable(State#state{retries = Retries - 1})};
164168
handle_info(update, State) ->
165169
{noreply, start_timer(internal_update(State))};
166170

@@ -246,7 +250,7 @@ interpret_limit(Absolute) ->
246250

247251
emit_update_info(StateStr, CurrentFree, Limit) ->
248252
rabbit_log:info(
249-
"Disk free space ~s. Free bytes:~p Limit:~p~n",
253+
"Free disk space is ~s. Free bytes: ~p. Limit: ~p~n",
250254
[StateStr, CurrentFree, Limit]).
251255

252256
start_timer(State) ->
@@ -261,3 +265,20 @@ interval(#state{limit = Limit,
261265
max_interval = MaxInterval}) ->
262266
IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE,
263267
trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
268+
269+
enable(#state{retries = 0} = State) ->
270+
State;
271+
enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries}
272+
= State) ->
273+
case {catch get_disk_free(Dir),
274+
vm_memory_monitor:get_total_memory()} of
275+
{N1, N2} when is_integer(N1), is_integer(N2) ->
276+
rabbit_log:info("Enabling free disk space monitoring~n", []),
277+
start_timer(set_disk_limits(State, Limit));
278+
Err ->
279+
rabbit_log:info("Free disk space monitor encountered an error "
280+
"(e.g. failed to parse output from OS tools): ~p, retries left: ~s~n",
281+
[Err, Retries]),
282+
timer:send_after(Interval, self(), try_enable),
283+
State#state{enabled = false}
284+
end.

src/rabbit_msg_store.erl

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@
155155
-type client_msstate() :: #client_msstate {
156156
server :: server(),
157157
client_ref :: client_ref(),
158-
file_handle_cache :: dict:dict(),
158+
file_handle_cache :: map(),
159159
index_state :: any(),
160160
index_module :: atom(),
161161
dir :: file:filename(),
@@ -495,7 +495,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom
495495
?CREDIT_DISC_BOUND),
496496
#client_msstate { server = Server,
497497
client_ref = Ref,
498-
file_handle_cache = dict:new(),
498+
file_handle_cache = #{},
499499
index_state = IState,
500500
index_module = IModule,
501501
dir = Dir,
@@ -703,7 +703,7 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
703703

704704
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
705705
dying_clients = DyingClients }) ->
706-
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
706+
State #msstate { cref_to_msg_ids = maps:remove(CRef, CTM),
707707
dying_clients = maps:remove(CRef, DyingClients) }.
708708

709709

@@ -740,7 +740,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
740740
{CleanShutdown, IndexState, ClientRefs1} =
741741
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
742742
ClientRefs, Dir, Name),
743-
Clients = dict:from_list(
743+
Clients = maps:from_list(
744744
[{CRef, {undefined, undefined, undefined}} ||
745745
CRef <- ClientRefs1]),
746746
%% CleanShutdown => msg location index and file_summary both
@@ -776,7 +776,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
776776
index_state = IndexState,
777777
current_file = 0,
778778
current_file_handle = undefined,
779-
file_handle_cache = dict:new(),
779+
file_handle_cache = #{},
780780
sync_timer_ref = undefined,
781781
sum_valid_data = 0,
782782
sum_file_size = 0,
@@ -790,7 +790,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
790790
clients = Clients,
791791
successfully_recovered = CleanShutdown,
792792
file_size_limit = FileSizeLimit,
793-
cref_to_msg_ids = dict:new(),
793+
cref_to_msg_ids = #{},
794794
credit_disc_bound = CreditDiscBound
795795
},
796796
%% If we didn't recover the msg location index then we need to
@@ -843,7 +843,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
843843
flying_ets = FlyingEts,
844844
clients = Clients,
845845
gc_pid = GCPid }) ->
846-
Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
846+
Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
847847
erlang:monitor(process, CPid),
848848
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
849849
CurFileCacheEts, FlyingEts},
@@ -874,15 +874,15 @@ handle_cast({client_dying, CRef},
874874

875875
handle_cast({client_delete, CRef},
876876
State = #msstate { clients = Clients }) ->
877-
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
877+
State1 = State #msstate { clients = maps:remove(CRef, Clients) },
878878
noreply(clear_client(CRef, State1));
879879

880880
handle_cast({write, CRef, MsgId, Flow},
881881
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
882882
clients = Clients,
883883
credit_disc_bound = CreditDiscBound }) ->
884884
case Flow of
885-
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
885+
flow -> {CPid, _, _} = maps:get(CRef, Clients),
886886
%% We are going to process a message sent by the
887887
%% rabbit_amqqueue_process. Now we are accessing the
888888
%% msg_store process dictionary.
@@ -1003,7 +1003,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
10031003
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
10041004
CurFileCacheEts, FlyingEts]],
10051005
IndexModule:terminate(IndexState),
1006-
case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
1006+
case store_recovery_terms([{client_refs, maps:keys(Clients)},
10071007
{index_module, IndexModule}], Dir) of
10081008
ok ->
10091009
rabbit_log:info("Message store for directory '~s' is stopped", [Dir]),
@@ -1035,12 +1035,12 @@ reply(Reply, State) ->
10351035

10361036
next_state(State = #msstate { sync_timer_ref = undefined,
10371037
cref_to_msg_ids = CTM }) ->
1038-
case dict:size(CTM) of
1038+
case maps:size(CTM) of
10391039
0 -> {State, hibernate};
10401040
_ -> {start_sync_timer(State), 0}
10411041
end;
10421042
next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
1043-
case dict:size(CTM) of
1043+
case maps:size(CTM) of
10441044
0 -> {stop_sync_timer(State), hibernate};
10451045
_ -> {State, 0}
10461046
end.
@@ -1055,7 +1055,7 @@ stop_sync_timer(State) ->
10551055
internal_sync(State = #msstate { current_file_handle = CurHdl,
10561056
cref_to_msg_ids = CTM }) ->
10571057
State1 = stop_sync_timer(State),
1058-
CGs = dict:fold(fun (CRef, MsgIds, NS) ->
1058+
CGs = maps:fold(fun (CRef, MsgIds, NS) ->
10591059
case gb_sets:is_empty(MsgIds) of
10601060
true -> NS;
10611061
false -> [{CRef, MsgIds} | NS]
@@ -1327,7 +1327,7 @@ orddict_store(Key, Val, Dict) ->
13271327
update_pending_confirms(Fun, CRef,
13281328
State = #msstate { clients = Clients,
13291329
cref_to_msg_ids = CTM }) ->
1330-
case dict:fetch(CRef, Clients) of
1330+
case maps:get(CRef, Clients) of
13311331
{_CPid, undefined, _CloseFDsFun} -> State;
13321332
{_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
13331333
State #msstate {
@@ -1337,21 +1337,22 @@ update_pending_confirms(Fun, CRef,
13371337
record_pending_confirm(CRef, MsgId, State) ->
13381338
update_pending_confirms(
13391339
fun (_MsgOnDiskFun, CTM) ->
1340-
dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
1341-
gb_sets:singleton(MsgId), CTM)
1340+
maps:update_with(CRef,
1341+
fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
1342+
gb_sets:singleton(MsgId), CTM)
13421343
end, CRef, State).
13431344

13441345
client_confirm(CRef, MsgIds, ActionTaken, State) ->
13451346
update_pending_confirms(
13461347
fun (MsgOnDiskFun, CTM) ->
1347-
case dict:find(CRef, CTM) of
1348+
case maps:find(CRef, CTM) of
13481349
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
13491350
ActionTaken),
13501351
MsgIds1 = rabbit_misc:gb_sets_difference(
13511352
Gs, MsgIds),
13521353
case gb_sets:is_empty(MsgIds1) of
1353-
true -> dict:erase(CRef, CTM);
1354-
false -> dict:store(CRef, MsgIds1, CTM)
1354+
true -> maps:remove(CRef, CTM);
1355+
false -> maps:put(CRef, MsgIds1, CTM)
13551356
end;
13561357
error -> CTM
13571358
end
@@ -1402,9 +1403,9 @@ close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
14021403
State #msstate { file_handle_cache = close_handle(Key, FHC) };
14031404

14041405
close_handle(Key, FHC) ->
1405-
case dict:find(Key, FHC) of
1406+
case maps:find(Key, FHC) of
14061407
{ok, Hdl} -> ok = file_handle_cache:close(Hdl),
1407-
dict:erase(Key, FHC);
1408+
maps:remove(Key, FHC);
14081409
error -> FHC
14091410
end.
14101411

@@ -1419,7 +1420,7 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
14191420
[ begin
14201421
case (ets:update_element(FileHandlesEts, Key, {2, close})
14211422
andalso Invoke) of
1422-
true -> case dict:fetch(Ref, ClientRefs) of
1423+
true -> case maps:get(Ref, ClientRefs) of
14231424
{_CPid, _MsgOnDiskFun, undefined} ->
14241425
ok;
14251426
{_CPid, _MsgOnDiskFun, CloseFDsFun} ->
@@ -1456,16 +1457,16 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
14561457
close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
14571458
file_handle_cache = FHC,
14581459
client_ref = Ref }) ->
1459-
ok = dict:fold(fun (File, Hdl, ok) ->
1460+
ok = maps:fold(fun (File, Hdl, ok) ->
14601461
true = ets:delete(FileHandlesEts, {Ref, File}),
14611462
file_handle_cache:close(Hdl)
14621463
end, ok, FHC),
1463-
CState #client_msstate { file_handle_cache = dict:new() };
1464+
CState #client_msstate { file_handle_cache = #{} };
14641465

14651466
close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
1466-
ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
1467+
ok = maps:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
14671468
ok, FHC),
1468-
State #msstate { file_handle_cache = dict:new() }.
1469+
State #msstate { file_handle_cache = #{} }.
14691470

14701471
get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
14711472
dir = Dir }) ->
@@ -1478,11 +1479,11 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
14781479
{Hdl, State #msstate { file_handle_cache = FHC2 }}.
14791480

14801481
get_read_handle(FileNum, FHC, Dir) ->
1481-
case dict:find(FileNum, FHC) of
1482+
case maps:find(FileNum, FHC) of
14821483
{ok, Hdl} -> {Hdl, FHC};
14831484
error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
14841485
?READ_MODE),
1485-
{Hdl, dict:store(FileNum, Hdl, FHC)}
1486+
{Hdl, maps:put(FileNum, Hdl, FHC)}
14861487
end.
14871488

14881489
preallocate(Hdl, FileSizeLimit, FinalPos) ->

test/unit_inbroker_non_parallel_SUITE.erl

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ groups() ->
3535
app_management, %% Restart RabbitMQ.
3636
channel_statistics, %% Expect specific statistics.
3737
disk_monitor, %% Replace rabbit_misc module.
38+
disk_monitor_enable,
3839
file_handle_cache, %% Change FHC limit.
3940
head_message_timestamp_statistics, %% Expect specific statistics.
4041
log_management, %% Check log files.
@@ -631,6 +632,37 @@ disk_monitor1(_Config) ->
631632
meck:unload(rabbit_misc),
632633
passed.
633634

635+
disk_monitor_enable(Config) ->
636+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
637+
?MODULE, disk_monitor_enable1, [Config]).
638+
639+
disk_monitor_enable1(_Config) ->
640+
case os:type() of
641+
{unix, _} ->
642+
disk_monitor_enable1();
643+
_ ->
644+
%% skip windows testing
645+
skipped
646+
end.
647+
648+
disk_monitor_enable1() ->
649+
ok = meck:new(rabbit_misc, [passthrough]),
650+
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end),
651+
application:set_env(rabbit, disk_monitor_failure_retries, 20000),
652+
application:set_env(rabbit, disk_monitor_failure_retry_interval, 100),
653+
ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
654+
ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
655+
undefined = rabbit_disk_monitor:get_disk_free(),
656+
Cmd = "Filesystem 1024-blocks Used Available Capacity iused ifree %iused Mounted on\n/dev/disk1 975798272 234783364 740758908 25% 58759839 185189727 24% /\n",
657+
ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> Cmd end),
658+
timer:sleep(1000),
659+
Bytes = 740758908 * 1024,
660+
Bytes = rabbit_disk_monitor:get_disk_free(),
661+
meck:unload(rabbit_misc),
662+
application:set_env(rabbit, disk_monitor_failure_retries, 10),
663+
application:set_env(rabbit, disk_monitor_failure_retry_interval, 120000),
664+
passed.
665+
634666
%% ---------------------------------------------------------------------------
635667
%% rabbitmqctl helpers.
636668
%% ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)