Skip to content

Commit c508660

Browse files
author
Matthew Sackman
committed
merging multiple heads of v1_5
2 parents 8835081 + 4057522 commit c508660

File tree

3 files changed

+52
-29
lines changed

3 files changed

+52
-29
lines changed

src/rabbit_amqqueue.erl

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,32 @@ recover() ->
122122

123123
recover_durable_queues() ->
124124
Node = node(),
125-
%% TODO: use dirty ops instead
126-
R = rabbit_misc:execute_mnesia_transaction(
127-
fun () ->
128-
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
129-
<- mnesia:table(durable_queues),
130-
node(Pid) == Node]))
131-
end),
132-
Queues = lists:map(fun start_queue_process/1, R),
133-
rabbit_misc:execute_mnesia_transaction(
134-
fun () ->
135-
lists:foreach(fun store_queue/1, Queues),
136-
ok
137-
end).
125+
lists:foreach(
126+
fun (RecoveredQ) ->
127+
Q = start_queue_process(RecoveredQ),
128+
%% We need to catch the case where a client connected to
129+
%% another node has deleted the queue (and possibly
130+
%% re-created it).
131+
case rabbit_misc:execute_mnesia_transaction(
132+
fun () -> case mnesia:match_object(
133+
durable_queues, RecoveredQ, read) of
134+
[_] -> ok = store_queue(Q),
135+
true;
136+
[] -> false
137+
end
138+
end) of
139+
true -> ok;
140+
false -> exit(Q#amqqueue.pid, shutdown)
141+
end
142+
end,
143+
%% TODO: use dirty ops instead
144+
rabbit_misc:execute_mnesia_transaction(
145+
fun () ->
146+
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
147+
<- mnesia:table(durable_queues),
148+
node(Pid) == Node]))
149+
end)),
150+
ok.
138151

139152
declare(QueueName, Durable, AutoDelete, Args) ->
140153
Q = start_queue_process(#amqqueue{name = QueueName,

src/rabbit_exchange.erl

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -102,22 +102,15 @@
102102
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
103103

104104
recover() ->
105-
rabbit_misc:execute_mnesia_transaction(
106-
fun () ->
107-
mnesia:foldl(
108-
fun (Exchange, Acc) ->
109-
ok = mnesia:write(Exchange),
110-
Acc
111-
end, ok, durable_exchanges),
112-
mnesia:foldl(
113-
fun (Route, Acc) ->
114-
{_, ReverseRoute} = route_with_reverse(Route),
115-
ok = mnesia:write(Route),
116-
ok = mnesia:write(ReverseRoute),
117-
Acc
118-
end, ok, durable_routes),
119-
ok
120-
end).
105+
ok = rabbit_misc:table_foreach(
106+
fun(Exchange) -> ok = mnesia:write(Exchange) end,
107+
durable_exchanges),
108+
ok = rabbit_misc:table_foreach(
109+
fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
110+
ok = mnesia:write(Route),
111+
ok = mnesia:write(ReverseRoute)
112+
end, durable_routes),
113+
ok.
121114

122115
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
123116
Exchange = #exchange{name = ExchangeName,

src/rabbit_misc.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
-export([ensure_ok/2]).
4747
-export([localnode/1, tcp_name/3]).
4848
-export([intersperse/2, upmap/2, map_in_order/2]).
49+
-export([table_foreach/2]).
4950
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
5051
-export([append_file/2, ensure_parent_dirs_exist/1]).
5152
-export([format_stderr/2]).
@@ -97,6 +98,7 @@
9798
-spec(intersperse/2 :: (A, [A]) -> [A]).
9899
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
99100
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
101+
-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
100102
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
101103
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
102104
'ok' | 'aborted').
@@ -295,6 +297,21 @@ map_in_order(F, L) ->
295297
lists:reverse(
296298
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
297299

300+
%% For each entry in a table, execute a function in a transaction.
301+
%% This is often far more efficient than wrapping a tx around the lot.
302+
%%
303+
%% We ignore entries that have been modified or removed.
304+
table_foreach(F, TableName) ->
305+
lists:foreach(
306+
fun (E) -> execute_mnesia_transaction(
307+
fun () -> case mnesia:match_object(TableName, E, read) of
308+
[] -> ok;
309+
_ -> F(E)
310+
end
311+
end)
312+
end, dirty_read_all(TableName)),
313+
ok.
314+
298315
dirty_read_all(TableName) ->
299316
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
300317

0 commit comments

Comments
 (0)