Skip to content

Commit 5981a69

Browse files
author
Matthias Radestock
committed
recover exchanges/bindings/queues in per-item transactions
Because recovering them in large, single transactions is incredibly slow, with complexity that is far worse than linear in the number of entries we recover, presumably due to the way mnesia represents transaction-local storage.
1 parent f8fa3ed commit 5981a69

File tree

3 files changed

+52
-29
lines changed

3 files changed

+52
-29
lines changed

src/rabbit_amqqueue.erl

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,32 @@ recover() ->
122122

123123
recover_durable_queues() ->
124124
Node = node(),
125-
%% TODO: use dirty ops instead
126-
R = rabbit_misc:execute_mnesia_transaction(
127-
fun () ->
128-
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
129-
<- mnesia:table(durable_queues),
130-
node(Pid) == Node]))
131-
end),
132-
Queues = lists:map(fun start_queue_process/1, R),
133-
rabbit_misc:execute_mnesia_transaction(
134-
fun () ->
135-
lists:foreach(fun store_queue/1, Queues),
136-
ok
137-
end).
125+
lists:foreach(
126+
fun (RecoveredQ) ->
127+
Q = start_queue_process(RecoveredQ),
128+
%% We need to catch the case where a client connected to
129+
%% another node has deleted the queue (and possibly
130+
%% re-created it).
131+
case rabbit_misc:execute_mnesia_transaction(
132+
fun () -> case mnesia:match_object(
133+
durable_queues, RecoveredQ, read) of
134+
[_] -> ok = store_queue(Q),
135+
true;
136+
[] -> false
137+
end
138+
end) of
139+
true -> ok;
140+
false -> exit(Q#amqqueue.pid, shutdown)
141+
end
142+
end,
143+
%% TODO: use dirty ops instead
144+
rabbit_misc:execute_mnesia_transaction(
145+
fun () ->
146+
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
147+
<- mnesia:table(durable_queues),
148+
node(Pid) == Node]))
149+
end)),
150+
ok.
138151

139152
declare(QueueName, Durable, AutoDelete, Args) ->
140153
Q = start_queue_process(#amqqueue{name = QueueName,

src/rabbit_exchange.erl

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -102,22 +102,15 @@
102102
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
103103

104104
recover() ->
105-
rabbit_misc:execute_mnesia_transaction(
106-
fun () ->
107-
mnesia:foldl(
108-
fun (Exchange, Acc) ->
109-
ok = mnesia:write(Exchange),
110-
Acc
111-
end, ok, durable_exchanges),
112-
mnesia:foldl(
113-
fun (Route, Acc) ->
114-
{_, ReverseRoute} = route_with_reverse(Route),
115-
ok = mnesia:write(Route),
116-
ok = mnesia:write(ReverseRoute),
117-
Acc
118-
end, ok, durable_routes),
119-
ok
120-
end).
105+
ok = rabbit_misc:table_foreach(
106+
fun(Exchange) -> ok = mnesia:write(Exchange) end,
107+
durable_exchanges),
108+
ok = rabbit_misc:table_foreach(
109+
fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
110+
ok = mnesia:write(Route),
111+
ok = mnesia:write(ReverseRoute)
112+
end, durable_routes),
113+
ok.
121114

122115
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
123116
Exchange = #exchange{name = ExchangeName,

src/rabbit_misc.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
-export([ensure_ok/2]).
4747
-export([localnode/1, tcp_name/3]).
4848
-export([intersperse/2, upmap/2, map_in_order/2]).
49+
-export([table_foreach/2]).
4950
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
5051
-export([append_file/2, ensure_parent_dirs_exist/1]).
5152
-export([format_stderr/2]).
@@ -97,6 +98,7 @@
9798
-spec(intersperse/2 :: (A, [A]) -> [A]).
9899
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
99100
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
101+
-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
100102
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
101103
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
102104
'ok' | 'aborted').
@@ -295,6 +297,21 @@ map_in_order(F, L) ->
295297
lists:reverse(
296298
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
297299

300+
%% For each entry in a table, execute a function in a transaction.
301+
%% This is often far more efficient than wrapping a tx around the lot.
302+
%%
303+
%% We ignore entries that have been modified or removed.
304+
table_foreach(F, TableName) ->
305+
lists:foreach(
306+
fun (E) -> execute_mnesia_transaction(
307+
fun () -> case mnesia:match_object(TableName, E, read) of
308+
[] -> ok;
309+
_ -> F(E)
310+
end
311+
end)
312+
end, dirty_read_all(TableName)),
313+
ok.
314+
298315
dirty_read_all(TableName) ->
299316
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
300317

0 commit comments

Comments
 (0)