Skip to content

Commit 0bb59f8

Browse files
Merge branch 'master' into rabbitmq-server-1171
2 parents a3e6909 + 261efd9 commit 0bb59f8

11 files changed

+152
-70
lines changed

src/rabbit.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0,
2323
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
2424
start_fhc/0]).
25-
-export([start/2, stop/1]).
25+
-export([start/2, stop/1, prep_stop/1]).
2626
-export([start_apps/1, stop_apps/1]).
2727
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
2828

@@ -327,7 +327,9 @@ broker_start() ->
327327
ToBeLoaded = Plugins ++ ?APPS,
328328
start_apps(ToBeLoaded),
329329
maybe_sd_notify(),
330-
ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())).
330+
ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())),
331+
rabbit_peer_discovery:maybe_register(),
332+
ok.
331333

332334
%% Try to send systemd ready notification if it makes sense in the
333335
%% current environment. standard_error is used intentionally in all
@@ -471,6 +473,8 @@ stop() ->
471473
end,
472474
rabbit_log:info("RabbitMQ is asked to stop...~n", []),
473475
Apps = ?APPS ++ rabbit_plugins:active(),
476+
%% this will also perform unregistration with the peer discovery backend
477+
%% as needed
474478
stop_apps(app_utils:app_dependency_order(Apps, true)),
475479
rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []).
476480

@@ -759,6 +763,10 @@ start(normal, []) ->
759763
Error
760764
end.
761765

766+
prep_stop(State) ->
767+
rabbit_peer_discovery:maybe_unregister(),
768+
State.
769+
762770
stop(_State) ->
763771
ok = rabbit_alarm:stop(),
764772
ok = case rabbit_mnesia:is_clustered() of

src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
-spec info_keys() -> rabbit_types:info_keys().
100100
-spec init_with_backing_queue_state
101101
(rabbit_types:amqqueue(), atom(), tuple(), any(),
102-
[rabbit_types:delivery()], pmon:pmon(), dict:dict()) ->
102+
[rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
103103
#q{}.
104104

105105
%%----------------------------------------------------------------------------

src/rabbit_mirror_queue_master.erl

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@
5757
coordinator :: pid(),
5858
backing_queue :: atom(),
5959
backing_queue_state :: any(),
60-
seen_status :: dict:dict(),
60+
seen_status :: map(),
6161
confirmed :: [rabbit_guid:guid()],
6262
known_senders :: sets:set()
6363
}.
6464
-spec promote_backing_queue_state
6565
(rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
66-
dict:dict(), [pid()]) ->
66+
map(), [pid()]) ->
6767
master_state().
6868

