Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ http_delete(Config, Path, User, Pass, CodeExp) ->
assert_code(CodeExp, CodeAct, "DELETE", Path, ResBody),
decode(CodeExp, Headers, ResBody).

http_get_fails(Config, Path) ->
{error, {failed_connect, _}} = req(Config, get, Path, []).

format_for_upload(none) ->
<<"">>;
format_for_upload(List) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ run([Name], #{node := Node, vhost := VHost}) ->
Error;
Xs when is_list(Xs) ->
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
"on the target node '~ts' and / or virtual host '~ts'",
"on the target node '~ts' and/or virtual host '~ts'. "
"It may be failing to connect and report its state, will delete its runtime parameter...",
[Node, VHost]),
case rabbit_shovel_status:find_matching_shovel(VHost, Name, Xs) of
undefined ->
try_force_removing(Node, VHost, Name, ActingUser),
{error, rabbit_data_coercion:to_binary(ErrMsg)};
Match ->
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
Expand All @@ -83,10 +85,14 @@ run([Name], #{node := Node, vhost := VHost}) ->
Error;
{error, not_found} ->
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
"on the target node '~ts' and / or virtual host '~ts'",
"on the target node '~ts' and/or virtual host '~ts'. "
"It may be failing to connect and report its state, will delete its runtime parameter...",
[Node, VHost]),
try_force_removing(HostingNode, VHost, Name, ActingUser),
{error, rabbit_data_coercion:to_binary(ErrMsg)};
ok -> ok
ok ->
_ = try_clearing_runtime_parameter(Node, VHost, Name, ActingUser),
ok
end
end
end.
Expand All @@ -99,3 +105,16 @@ aliases() ->

