Skip to content

Commit 270c43f

Browse files
Merge pull request #14637 from rabbitmq/issue-14623
Shovels: fix shovel status and deletion of failed shovels
2 parents 6f71b54 + c9697a6 commit 270c43f

6 files changed

+116
-17
lines changed

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ run([Name], #{node := Node, vhost := VHost, force := Force}) ->
6868
true -> ?INTERNAL_USER;
6969
false -> 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user()
7070
end,
71-
7271
case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of
7372
{badrpc, _} = Error ->
7473
Error;
@@ -82,14 +81,15 @@ run([Name], #{node := Node, vhost := VHost, force := Force}) ->
8281
try_force_removing(Node, VHost, Name, ActingUser),
8382
{error, rabbit_data_coercion:to_binary(ErrMsg)};
8483
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} ->
85-
delete_shovel(ErrMsg, VHost, Name, ActingUser, Opts, Node);
84+
HostingNode = proplists:get_value(node, Opts, Node),
85+
delete_shovel(ErrMsg, VHost, Name, ActingUser, HostingNode, Node);
8686
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} ->
87-
delete_shovel(ErrMsg, VHost, Name, ActingUser, Opts, Node)
87+
HostingNode = proplists:get_value(node, Opts, Node),
88+
delete_shovel(ErrMsg, VHost, Name, ActingUser, HostingNode, Node)
8889
end
8990
end.
9091

91-
delete_shovel(ErrMsg, VHost, Name, ActingUser, Opts, Node) ->
92-
{_, HostingNode} = lists:keyfind(node, 1, Opts),
92+
delete_shovel(ErrMsg, VHost, Name, ActingUser, HostingNode, Node) ->
9393
case rabbit_misc:rpc_call(
9494
HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of
9595
{badrpc, _} = Error ->

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,15 @@ run([Name], #{node := Node, vhost := VHost}) ->
6363
undefined ->
6464
{error, rabbit_data_coercion:to_binary(ErrMsg)};
6565
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} ->
66-
restart_shovel(ErrMsg, Name, VHost, Opts);
66+
HostingNode = proplists:get_value(node, Opts, Node),
67+
restart_shovel(ErrMsg, Name, VHost, HostingNode);
6768
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} ->
68-
restart_shovel(ErrMsg, Name, VHost, Opts)
69+
HostingNode = proplists:get_value(node, Opts, Node),
70+
restart_shovel(ErrMsg, Name, VHost, HostingNode)
6971
end
7072
end.
7173

72-
restart_shovel(ErrMsg, Name, VHost, Opts) ->
73-
{_, HostingNode} = lists:keyfind(node, 1, Opts),
74+
restart_shovel(ErrMsg, Name, VHost, HostingNode) ->
7475
case rabbit_misc:rpc_call(
7576
HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of
7677
{badrpc, _} = Error ->

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ notify_clear(VHost, <<"shovel">>, Name, _Username) ->
8989
OpMode = rabbit_shovel_operating_mode:operating_mode(),
9090
case OpMode of
9191
standard ->
92-
rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name});
92+
rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}),
93+
%% Only necessary for shovels stuck in a restart loop, as no
94+
%% process is running the terminate won't be called
95+
rabbit_shovel_status:remove({VHost, Name});
9396
_Other ->
9497
?LOG_DEBUG("Shovel: ignoring a cleared runtime parameter, operating mode: ~ts", [OpMode])
9598
end.

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
-type info() :: starting
4343
| {running, proplists:proplist()}
4444
| {terminated, term()}.
45+
-type info_with_node() :: starting
46+
| {running, proplists:proplist()}
47+
| {terminated, proplists:proplist()}.
4548
-type blocked_status() :: running | flow | blocked.
4649
-type shovel_status() :: blocked_status() | ignore.
4750

@@ -52,7 +55,7 @@
5255
pending := rabbit_types:option(non_neg_integer()),
5356
forwarded := rabbit_types:option(non_neg_integer())
5457
} | #{}.
55-
-type status_tuple_41x() :: {name(), type(), info(), metrics(), calendar:datetime()}.
58+
-type status_tuple_41x() :: {name(), type(), info() | info_with_node(), metrics(), calendar:datetime()}.
5659
-type status_tuple_40x_and_older() :: {name(), type(), info(), calendar:datetime()}.
5760
-type status_tuple() :: status_tuple_41x() | status_tuple_40x_and_older().
5861