6969
-spec sender_death_fun() -> death_fun().
@@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
127127
coordinator = CPid,
128128
backing_queue = BQ,
129129
backing_queue_state = BQS,
130-
seen_status = dict:new(),
130+
seen_status = #{},
131131
confirmed = [],
132132
known_senders = sets:new(),
133133
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
@@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
266266
seen_status = SS,
267267
backing_queue = BQ,
268268
backing_queue_state = BQS }) ->
269-
false = dict:is_key(MsgId, SS), %% ASSERTION
269+
false = maps:is_key(MsgId, SS), %% ASSERTION
270270
ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
271271
rabbit_basic:msg_size(Msg)),
272272
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
@@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow,
281281
lists:foldl(fun ({Msg = #basic_message { id = MsgId },
282282
MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
283283
{[{Msg, MsgProps, true} | Pubs], %% [0]
284-
false = dict:is_key(MsgId, SS), %% ASSERTION
284+
false = maps:is_key(MsgId, SS), %% ASSERTION
285285
Sizes + rabbit_basic:msg_size(Msg)}
286286
end, {[], false, 0}, Publishes),
287287
Publishes2 = lists:reverse(Publishes1),
@@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
298298
seen_status = SS,
299299
backing_queue = BQ,
300300
backing_queue_state = BQS }) ->
301-
false = dict:is_key(MsgId, SS), %% ASSERTION
301+
false = maps:is_key(MsgId, SS), %% ASSERTION
302302
ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
303303
rabbit_basic:msg_size(Msg)),
304304
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
@@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow,
313313
{false, MsgSizes} =
314314
lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
315315
{false, Sizes}) ->
316-
{false = dict:is_key(MsgId, SS), %% ASSERTION
316+
{false = maps:is_key(MsgId, SS), %% ASSERTION
317317
Sizes + rabbit_basic:msg_size(Msg)}
318318
end, {false, 0}, Publishes),
319319
ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
@@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm = GM,
326326
backing_queue = BQ,
327327
backing_queue_state = BQS,
328328
seen_status = SS }) ->
329-
false = dict:is_key(MsgId, SS), %% ASSERTION
329+
false = maps:is_key(MsgId, SS), %% ASSERTION
330330
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
331331
ensure_monitoring(ChPid,
332332
State #state { backing_queue_state =
@@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
353353
lists:foldl(
354354
fun (MsgId, {MsgIdsN, SSN}) ->
355355
%% We will never see 'discarded' here
356-
case dict:find(MsgId, SSN) of
356+
case maps:find(MsgId, SSN) of
357357
error ->
358358
{[MsgId | MsgIdsN], SSN};
359359
{ok, published} ->
@@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
364364
%% consequently we need to filter out the
365365
%% confirm here. We will issue the confirm
366366
%% when we see the publish from the channel.
367-
{MsgIdsN, dict:store(MsgId, confirmed, SSN)};
367+
{MsgIdsN, maps:put(MsgId, confirmed, SSN)};
368368
{ok, confirmed} ->
369369
%% Well, confirms are racy by definition.
370370
{[MsgId | MsgIdsN], SSN}
@@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
457457
info(backing_queue_status,
458458
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
459459
BQ:info(backing_queue_status, BQS) ++
460-
[ {mirror_seen, dict:size(State #state.seen_status)},
460+
[ {mirror_seen, maps:size(State #state.seen_status)},
461461
{mirror_senders, sets:size(State #state.known_senders)} ];
462462
info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
463463
BQ:info(Item, BQS).
@@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
480480
%% it.
481481

482482
%% We will never see {published, ChPid, MsgSeqNo} here.
483-
case dict:find(MsgId, SS) of
483+
case maps:find(MsgId, SS) of
484484
error ->
485485
%% We permit the underlying BQ to have a peek at it, but
486486
%% only if we ourselves are not filtering out the msg.
@@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
494494
%% immediately after calling is_duplicate). The msg is
495495
%% invalid. We will not see this again, nor will we be
496496
%% further involved in confirming this message, so erase.
497-
{true, State #state { seen_status = dict:erase(MsgId, SS) }};
497+
{true, State #state { seen_status = maps:remove(MsgId, SS) }};
498498
{ok, Disposition}
499499
when Disposition =:= confirmed
500500
%% It got published when we were a slave via gm, and
@@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
509509
%% Message was discarded while we were a slave. Confirm now.
510510
%% As above, amqqueue_process will have the entry for the
511511
%% msg_id_to_channel mapping.
512-
{true, State #state { seen_status = dict:erase(MsgId, SS),
512+
{true, State #state { seen_status = maps:remove(MsgId, SS),
513513
confirmed = [MsgId | Confirmed] }}
514514
end.
515515

src/rabbit_mirror_queue_slave.erl

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
129129
rate_timer_ref = undefined,
130130
sync_timer_ref = undefined,
131131

132-
sender_queues = dict:new(),
133-
msg_id_ack = dict:new(),
132+
sender_queues = #{},
133+
msg_id_ack = #{},
134134

135-
msg_id_status = dict:new(),
135+
msg_id_status = #{},
136136
known_senders = pmon:new(delegate),
137137

138138
depth_delta = undefined
@@ -310,7 +310,7 @@ handle_cast({sync_start, Ref, Syncer},
310310
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
311311
S = fun({MA, TRefN, BQSN}) ->
312312
State1#state{depth_delta = undefined,
313-
msg_id_ack = dict:from_list(MA),
313+
msg_id_ack = maps:from_list(MA),
314314
rate_timer_ref = TRefN,
315315
backing_queue_state = BQSN}
316316
end,
@@ -546,7 +546,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
546546
id = MsgId,
547547
is_persistent = true } },
548548
MS, #state { q = #amqqueue { durable = true } }) ->
549-
dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
549+
maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
550550
send_or_record_confirm(_Status, #delivery { sender = ChPid,
551551
confirm = true,
552552
msg_seq_no = MsgSeqNo },
@@ -559,20 +559,20 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
559559
lists:foldl(
560560
fun (MsgId, {CMsN, MSN} = Acc) ->
561561
%% We will never see 'discarded' here
562-
case dict:find(MsgId, MSN) of
562+
case maps:find(MsgId, MSN) of
563563
error ->
564564
%% If it needed confirming, it'll have
565565
%% already been done.
566566
Acc;
567567
{ok, published} ->
568568
%% Still not seen it from the channel, just
569569
%% record that it's been confirmed.
570-
{CMsN, dict:store(MsgId, confirmed, MSN)};
570+
{CMsN, maps:put(MsgId, confirmed, MSN)};
571571
{ok, {published, ChPid, MsgSeqNo}} ->
572572
%% Seen from both GM and Channel. Can now
573573
%% confirm.
574574
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
575-
dict:erase(MsgId, MSN)};
575+
maps:remove(MsgId, MSN)};
576576
{ok, confirmed} ->
577577
%% It's already been confirmed. This is
578578
%% probably it's been both sync'd to disk
@@ -672,21 +672,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
672672
%% Master, or MTC in queue_process.
673673

674674
St = [published, confirmed, discarded],
675-
SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
676-
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
675+
SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
676+
AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],
677677

678678
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
679679
QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
680680

681-
MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
681+
MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
682682
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
683683
(_Msgid, _Status, MTC0) ->
684684
MTC0
685685
end, gb_trees:empty(), MS),
686686
Deliveries = [promote_delivery(Delivery) ||
687-
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
687+
{_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
688688
Delivery <- queue:to_list(PubQ)],
689-
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
689+
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
690690
KS1 = lists:foldl(fun (ChPid0, KS0) ->
691691
pmon:demonitor(ChPid0, KS0)
692692
end, KS, AwaitGmDown),
@@ -798,20 +798,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
798798
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
799799
msg_id_status = MS,
800800
known_senders = KS }) ->
801-
case dict:find(ChPid, SQ) of
801+
case maps:find(ChPid, SQ) of
802802
error ->
803803
State;
804804
{ok, {MQ, PendCh, ChStateRecord}} ->
805805
case forget_sender(ChState, ChStateRecord) of
806806
true ->
807807
credit_flow:peer_down(ChPid),
808-
State #state { sender_queues = dict:erase(ChPid, SQ),
808+
State #state { sender_queues = maps:remove(ChPid, SQ),
809809
msg_id_status = lists:foldl(
810-
fun dict:erase/2,
810+
fun maps:remove/2,
811811
MS, sets:to_list(PendCh)),
812812
known_senders = pmon:demonitor(ChPid, KS) };
813813
false ->
814-
SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
814+
SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
815815
State #state { sender_queues = SQ1 }
816816
end
817817
end.
@@ -823,32 +823,32 @@ maybe_enqueue_message(
823823
send_mandatory(Delivery), %% must do this before confirms
824824
State1 = ensure_monitoring(ChPid, State),
825825
%% We will never see {published, ChPid, MsgSeqNo} here.
826-
case dict:find(MsgId, MS) of
826+
case maps:find(MsgId, MS) of
827827
error ->
828828
{MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
829829
MQ1 = queue:in(Delivery, MQ),
830-
SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
830+
SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
831831
State1 #state { sender_queues = SQ1 };
832832
{ok, Status} ->
833833
MS1 = send_or_record_confirm(
834-
Status, Delivery, dict:erase(MsgId, MS), State1),
834+
Status, Delivery, maps:remove(MsgId, MS), State1),
835835
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
836836
State1 #state { msg_id_status = MS1,
837837
sender_queues = SQ1 }
838838
end.
839839

840840
get_sender_queue(ChPid, SQ) ->
841-
case dict:find(ChPid, SQ) of
841+
case maps:find(ChPid, SQ) of
842842
error -> {queue:new(), sets:new(), running};
843843
{ok, Val} -> Val
844844
end.
845845

846846
remove_from_pending_ch(MsgId, ChPid, SQ) ->
847-
case dict:find(ChPid, SQ) of
847+
case maps:find(ChPid, SQ) of
848848
error ->
849849
SQ;
850850
{ok, {MQ, PendingCh, ChState}} ->
851-
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
851+
maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
852852
SQ)
853853
end.
854854

@@ -865,7 +865,7 @@ publish_or_discard(Status, ChPid, MsgId,
865865
case queue:out(MQ) of
866866
{empty, _MQ2} ->
867867
{MQ, sets:add_element(MsgId, PendingCh),
868-
dict:store(MsgId, Status, MS)};
868+
maps:put(MsgId, Status, MS)};
869869
{{value, Delivery = #delivery {
870870
message = #basic_message { id = MsgId } }}, MQ2} ->
871871
{MQ2, PendingCh,
@@ -880,7 +880,7 @@ publish_or_discard(Status, ChPid, MsgId,
880880
%% expecting any confirms from us.
881881
{MQ, PendingCh, MS}
882882
end,
883-
SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
883+
SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
884884
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
885885

886886

@@ -1002,17 +1002,17 @@ msg_ids_to_acktags(MsgIds, MA) ->
10021002
{AckTags, MA1} =
10031003
lists:foldl(
10041004
fun (MsgId, {Acc, MAN}) ->
1005-
case dict:find(MsgId, MA) of
1005+
case maps:find(MsgId, MA) of
10061006
error -> {Acc, MAN};
1007-
{ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
1007+
{ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
10081008
end
10091009
end, {[], MA}, MsgIds),
10101010
{lists:reverse(AckTags), MA1}.
10111011

10121012
maybe_store_ack(false, _MsgId, _AckTag, State) ->
10131013
State;
10141014
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
1015-
State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
1015+
State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.
10161016

10171017
set_delta(0, State = #state { depth_delta = undefined }) ->
10181018
ok = record_synchronised(State#state.q),

src/rabbit_mnesia.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,17 @@ init_from_config() ->
139139
{ok, _} ->
140140
e(invalid_cluster_nodes_conf)
141141
end,
142-
case DiscoveredNodes of
142+
rabbit_log:info("All discovered existing cluster peers: ~p~n",
143+
[rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
144+
Peers = nodes_excl_me(DiscoveredNodes),
145+
case Peers of
143146
[] ->
144147
rabbit_log:info("Discovered no peer nodes to cluster with"),
145148
init_db_and_upgrade([node()], disc, false, _Retry = true);
146149
_ ->
147-
rabbit_log:info("Discovered peer nodes: ~s~n",
148-
[rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
149-
join_discovered_peers(DiscoveredNodes, NodeType)
150+
rabbit_log:info("Peer nodes we can cluster with: ~s~n",
151+
[rabbit_peer_discovery:format_discovered_nodes(Peers)]),
152+
join_discovered_peers(Peers, NodeType)
150153
end.
151154

152155
%% Attempts to join discovered,

0 commit comments

Comments
 (0)