Skip to content

Commit ce0f160

Browse files
author
Emile Joubert
committed
Merged bug24794 into default
2 parents a1f2f13 + 56b9883 commit ce0f160

File tree

4 files changed

+82
-13
lines changed

4 files changed

+82
-13
lines changed

src/rabbit_amqqueue.erl

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242

4343
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
4444

45+
-define(FAILOVER_WAIT_MILLIS, 100).
46+
4547
%%----------------------------------------------------------------------------
4648

4749
-ifdef(use_specs).
@@ -421,9 +423,26 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
421423

422424
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
423425

426+
%% We need to account for the idea that queues may be mid-promotion
427+
%% during force_event_refresh (since it's likely we're doing this in
428+
%% the first place since a node failed). Therefore we keep poking at
429+
%% the list of queues until we were able to talk to a live process or
430+
%% the queue no longer exists.
424431
force_event_refresh() ->
425-
[gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
426-
ok.
432+
force_event_refresh([Q#amqqueue.name || Q <- list()]).
433+
434+
force_event_refresh(QNames) ->
435+
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
436+
{_, Bad} = rabbit_misc:multi_call(
437+
[Q#amqqueue.pid || Q <- Qs], force_event_refresh),
438+
FailedPids = [Pid || {Pid, _Reason} <- Bad],
439+
Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
440+
lists:member(Pid, FailedPids)],
441+
case Failed of
442+
[] -> ok;
443+
_ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
444+
force_event_refresh(Failed)
445+
end.
427446

428447
consumers(#amqqueue{ pid = QPid }) ->
429448
delegate_call(QPid, consumers).

src/rabbit_amqqueue_process.erl

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
12311231
gen_server2:reply(From, ok),
12321232
noreply(subtract_acks(
12331233
ChPid, AckTags, State,
1234-
fun (State1) -> requeue_and_run(AckTags, State1) end)).
1234+
fun (State1) -> requeue_and_run(AckTags, State1) end));
1235+
1236+
handle_call(force_event_refresh, _From,
1237+
State = #q{exclusive_consumer = Exclusive}) ->
1238+
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
1239+
case Exclusive of
1240+
none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
1241+
{Ch, CTag, AckRequired} <- consumers(State)];
1242+
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
1243+
emit_consumer_created(Ch, CTag, true, AckRequired)
1244+
end,
1245+
reply(ok, State).
12351246

12361247
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
12371248
handle_confirm(MsgSeqNos, QPid, State);
@@ -1328,16 +1339,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
13281339
ok = file_handle_cache:set_maximum_since_use(Age),
13291340
noreply(State);
13301341

1331-
handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
1332-
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
1333-
case Exclusive of
1334-
none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
1335-
{Ch, CTag, AckRequired} <- consumers(State)];
1336-
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
1337-
emit_consumer_created(Ch, CTag, true, AckRequired)
1338-
end,
1339-
noreply(State);
1340-
13411342
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
13421343
dead_letter_msg(Msg, AckTag, Reason, State).
13431344

src/rabbit_misc.erl

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
-export([pget/2, pget/3, pget_or_die/2]).
5959
-export([format_message_queue/2]).
6060
-export([append_rpc_all_nodes/4]).
61+
-export([multi_call/2]).
6162
-export([quit/1]).
6263

6364
%%----------------------------------------------------------------------------
@@ -200,6 +201,8 @@
200201
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
201202
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
202203
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
204+
-spec(multi_call/2 ::
205+
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
203206
-spec(quit/1 :: (integer() | string()) -> no_return()).
204207

205208
-endif.
@@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
880883
_ -> Res
881884
end || Res <- ResL]).
882885

886+
%% A simplified version of gen_server:multi_call/2 with a sane
887+
%% API. This is not in gen_server2 as there is no useful
888+
%% infrastructure there to share.
889+
multi_call(Pids, Req) ->
890+
MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids],
891+
receive_multi_call(MonitorPids, [], []).
892+
893+
start_multi_call(Pid, Req) when is_pid(Pid) ->
894+
Mref = erlang:monitor(process, Pid),
895+
Pid ! {'$gen_call', {self(), Mref}, Req},
896+
{Mref, Pid}.
897+
898+
receive_multi_call([], Good, Bad) ->
899+
{lists:reverse(Good), lists:reverse(Bad)};
900+
receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
901+
receive
902+
{Mref, Reply} ->
903+
erlang:demonitor(Mref, [flush]),
904+
receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad);
905+
{'DOWN', Mref, _, _, noconnection} ->
906+
receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]);
907+
{'DOWN', Mref, _, _, Reason} ->
908+
receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
909+
end.
910+
883911
%% the slower shutdown on windows required to flush stdout
884912
quit(Status) ->
885913
case os:type() of

src/rabbit_tests.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ all_tests() ->
3535
passed = mirrored_supervisor_tests:all_tests(),
3636
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
3737
ok = file_handle_cache:set_limit(10),
38+
passed = test_multi_call(),
3839
passed = test_file_handle_cache(),
3940
passed = test_backing_queue(),
4041
passed = test_priority_queue(),
@@ -107,6 +108,26 @@ run_cluster_dependent_tests(SecondaryNode) ->
107108

108109
passed.
109110

111+
test_multi_call() ->
112+
Fun = fun() ->
113+
receive
114+
{'$gen_call', {From, Mref}, request} ->
115+
From ! {Mref, response}
116+
end,
117+
receive
118+
never -> ok
119+
end
120+
end,
121+
Pid1 = spawn(Fun),
122+
Pid2 = spawn(Fun),
123+
Pid3 = spawn(Fun),
124+
exit(Pid2, bang),
125+
{[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} =
126+
rabbit_misc:multi_call([Pid1, Pid2, Pid3], request),
127+
exit(Pid1, bang),
128+
exit(Pid3, bang),
129+
passed.
130+
110131
test_priority_queue() ->
111132

112133
false = priority_queue:is_queue(not_a_queue),

0 commit comments

Comments
 (0)