Skip to content

Commit d596582

Browse files
Merge pull request #3167 from rabbitmq/mk-duplicate-shovels-imported-from-definitions
Acquire locks before starting or restarting dynamic Shovels (cherry picked from commit f08c895)
1 parent 74b5fd9 commit d596582

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,28 +35,38 @@ adjust(Name, Def) ->
3535
end,
3636
start_child(Name, Def).
3737

38-
start_child(Name, Def) ->
38+
start_child({VHost, ShovelName} = Name, Def) ->
39+
rabbit_log_shovel:debug("Asked to start a dynamic Shovel named '~s' in virtual host '~s'", [ShovelName, VHost]),
40+
LockId = rabbit_shovel_locks:lock(Name),
3941
cleanup_specs(),
40-
case mirrored_supervisor:start_child(
42+
rabbit_log_shovel:debug("Starting a mirrored supervisor named '~s' in virtual host '~s'", [ShovelName, VHost]),
43+
Result = case mirrored_supervisor:start_child(
4144
?SUPERVISOR,
4245
{Name, {rabbit_shovel_dyn_worker_sup, start_link, [Name, Def]},
4346
transient, ?WORKER_WAIT, worker, [rabbit_shovel_dyn_worker_sup]}) of
4447
{ok, _Pid} -> ok;
4548
{error, {already_started, _Pid}} -> ok
46-
end.
49+
end,
50+
%% release the lock if we managed to acquire one
51+
rabbit_shovel_locks:unlock(LockId),
52+
Result.
4753

4854
child_exists(Name) ->
4955
lists:any(fun ({N, _, _, _}) -> N =:= Name end,
5056
mirrored_supervisor:which_children(?SUPERVISOR)).
5157

52-
stop_child(Name) ->
58+
stop_child({VHost, ShovelName} = Name) ->
59+
rabbit_log_shovel:debug("Asked to stop a dynamic Shovel named '~s' in virtual host '~s'", [ShovelName, VHost]),
60+
LockId = rabbit_shovel_locks:lock(Name),
5361
case get({shovel_worker_autodelete, Name}) of
5462
true -> ok; %% [1]
5563
_ ->
5664
ok = mirrored_supervisor:terminate_child(?SUPERVISOR, Name),
5765
ok = mirrored_supervisor:delete_child(?SUPERVISOR, Name),
5866
rabbit_shovel_status:remove(Name)
59-
end.
67+
end,
68+
rabbit_shovel_locks:unlock(LockId),
69+
ok.
6070

6171
%% [1] An autodeleting worker removes its own parameter, and thus ends
6272
%% up here via the parameter callback. It is a transient worker that
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_shovel_locks).
9+
10+
-export([lock/1, unlock/1]).
11+
12+
%%
13+
%% API
14+
%%
15+
16+
lock(Name) ->
17+
Nodes = rabbit_nodes:all_running(),
18+
Retries = rabbit_nodes:lock_retries(),
19+
%% try to acquire a lock to avoid duplicate starts
20+
LockId = case global:set_lock({dynamic_shovel, Name}, Nodes, Retries) of
21+
true -> Name;
22+
false -> undefined
23+
end,
24+
LockId.
25+
26+
unlock(LockId) ->
27+
Nodes = rabbit_nodes:all_running(),
28+
case LockId of
29+
undefined -> ok;
30+
Value -> global:del_lock({dynamic_shovel, Value}, Nodes)
31+
end,
32+
ok.

0 commit comments

Comments
 (0)