output(E, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(E).

try_force_removing(Node, VHost, ShovelName, ActingUser) ->
%% Deleting the runtime parameter will cause the dynamic Shovel's child tree to be stopped eventually
%% regardless of the node it is hosted on. MK.
_ = try_clearing_runtime_parameter(Node, VHost, ShovelName, ActingUser),
%% These are best effort attempts to delete the Shovel. Clearing the parameter does all the heavy lifting. MK.
_ = try_stopping_child_process(Node, VHost, ShovelName).

try_clearing_runtime_parameter(Node, VHost, ShovelName, ActingUser) ->
_ = rabbit_misc:rpc_call(Node, rabbit_runtime_parameters, clear, [VHost, <<"shovel">>, ShovelName, ActingUser]).

try_stopping_child_process(Node, VHost, ShovelName) ->
_ = rabbit_misc:rpc_call(Node, rabbit_shovel_dyn_worker_sup_sup, stop_and_delete_child, [{VHost, ShovelName}]).
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ fmt_status({'running', Proplist}, Map) ->
fmt_status('starting' = St, Map) ->
Map#{state => St,
source => <<>>,
source_protocol => <<>>,
destination => <<>>,
destination_protocol => <<>>,
termination_reason => <<>>};
fmt_status({'terminated' = St, Reason}, Map) ->
Map#{state => St,
Expand Down
37 changes: 30 additions & 7 deletions deps/rabbitmq_shovel/src/rabbit_shovel_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
status/0,
lookup/1,
cluster_status/0,
cluster_status_with_nodes/0]).
cluster_status_with_nodes/0,
get_status_table/0
]).
-export([inject_node_info/2, find_matching_shovel/3]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand Down Expand Up @@ -93,6 +95,10 @@ cluster_status_with_nodes() ->
lookup(Name) ->
gen_server:call(?SERVER, {lookup, Name}, infinity).

-spec get_status_table() -> ok.
get_status_table() ->
gen_server:call(?SERVER, get_status_table).

init([]) ->
?ETS_NAME = ets:new(?ETS_NAME,
[named_table, {keypos, #entry.name}, private]),
Expand All @@ -114,11 +120,20 @@ handle_call({lookup, Name}, _From, State) ->
{timestamp, Entry#entry.timestamp}];
[] -> not_found
end,
{reply, Link, State}.
{reply, Link, State};

handle_call(get_status_table, _From, State) ->
Entries = ets:tab2list(?ETS_NAME),
{reply, Entries, State}.

handle_cast({report, Name, Type, Info, Timestamp}, State) ->
true = ets:insert(?ETS_NAME, #entry{name = Name, type = Type, info = Info,
timestamp = Timestamp}),
Entry = #entry{
name = Name,
type = Type,
info = Info,
timestamp = Timestamp
},
true = ets:insert(?ETS_NAME, Entry),
rabbit_event:notify(shovel_worker_status,
split_name(Name) ++ split_status(Info)),
{noreply, State};
Expand Down Expand Up @@ -159,9 +174,17 @@ code_change(_OldVsn, State, _Extra) ->
-spec inject_node_info(node(), [status_tuple()]) -> [status_tuple()].
inject_node_info(Node, Shovels) ->
lists:map(
fun({Name, Type, {State, Opts}, Timestamp}) ->
Opts1 = Opts ++ [{node, Node}],
{Name, Type, {State, Opts1}, Timestamp}
%% starting
fun({Name, Type, State, Timestamp}) when is_atom(State) ->
Opts = [{node, Node}],
{Name, Type, {State, Opts}, Timestamp};
%% terminated
({Name, Type, {terminated, Reason}, Timestamp}) ->
{Name, Type, {terminated, Reason}, Timestamp};
%% running
({Name, Type, {State, Opts}, Timestamp}) ->
Opts1 = Opts ++ [{node, Node}],
{Name, Type, {State, Opts1}, Timestamp}
end, Shovels).

-spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined.
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbitmq_shovel/src/rabbit_shovel_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ add_timestamp_header(Props = #'P_basic'{headers = Headers}) ->
delete_shovel(VHost, Name, ActingUser) ->
case rabbit_shovel_status:lookup({VHost, Name}) of
not_found ->
%% Follow the user's obvious intent and delete the runtime parameter just in case the Shovel is in
%% a starting-failing-restarting loop. MK.
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser),
{error, not_found};
_Obj ->
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser)
end.

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ handle_call(_Msg, _From, State) ->
{noreply, State}.

handle_cast(init, State = #state{config = Config0}) ->
rabbit_log_shovel:debug("Shovel ~ts is reporting its status", [human_readable_name(State#state.name)]),
rabbit_shovel_status:report(State#state.name, State#state.type, starting),
rabbit_log_shovel:info("Shovel ~ts will now try to connect...", [human_readable_name(State#state.name)]),
try rabbit_shovel_behaviour:connect_source(Config0) of
Config ->
rabbit_log_shovel:debug("Shovel ~ts connected to source", [human_readable_name(maps:get(name, Config))]),
Expand Down
11 changes: 1 addition & 10 deletions deps/rabbitmq_shovel_management/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,7 @@ rabbitmq_integration_suite(
)

rabbitmq_suite(
name = "rabbit_shovel_mgmt_SUITE",
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_management_agent:erlang_app",
"@meck//:erlang_app",
],
)

rabbitmq_suite(
name = "rabbit_shovel_mgmt_util_SUITE",
name = "unit_SUITE",
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_shovel:erlang_app",
Expand Down
15 changes: 3 additions & 12 deletions deps/rabbitmq_shovel_management/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,10 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "rabbit_shovel_mgmt_SUITE_beam_files",
name = "unit_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_shovel_mgmt_SUITE.erl"],
outs = ["test/rabbit_shovel_mgmt_SUITE.beam"],
app_name = "rabbitmq_shovel_management",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_management_agent:erlang_app"],
)
erlang_bytecode(
name = "rabbit_shovel_mgmt_util_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_shovel_mgmt_util_SUITE.erl"],
outs = ["test/rabbit_shovel_mgmt_util_SUITE.beam"],
srcs = ["test/unit_SUITE.erl"],
outs = ["test/unit_SUITE.beam"],
app_name = "rabbitmq_shovel_management",
erlc_opts = "//:test_erlc_opts",
)
50 changes: 35 additions & 15 deletions deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ resource_exists(ReqData, Context) ->
%% Deleting or restarting a shovel
case get_shovel_node(VHost, Name, ReqData, Context) of
undefined ->
rabbit_log:error("Shovel with the name '~ts' was not found on virtual host '~ts'",
rabbit_log:error("Shovel with the name '~ts' was not found on virtual host '~ts'. "
"It may be failing to connect and report its status.",
[Name, VHost]),
false;
case is_restart(ReqData) of
true -> false;
%% this is a deletion attempt, it can continue and idempotently try to
%% delete the shovel
false -> true
end;
_ ->
true
end
Expand All @@ -73,9 +79,17 @@ delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
Name ->
case get_shovel_node(VHost, Name, ReqData, Context) of
undefined -> rabbit_log:error("Could not find shovel data for shovel '~ts' in vhost: '~ts'", [Name, VHost]),
false;
case is_restart(ReqData) of
true ->
false;
%% this is a deletion attempt
false ->
%% if we do not know the node, use the local one
try_delete(node(), VHost, Name, Username),
true
end;
Node ->
%% We must distinguish between a delete and restart
%% We must distinguish between a delete and a restart
case is_restart(ReqData) of
true ->
rabbit_log:info("Asked to restart shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
Expand All @@ -91,17 +105,8 @@ delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
end;

_ ->
rabbit_log:info("Asked to delete shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
try erpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
ok -> true;
{error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
false
catch _:Reason ->
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
false
end
try_delete(Node, VHost, Name, Username),
true

end
end
Expand Down Expand Up @@ -150,3 +155,18 @@ find_matching_shovel(VHost, Name, Shovels) ->
_ ->
undefined
end.

-spec try_delete(node(), vhost:name(), any(), rabbit_types:username()) -> boolean().
try_delete(Node, VHost, Name, Username) ->
rabbit_log:info("Asked to delete shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
%% this will clear the runtime parameter, the ultimate way of deleting a dynamic Shovel eventually. MK.
try erpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
ok -> true;
{error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
false
catch _:Reason ->
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
false
end.
Loading