Skip to content

Commit 8b87747

Browse files
ctl delete_shovel: use a more effective way
Dynamic Shovels keep track of their status in a node-local ETS table which is updated as Shovels go through the lifecycle: start, init (connect, declare the topology), stop. This makes failing Shovels a bit special: their status records will not be long-lived, which means such Shovels will be considered not to exist by certain code paths. In addition, for such Shovels we do not know what node they are hosted on. But that's fine: we just need to clear their runtime parameter, and a periodic Shovel cleanup will remove all children for which no schema database entry (a runtime parameter one) exists. rabbitmq_shovel_management's key integration suite has been reworked and expanded to include a case where the Shovel has no chance of successfully connecting. This also deletes a mock-based test suite which does not serve much of a purpose. (cherry picked from commit cae964d) # Conflicts: # deps/rabbitmq_shovel_management/test/http_SUITE.erl # deps/rabbitmq_shovel_management/test/rabbit_shovel_mgmt_SUITE.erl
1 parent 2bd6921 commit 8b87747

File tree

11 files changed

+263
-198
lines changed

11 files changed

+263
-198
lines changed

deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@ http_delete(Config, Path, User, Pass, CodeExp) ->
175175
assert_code(CodeExp, CodeAct, "DELETE", Path, ResBody),
176176
decode(CodeExp, Headers, ResBody).
177177

178+
http_get_fails(Config, Path) ->
179+
{error, {failed_connect, _}} = req(Config, get, Path, []).
180+
178181
format_for_upload(none) ->
179182
<<"">>;
180183
format_for_upload(List) ->

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,12 @@ run([Name], #{node := Node, vhost := VHost}) ->
6969
Error;
7070
Xs when is_list(Xs) ->
7171
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
72-
"on the target node '~ts' and / or virtual host '~ts'",
72+
"on the target node '~ts' and/or virtual host '~ts'. "
73+
"It may be failing to connect and report its state, will delete its runtime parameter...",
7374
[Node, VHost]),
7475
case rabbit_shovel_status:find_matching_shovel(VHost, Name, Xs) of
7576
undefined ->
77+
try_force_removing(Node, VHost, Name, ActingUser),
7678
{error, rabbit_data_coercion:to_binary(ErrMsg)};
7779
Match ->
7880
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
@@ -83,10 +85,14 @@ run([Name], #{node := Node, vhost := VHost}) ->
8385
Error;
8486
{error, not_found} ->
8587
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
86-
"on the target node '~ts' and / or virtual host '~ts'",
88+
"on the target node '~ts' and/or virtual host '~ts'. "
89+
"It may be failing to connect and report its state, will delete its runtime parameter...",
8790
[Node, VHost]),
91+
try_force_removing(HostingNode, VHost, Name, ActingUser),
8892
{error, rabbit_data_coercion:to_binary(ErrMsg)};
89-
ok -> ok
93+
ok ->
94+
_ = try_clearing_runtime_parameter(Node, VHost, Name, ActingUser),
95+
ok
9096
end
9197
end
9298
end.
@@ -99,3 +105,16 @@ aliases() ->
99105

100106
output(E, _Opts) ->
101107
'Elixir.RabbitMQ.CLI.DefaultOutput':output(E).
108+
109+
try_force_removing(Node, VHost, ShovelName, ActingUser) ->
110+
%% Deleting the runtime parameter will cause the dynamic Shovel's child tree to be stopped eventually
111+
%% regardless of the node it is hosted on. MK.
112+
_ = try_clearing_runtime_parameter(Node, VHost, ShovelName, ActingUser),
113+
%% These are best effort attempts to delete the Shovel. Clearing the parameter does all the heavy lifting. MK.
114+
_ = try_stopping_child_process(Node, VHost, ShovelName).
115+
116+
try_clearing_runtime_parameter(Node, VHost, ShovelName, ActingUser) ->
117+
_ = rabbit_misc:rpc_call(Node, rabbit_runtime_parameters, clear, [VHost, <<"shovel">>, ShovelName, ActingUser]).
118+
119+
try_stopping_child_process(Node, VHost, ShovelName) ->
120+
_ = rabbit_misc:rpc_call(Node, rabbit_shovel_dyn_worker_sup_sup, stop_and_delete_child, [{VHost, ShovelName}]).

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ShovelStatusCommand.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ fmt_status({'running', Proplist}, Map) ->
112112
fmt_status('starting' = St, Map) ->
113113
Map#{state => St,
114114
source => <<>>,
115+
source_protocol => <<>>,
115116
destination => <<>>,
117+
destination_protocol => <<>>,
116118
termination_reason => <<>>};
117119
fmt_status({'terminated' = St, Reason}, Map) ->
118120
Map#{state => St,

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
status/0,
1717
lookup/1,
1818
cluster_status/0,
19-
cluster_status_with_nodes/0]).
19+
cluster_status_with_nodes/0,
20+
get_status_table/0
21+
]).
2022
-export([inject_node_info/2, find_matching_shovel/3]).
2123