@@ -216,7 +219,8 @@ inject_node_info(Node, Shovels) ->
216219
{Name, Type, {State, Opts}, Metrics, Timestamp};
217220
%% terminated
218221
({Name, Type, {terminated, Reason}, Metrics, Timestamp}) ->
219-
{Name, Type, {terminated, Reason}, Metrics, Timestamp};
222+
{Name, Type, {terminated, [{node, Node},
223+
{reason, Reason}]}, Metrics, Timestamp};
220224
%% running
221225
({Name, Type, {State, Opts}, Metrics, Timestamp}) ->
222226
Opts1 = Opts ++ [{node, Node}],

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ handle_cast(init, State = #state{config = Config0}) ->
8282
catch E:R ->
8383
?LOG_ERROR("Shovel ~ts could not connect to source: ~p ~p",
8484
[human_readable_name(maps:get(name, Config0)), E, R]),
85-
{stop, shutdown, State}
85+
{stop, {shutdown, inbound_conn_failed}, State}
8686
end;
8787
handle_cast(connect_dest, State = #state{config = Config0}) ->
8888
try rabbit_shovel_behaviour:connect_dest(Config0) of
@@ -93,7 +93,7 @@ handle_cast(connect_dest, State = #state{config = Config0}) ->
9393
catch E:R ->
9494
?LOG_ERROR("Shovel ~ts could not connect to destination: ~p ~p",
9595
[human_readable_name(maps:get(name, Config0)), E, R]),
96-
{stop, shutdown, State}
96+
{stop, {shutdown, outbond_conn_failed}, State}
9797
end;
9898
handle_cast(init_shovel, State = #state{config = Config}) ->
9999
%% Don't trap exits until we have established connections so that
@@ -227,6 +227,20 @@ terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #stat
227227
{terminated, "needed a restart"}),
228228
close_connections(State),
229229
ok;
230+
terminate({shutdown, outbond_conn_failed}, State = #state{name = Name}) ->
231+
?LOG_ERROR("Shovel ~ts is stopping because if failed to connect to destination",
232+
[human_readable_name(Name)]),
233+
rabbit_shovel_status:report(State#state.name, State#state.type,
234+
{terminated, "failed to connect to destination"}),
235+
close_connections(State),
236+
ok;
237+
terminate({shutdown, inbound_conn_failed}, State = #state{name = Name}) ->
238+
?LOG_ERROR("Shovel ~ts is stopping because it failed to connect to source",
239+
[human_readable_name(Name)]),
240+
rabbit_shovel_status:report(State#state.name, State#state.type,
241+
{terminated, "failed to connect to source"}),
242+
close_connections(State),
243+
ok;
230244
terminate(Reason, State = #state{name = Name}) ->
231245
?LOG_ERROR("Shovel ~ts is stopping, reason: ~tp", [human_readable_name(Name), Reason]),
232246
rabbit_shovel_status:report(State#state.name, State#state.type,

deps/rabbitmq_shovel/test/delete_shovel_command_SUITE.erl

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-include_lib("amqp_client/include/amqp_client.hrl").
1111
-include_lib("stdlib/include/assert.hrl").
12+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1213

1314
-compile(export_all).
1415

@@ -26,11 +27,14 @@ groups() ->
2627
delete_not_found,
2728
delete,
2829
delete_internal,
29-
delete_internal_owner
30+
delete_internal_owner,
31+
delete_invalid_uri,
32+
delete_non_existent_dest_address
3033
]},
3134
{cluster_size_2, [], [
32-
clear_param_on_different_node
33-
]}
35+
clear_param_on_different_node,
36+
delete_invalid_uri_another_node
37+
]}
3438
].
3539

3640
%% -------------------------------------------------------------------
@@ -64,6 +68,15 @@ end_per_group(_Group, Config) ->
6468
rabbit_ct_client_helpers:teardown_steps() ++
6569
rabbit_ct_broker_helpers:teardown_steps()).
6670

71+
init_per_testcase(Testcase, Config) when Testcase == delete_invalid_uri_another_node ->
72+
case rabbit_ct_helpers:is_mixed_versions(Config) of
73+
true ->
74+
%% The code changes to delete shovel are compatible with older versions, however
75+
%% older versions fail to delete invalid shovels
76+
{skip, "not mixed versions compatible"};
77+
false ->
78+
rabbit_ct_helpers:testcase_started(Config, Testcase)
79+
end;
6780
init_per_testcase(Testcase, Config) ->
6881
rabbit_ct_helpers:testcase_started(Config, Testcase).
6982

@@ -148,3 +161,67 @@ clear_param_on_different_node(Config) ->
148161
status, []), "Deleted shovel still reported on node A"),
149162
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, B, rabbit_shovel_status,
150163
status, []), "Deleted shovel still reported on node B").
164+
165+
delete_invalid_uri(Config) ->
166+
ok = rabbit_ct_broker_helpers:rpc(
167+
Config, 0, rabbit_runtime_parameters, set,
168+
[<<"/">>, <<"shovel">>, <<"myshovel">>,
169+
[{<<"src-protocol">>, <<"amqp091">>},
170+
{<<"src-uri">>, <<"amqp://foo">>},
171+
{<<"src-queue">>, <<"src">>},
172+
{<<"dest-protocol">>, <<"amqp091">>},
173+
{<<"dest-uri">>, shovel_test_utils:make_uri(Config, 0)},
174+
{<<"dest-queue">>, <<"dest">>}],
175+
none]),
176+
?awaitMatch([{{<<"/">>, <<"myshovel">>}, dynamic, {terminated, _}, _, _}],
177+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
178+
status, []),
179+
45_000),
180+
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
181+
Opts = #{node => A, vhost => <<"/">>, force => false},
182+
ok = ?CMD:run([<<"myshovel">>], Opts),
183+
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
184+
status, []).
185+
186+
delete_non_existent_dest_address(Config) ->
187+
Uri = shovel_test_utils:make_uri(Config, 0),
188+
ok = rabbit_ct_broker_helpers:rpc(
189+
Config, 0, rabbit_runtime_parameters, set,
190+
[<<"/">>, <<"shovel">>, <<"myshovel">>,
191+
[{<<"src-protocol">>, <<"amqp091">>},
192+
{<<"src-uri">>, Uri},
193+
{<<"src-queue">>, <<"src">>},
194+
{<<"dest-protocol">>, <<"amqp10">>},
195+
{<<"dest-uri">>, Uri},
196+
{<<"dest-address">>, <<"/queues/q2">>}],
197+
none]),
198+
?awaitMatch([{{<<"/">>, <<"myshovel">>}, dynamic, {terminated, _}, _, _}],
199+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
200+
status, []),
201+
45_000),
202+
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
203+
Opts = #{node => A, vhost => <<"/">>, force => false},
204+
ok = ?CMD:run([<<"myshovel">>], Opts),
205+
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
206+
status, []).
207+
208+
delete_invalid_uri_another_node(Config) ->
209+
ok = rabbit_ct_broker_helpers:rpc(
210+
Config, 1, rabbit_runtime_parameters, set,
211+
[<<"/">>, <<"shovel">>, <<"myshovel">>,
212+
[{<<"src-protocol">>, <<"amqp091">>},
213+
{<<"src-uri">>, <<"amqp://foo">>},
214+
{<<"src-queue">>, <<"src">>},
215+
{<<"dest-protocol">>, <<"amqp091">>},
216+
{<<"dest-uri">>, shovel_test_utils:make_uri(Config, 0)},
217+
{<<"dest-queue">>, <<"dest">>}],
218+
none]),
219+
?awaitMatch([{{<<"/">>, <<"myshovel">>}, dynamic, {terminated, _}, _, _}],
220+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_shovel_status,
221+
status, []),
222+
45_000),
223+
[A, _B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
224+
Opts = #{node => A, vhost => <<"/">>, force => false},
225+
ok = ?CMD:run([<<"myshovel">>], Opts),
226+
[] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_shovel_status,
227+
status, []).

0 commit comments

Comments
 (0)