Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deps/rabbitmq_shovel/include/rabbit_shovel.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-define(SHOVEL_APP, rabbitmq_shovel).

-record(endpoint,
{uris,
resource_declaration
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
%% RabbitMQ Shovel plugin
%% ----------------------------------------------------------------------------

{mapping, "shovel.operating_mode", "rabbitmq_shovel.operating_mode", [
[{datatype, {enum, [standard, alternative, library]}}]
]}.

{mapping, "shovel.topology.predeclared", "rabbitmq_shovel.topology.predeclared", [
[{datatype, {enum, [true, false]}}]
]}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ start_link() ->
{ok, Pid0} -> Pid0;
{error, {already_started, Pid0}} -> Pid0
end,
Shovels = rabbit_runtime_parameters:list_component(<<"shovel">>),
IsStandard = rabbit_shovel_operating_mode:is_standard(),
Shovels = case IsStandard of
false ->
%% when operating in a non-standard mode, do not start any shovels
[];
true ->
rabbit_runtime_parameters:list_component(<<"shovel">>)
end,
[start_child({pget(vhost, Shovel), pget(name, Shovel)},
pget(value, Shovel)) || Shovel <- Shovels],
{ok, Pid}.
Expand Down
34 changes: 34 additions & 0 deletions deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_shovel_operating_mode).

-include("rabbit_shovel.hrl").

-export([
operating_mode/0,
is_standard/0,
is_alternative/0
]).

-type operating_mode() :: 'standard' | atom().

%%
%% API
%%

-spec operating_mode() -> operating_mode().
operating_mode() ->
application:get_env(?SHOVEL_APP, operating_mode, standard).

-spec is_standard() -> boolean().
is_standard() ->
operating_mode() =:= standard.

-spec is_alternative() -> boolean().
is_alternative() ->
operating_mode() =/= standard.
28 changes: 24 additions & 4 deletions deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@
{enables, recovery}]}).

register() ->
rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE).
case rabbit_shovel_operating_mode:is_standard() of
true ->
rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE);
false ->
?LOG_DEBUG("Shovel: skipping runtime parameter registration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
end.

unregister() ->
rabbit_registry:unregister(runtime_parameter, <<"shovel">>).
case rabbit_shovel_operating_mode:is_standard() of
true ->
rabbit_registry:unregister(runtime_parameter, <<"shovel">>);
false ->
?LOG_DEBUG("Shovel: skipping runtime parameter deregistration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
end.

validate(_VHost, <<"shovel">>, Name, Def0, User) ->
Def = rabbit_data_coercion:to_proplist(Def0),
Expand All @@ -65,10 +75,20 @@ pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of
end.

notify(VHost, <<"shovel">>, Name, Definition, _Username) ->
rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition).
case rabbit_shovel_operating_mode:is_standard() of
true ->
rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition);
false ->
?LOG_DEBUG("Shovel: ignoring a runtime parameter update, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
end.

notify_clear(VHost, <<"shovel">>, Name, _Username) ->
rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}).
case rabbit_shovel_operating_mode:is_standard() of
true ->
rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name});
false ->
?LOG_DEBUG("Shovel: ignoring a cleared runtime parameter, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
end.

%%----------------------------------------------------------------------------

Expand Down
12 changes: 9 additions & 3 deletions deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ start_link() ->
end.

init([Configurations]) ->
IsStandard = rabbit_shovel_operating_mode:is_standard(),
StaticShovelSpecs = make_child_specs(IsStandard, Configurations),
Len = dict:size(Configurations),
ChildSpecs = [
#{
Expand All @@ -39,11 +41,15 @@ init([Configurations]) ->
type => supervisor,
modules => [rabbit_shovel_dyn_worker_sup_sup]
}
| make_child_specs(Configurations)
| StaticShovelSpecs
],
{ok, {#{strategy => one_for_one, intensity => 2 * Len, period => 2}, ChildSpecs}}.
Opts = #{strategy => one_for_one, intensity => 2 * Len, period => 2},
{ok, {Opts, ChildSpecs}}.

make_child_specs(Configurations) ->
make_child_specs(false = _StandardOperatingMode, _Configurations) ->
%% when operating in a non-standard mode, do not start any shovels
[];
make_child_specs(true, Configurations) ->
dict:fold(
fun (ShovelName, ShovelConfig, Acc) ->
[
Expand Down
Loading