2224
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -93,6 +95,10 @@ cluster_status_with_nodes() ->
9395
lookup(Name) ->
9496
gen_server:call(?SERVER, {lookup, Name}, infinity).
9597

98+
-spec get_status_table() -> ok.
99+
get_status_table() ->
100+
gen_server:call(?SERVER, get_status_table).
101+
96102
init([]) ->
97103
?ETS_NAME = ets:new(?ETS_NAME,
98104
[named_table, {keypos, #entry.name}, private]),
@@ -114,11 +120,20 @@ handle_call({lookup, Name}, _From, State) ->
114120
{timestamp, Entry#entry.timestamp}];
115121
[] -> not_found
116122
end,
117-
{reply, Link, State}.
123+
{reply, Link, State};
124+
125+
handle_call(get_status_table, _From, State) ->
126+
Entries = ets:tab2list(?ETS_NAME),
127+
{reply, Entries, State}.
118128

119129
handle_cast({report, Name, Type, Info, Timestamp}, State) ->
120-
true = ets:insert(?ETS_NAME, #entry{name = Name, type = Type, info = Info,
121-
timestamp = Timestamp}),
130+
Entry = #entry{
131+
name = Name,
132+
type = Type,
133+
info = Info,
134+
timestamp = Timestamp
135+
},
136+
true = ets:insert(?ETS_NAME, Entry),
122137
rabbit_event:notify(shovel_worker_status,
123138
split_name(Name) ++ split_status(Info)),
124139
{noreply, State};
@@ -159,9 +174,17 @@ code_change(_OldVsn, State, _Extra) ->
159174
-spec inject_node_info(node(), [status_tuple()]) -> [status_tuple()].
160175
inject_node_info(Node, Shovels) ->
161176
lists:map(
162-
fun({Name, Type, {State, Opts}, Timestamp}) ->
163-
Opts1 = Opts ++ [{node, Node}],
164-
{Name, Type, {State, Opts1}, Timestamp}
177+
%% starting
178+
fun({Name, Type, State, Timestamp}) when is_atom(State) ->
179+
Opts = [{node, Node}],
180+
{Name, Type, {State, Opts}, Timestamp};
181+
%% terminated
182+
({Name, Type, {terminated, Reason}, Timestamp}) ->
183+
{Name, Type, {terminated, Reason}, Timestamp};
184+
%% running
185+
({Name, Type, {State, Opts}, Timestamp}) ->
186+
Opts1 = Opts ++ [{node, Node}],
187+
{Name, Type, {State, Opts1}, Timestamp}
165188
end, Shovels).
166189

167190
-spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined.

deps/rabbitmq_shovel/src/rabbit_shovel_util.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,13 @@ add_timestamp_header(Props = #'P_basic'{headers = Headers}) ->
3939
delete_shovel(VHost, Name, ActingUser) ->
4040
case rabbit_shovel_status:lookup({VHost, Name}) of
4141
not_found ->
42+
%% Follow the user's obvious intent and delete the runtime parameter just in case the Shovel is in
43+
%% a starting-failing-restarting loop. MK.
44+
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
45+
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser),
4246
{error, not_found};
4347
_Obj ->
48+
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
4449
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser)
4550
end.
4651

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ handle_call(_Msg, _From, State) ->
6262
{noreply, State}.
6363

6464
handle_cast(init, State = #state{config = Config0}) ->
65+
rabbit_log_shovel:debug("Shovel ~ts is reporting its status", [human_readable_name(State#state.name)]),
66+
rabbit_shovel_status:report(State#state.name, State#state.type, starting),
67+
rabbit_log_shovel:info("Shovel ~ts will now try to connect...", [human_readable_name(State#state.name)]),
6568
try rabbit_shovel_behaviour:connect_source(Config0) of
6669
Config ->
6770
rabbit_log_shovel:debug("Shovel ~ts connected to source", [human_readable_name(maps:get(name, Config))]),

deps/rabbitmq_shovel_management/BUILD.bazel

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,16 +100,7 @@ rabbitmq_integration_suite(
100100
)
101101

102102
rabbitmq_suite(
103-
name = "rabbit_shovel_mgmt_SUITE",
104-
deps = [
105-
"//deps/rabbit_common:erlang_app",
106-
"//deps/rabbitmq_management_agent:erlang_app",
107-
"@meck//:erlang_app",
108-
],
109-
)
110-
111-
rabbitmq_suite(
112-
name = "rabbit_shovel_mgmt_util_SUITE",
103+
name = "unit_SUITE",
113104
deps = [
114105
"//deps/rabbit_common:erlang_app",
115106
"//deps/rabbitmq_shovel:erlang_app",

deps/rabbitmq_shovel_management/app.bzl

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,10 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
9999
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
100100
)
101101
erlang_bytecode(
102-
name = "rabbit_shovel_mgmt_SUITE_beam_files",
102+
name = "unit_SUITE_beam_files",
103103
testonly = True,
104-
srcs = ["test/rabbit_shovel_mgmt_SUITE.erl"],
105-
outs = ["test/rabbit_shovel_mgmt_SUITE.beam"],
106-
app_name = "rabbitmq_shovel_management",
107-
erlc_opts = "//:test_erlc_opts",
108-
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_management_agent:erlang_app"],
109-
)
110-
erlang_bytecode(
111-
name = "rabbit_shovel_mgmt_util_SUITE_beam_files",
112-
testonly = True,
113-
srcs = ["test/rabbit_shovel_mgmt_util_SUITE.erl"],
114-
outs = ["test/rabbit_shovel_mgmt_util_SUITE.beam"],
104+
srcs = ["test/unit_SUITE.erl"],
105+
outs = ["test/unit_SUITE.beam"],
115106
app_name = "rabbitmq_shovel_management",
116107
erlc_opts = "//:test_erlc_opts",
117108
)

deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt.erl

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,15 @@ resource_exists(ReqData, Context) ->
4848
%% Deleting or restarting a shovel
4949
case get_shovel_node(VHost, Name, ReqData, Context) of
5050
undefined ->
51-
rabbit_log:error("Shovel with the name '~ts' was not found on virtual host '~ts'",
51+
rabbit_log:error("Shovel with the name '~ts' was not found on virtual host '~ts'. "
52+
"It may be failing to connect and report its status.",
5253
[Name, VHost]),
53-
false;
54+
case is_restart(ReqData) of
55+
true -> false;
56+
%% this is a deletion attempt, it can continue and idempotently try to
57+
%% delete the shovel
58+
false -> true
59+
end;
5460
_ ->
5561
true
5662
end
@@ -73,9 +79,17 @@ delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
7379
Name ->
7480
case get_shovel_node(VHost, Name, ReqData, Context) of
7581
undefined -> rabbit_log:error("Could not find shovel data for shovel '~ts' in vhost: '~ts'", [Name, VHost]),
76-
false;
82+
case is_restart(ReqData) of
83+
true ->
84+
false;
85+
%% this is a deletion attempt
86+
false ->
87+
%% if we do not know the node, use the local one
88+
try_delete(node(), VHost, Name, Username),
89+
true
90+
end;
7791
Node ->
78-
%% We must distinguish between a delete and restart
92+
%% We must distinguish between a delete and a restart
7993
case is_restart(ReqData) of
8094
true ->
8195
rabbit_log:info("Asked to restart shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
@@ -91,17 +105,8 @@ delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
91105
end;
92106

93107
_ ->
94-
rabbit_log:info("Asked to delete shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
95-
try erpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
96-
ok -> true;
97-
{error, not_found} ->
98-
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
99-
false
100-
catch _:Reason ->
101-
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
102-
[Name, VHost, Reason]),
103-
false
104-
end
108+
try_delete(Node, VHost, Name, Username),
109+
true
105110

106111
end
107112
end
@@ -150,3 +155,18 @@ find_matching_shovel(VHost, Name, Shovels) ->
150155
_ ->
151156
undefined
152157
end.
158+
159+
-spec try_delete(node(), vhost:name(), any(), rabbit_types:username()) -> boolean().
160+
try_delete(Node, VHost, Name, Username) ->
161+
rabbit_log:info("Asked to delete shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
162+
%% this will clear the runtime parameter, the ultimate way of deleting a dynamic Shovel eventually. MK.
163+
try erpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
164+
ok -> true;
165+
{error, not_found} ->
166+
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
167+
false
168+
catch _:Reason ->
169+
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
170+
[Name, VHost, Reason]),
171+
false
172+
end.

0 commit comments

Comments
 (0)