Skip to content

Commit dd24555

Browse files
Merge pull request #3981 from rabbitmq/mk-two-staged-definition-import
Import queue and binding definitions when a certain number of nodes join
2 parents aed6490 + 10f9e06 commit dd24555

File tree

2 files changed

+52
-10
lines changed

2 files changed

+52
-10
lines changed

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,16 +257,27 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
257257
concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
258258
concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
259259

260-
concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
261260
concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
262-
concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
263261

264262
sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
265263
%% importing policies concurrently can be unsafe as queues will be getting
266264
%% potentially out of order notifications of applicable policy changes
267265
sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
268266
sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
269267

268+
rabbit_nodes:if_reached_target_cluster_size(
269+
fun() ->
270+
concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
271+
concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2)
272+
end,
273+
274+
fun() ->
275+
rabbit_log:info("There are fewer than target cluster size (~b) nodes online,"
276+
" skipping queue and binding import from definitions",
277+
[rabbit_nodes:target_cluster_size_hint()])
278+
end
279+
),
280+
270281
SuccessFun(),
271282
ok
272283
catch {error, E} -> {error, E};
@@ -284,15 +295,25 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
284295
try
285296
validate_limits(Map, VHost),
286297

287-
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
288298
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
289-
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
290-
291299
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
292300
%% importing policies concurrently can be unsafe as queues will be getting
293301
%% potentially out of order notifications of applicable policy changes
294302
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
295303

304+
rabbit_nodes:if_reached_target_cluster_size(
305+
fun() ->
306+
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
307+
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3)
308+
end,
309+
310+
fun() ->
311+
rabbit_log:info("There are fewer than target cluster size (~b) nodes online,"
312+
" skipping queue and binding import from definitions",
313+
[rabbit_nodes:target_cluster_size_hint()])
314+
end
315+
),
316+
296317
SuccessFun()
297318
catch {error, E} -> {error, format(E)};
298319
exit:E -> {error, format(E)}
@@ -310,15 +331,25 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
310331
try
311332
validate_limits(Map, VHost),
312333

313-
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
314-
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
315334
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
316-
317335
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
318336
%% importing policies concurrently can be unsafe as queues will be getting
319337
%% potentially out of order notifications of applicable policy changes
320338
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
321339

340+
rabbit_nodes:if_reached_target_cluster_size(
341+
fun() ->
342+
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
343+
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3)
344+
end,
345+
346+
fun() ->
347+
rabbit_log:info("There are fewer than target cluster size (~b) nodes online,"
348+
" skipping queue and binding import from definitions",
349+
[rabbit_nodes:target_cluster_size_hint()])
350+
end
351+
),
352+
322353
SuccessFun()
323354
catch {error, E} -> ErrorFun(format(E));
324355
exit:E -> ErrorFun(format(E))

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
await_running_count/2, is_single_node_cluster/0,
1515
boot/0]).
1616
-export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]).
17-
-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0]).
17+
-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
18+
if_reached_target_cluster_size/2]).
1819
-export([lock_id/1, lock_retries/0]).
1920

2021
-include_lib("kernel/include/inet.hrl").
@@ -82,7 +83,7 @@ is_process_running(Node, Process) ->
8283
-spec cluster_name() -> binary().
8384

8485
cluster_name() ->
85-
case rabbit_runtime_parameters:value_global(cluster_name) of
86+
case rabbit_runtime_parameters:value_global(cluster_name) of
8687
not_found -> cluster_name_default();
8788
Name -> Name
8889
end.
@@ -183,6 +184,16 @@ target_cluster_size_hint() ->
183184
reached_target_cluster_size() ->
184185
running_count() >= target_cluster_size_hint().
185186

187+
-spec if_reached_target_cluster_size(ConditionSatisfiedFun :: fun(), ConditionNotSatisfiedFun :: fun()) -> boolean().
188+
if_reached_target_cluster_size(ConditionSatisfiedFun, ConditionNotSatisfiedFun) ->
189+
case reached_target_cluster_size() of
190+
true ->
191+
ConditionSatisfiedFun(),
192+
true;
193+
false ->
194+
ConditionNotSatisfiedFun(),
195+
false
196+
end.
186197

187198
-spec lock_id(Node :: node()) -> {ResourceId :: string(), LockRequesterId :: node()}.
188199
lock_id(Node) ->

0 commit comments

Comments
 (0)