From 3043dc621f9f18c46a9d709069f2e648cd32824c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 25 Aug 2025 13:37:49 -0400 Subject: [PATCH 1/3] Shovel: introduce operating modes Sometimes you want a plugin to act as a library and not an application. That is, for its modules to be available at compile time and on a running node but, say, the actual runtime parameter handling and supervision of shovels to be handled by another plugin. Since we do not currently have a concept of "library plugins" or "library dependencies", this approach demonstrates one example of how some plugins can be used as libraries. --- .../rabbitmq_shovel/include/rabbit_shovel.hrl | 2 ++ .../priv/schema/rabbitmq_shovel.schema | 4 +++ .../src/rabbit_shovel_dyn_worker_sup_sup.erl | 9 ++++- .../src/rabbit_shovel_operating_mode.erl | 34 +++++++++++++++++++ .../src/rabbit_shovel_parameters.erl | 28 ++++++++++++--- .../rabbitmq_shovel/src/rabbit_shovel_sup.erl | 12 +++++-- 6 files changed, 81 insertions(+), 8 deletions(-) create mode 100644 deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl diff --git a/deps/rabbitmq_shovel/include/rabbit_shovel.hrl b/deps/rabbitmq_shovel/include/rabbit_shovel.hrl index 0417b51c15d..b6d1645f355 100644 --- a/deps/rabbitmq_shovel/include/rabbit_shovel.hrl +++ b/deps/rabbitmq_shovel/include/rabbit_shovel.hrl @@ -5,6 +5,8 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% +-define(SHOVEL_APP, rabbitmq_shovel). + -record(endpoint, {uris, resource_declaration diff --git a/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema b/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema index 42815459714..73d2744c006 100644 --- a/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema +++ b/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema @@ -2,6 +2,10 @@ %% RabbitMQ Shovel plugin %% ---------------------------------------------------------------------------- +{mapping, "shovel.operating_mode", "rabbitmq_shovel.operating_mode", [ + [{datatype, {enum, [standard, alternative, library]}}] +]}. + {mapping, "shovel.topology.predeclared", "rabbitmq_shovel.topology.predeclared", [ [{datatype, {enum, [true, false]}}] ]}. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl index 2032f686200..ecd39aef4a7 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl @@ -26,7 +26,14 @@ start_link() -> {ok, Pid0} -> Pid0; {error, {already_started, Pid0}} -> Pid0 end, - Shovels = rabbit_runtime_parameters:list_component(<<"shovel">>), + IsStandard = rabbit_shovel_operating_mode:is_standard(), + Shovels = case IsStandard of + false -> + %% when operating in a non-standard mode, do not start any shovels + []; + true -> + rabbit_runtime_parameters:list_component(<<"shovel">>) + end, [start_child({pget(vhost, Shovel), pget(name, Shovel)}, pget(value, Shovel)) || Shovel <- Shovels], {ok, Pid}. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl new file mode 100644 index 00000000000..1f9dca7d8c2 --- /dev/null +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl @@ -0,0 +1,34 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_shovel_operating_mode). + +-include("rabbit_shovel.hrl"). + +-export([ + operating_mode/0, + is_standard/0, + is_alternative/0 +]). + +-type operating_mode() :: 'standard' | atom(). + +%% +%% API +%% + +-spec operating_mode() -> operating_mode(). +operating_mode() -> + application:get_env(?SHOVEL_APP, operating_mode, standard). + +-spec is_standard() -> boolean(). +is_standard() -> + operating_mode() =:= standard. + +-spec is_alternative() -> boolean(). +is_alternative() -> + operating_mode() =/= standard. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index a0b751fb76c..acc4eba7cd3 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -39,10 +39,20 @@ {enables, recovery}]}). register() -> - rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE). + case rabbit_shovel_operating_mode:is_standard() of + true -> + rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE); + false -> + ?LOG_DEBUG("Shovel: skipping runtime parameter registration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + end. unregister() -> - rabbit_registry:unregister(runtime_parameter, <<"shovel">>). + case rabbit_shovel_operating_mode:is_standard() of + true -> + rabbit_registry:unregister(runtime_parameter, <<"shovel">>); + false -> + ?LOG_DEBUG("Shovel: skipping runtime parameter deregistration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + end. validate(_VHost, <<"shovel">>, Name, Def0, User) -> Def = rabbit_data_coercion:to_proplist(Def0), @@ -65,10 +75,20 @@ pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of end. notify(VHost, <<"shovel">>, Name, Definition, _Username) -> - rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition). + case rabbit_shovel_operating_mode:is_standard() of + true -> + rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition); + false -> + ?LOG_DEBUG("Shovel: ignoring a runtime parameter update, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + end. notify_clear(VHost, <<"shovel">>, Name, _Username) -> - rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}). + case rabbit_shovel_operating_mode:is_standard() of + true -> + rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}); + false -> + ?LOG_DEBUG("Shovel: ignoring a cleared runtime parameter, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + end. %%---------------------------------------------------------------------------- diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl index 70992760ef8..a82a4539454 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl @@ -21,6 +21,8 @@ start_link() -> end. init([Configurations]) -> + IsStandard = rabbit_shovel_operating_mode:is_standard(), + StaticShovelSpecs = make_child_specs(IsStandard, Configurations), Len = dict:size(Configurations), ChildSpecs = [ #{ @@ -39,11 +41,15 @@ init([Configurations]) -> type => supervisor, modules => [rabbit_shovel_dyn_worker_sup_sup] } - | make_child_specs(Configurations) + | StaticShovelSpecs ], - {ok, {#{strategy => one_for_one, intensity => 2 * Len, period => 2}, ChildSpecs}}. + Opts = #{strategy => one_for_one, intensity => 2 * Len, period => 2}, + {ok, {Opts, ChildSpecs}}. -make_child_specs(Configurations) -> +make_child_specs(false = _StandardOperatingMode, _Configurations) -> + %% when operating in a non-standard mode, do not start any shovels + []; +make_child_specs(true, Configurations) -> dict:fold( fun (ShovelName, ShovelConfig, Acc) -> [ From dbff137e4a2850db08288c4e8d4384bd4073a834 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 25 Aug 2025 14:14:45 -0400 Subject: [PATCH 2/3] Address review feedback #14425 --- .../src/rabbit_shovel_dyn_worker_sup_sup.erl | 12 +++---- .../src/rabbit_shovel_operating_mode.erl | 12 +------ .../src/rabbit_shovel_parameters.erl | 36 ++++++++++--------- .../rabbitmq_shovel/src/rabbit_shovel_sup.erl | 15 ++++---- 4 files changed, 35 insertions(+), 40 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl index ecd39aef4a7..630f09b072e 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl @@ -26,13 +26,13 @@ start_link() -> {ok, Pid0} -> Pid0; {error, {already_started, Pid0}} -> Pid0 end, - IsStandard = rabbit_shovel_operating_mode:is_standard(), - Shovels = case IsStandard of - false -> + OpMode = rabbit_shovel_operating_mode:operating_mode(), + Shovels = case OpMode of + standard -> + rabbit_runtime_parameters:list_component(<<"shovel">>); + _Other -> %% when operating in a non-standard mode, do not start any shovels - []; - true -> - rabbit_runtime_parameters:list_component(<<"shovel">>) + [] end, [start_child({pget(vhost, Shovel), pget(name, Shovel)}, pget(value, Shovel)) || Shovel <- Shovels], diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl index 1f9dca7d8c2..66c4980e278 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_operating_mode.erl @@ -10,9 +10,7 @@ -include("rabbit_shovel.hrl"). -export([ - operating_mode/0, - is_standard/0, - is_alternative/0 + operating_mode/0 ]). -type operating_mode() :: 'standard' | atom(). @@ -24,11 +22,3 @@ -spec operating_mode() -> operating_mode(). operating_mode() -> application:get_env(?SHOVEL_APP, operating_mode, standard). - --spec is_standard() -> boolean(). -is_standard() -> - operating_mode() =:= standard. - --spec is_alternative() -> boolean(). -is_alternative() -> - operating_mode() =/= standard. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index acc4eba7cd3..36e4b45796b 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -39,19 +39,21 @@ {enables, recovery}]}). register() -> - case rabbit_shovel_operating_mode:is_standard() of - true -> + OpMode = rabbit_shovel_operating_mode:operating_mode(), + case OpMode of + standard -> rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE); - false -> - ?LOG_DEBUG("Shovel: skipping runtime parameter registration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + _Other -> + ?LOG_DEBUG("Shovel: skipping runtime parameter registration, operating mode: ~ts", [OpMode]) end. unregister() -> - case rabbit_shovel_operating_mode:is_standard() of - true -> + OpMode = rabbit_shovel_operating_mode:operating_mode(), + case OpMode of + standard -> rabbit_registry:unregister(runtime_parameter, <<"shovel">>); - false -> - ?LOG_DEBUG("Shovel: skipping runtime parameter deregistration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + _Other -> + ?LOG_DEBUG("Shovel: skipping runtime parameter deregistration, operating mode: ~ts", [OpMode]) end. validate(_VHost, <<"shovel">>, Name, Def0, User) -> @@ -75,19 +77,21 @@ pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of end. notify(VHost, <<"shovel">>, Name, Definition, _Username) -> - case rabbit_shovel_operating_mode:is_standard() of - true -> + OpMode = rabbit_shovel_operating_mode:operating_mode(), + case OpMode of + standard -> rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition); - false -> - ?LOG_DEBUG("Shovel: ignoring a runtime parameter update, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + _Other -> + ?LOG_DEBUG("Shovel: ignoring a runtime parameter update, operating mode: ~ts", [OpMode]) end. notify_clear(VHost, <<"shovel">>, Name, _Username) -> - case rabbit_shovel_operating_mode:is_standard() of - true -> + OpMode = rabbit_shovel_operating_mode:operating_mode(), + case OpMode of + standard -> rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}); - false -> - ?LOG_DEBUG("Shovel: ignoring a cleared runtime parameter, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()]) + _Other -> + ?LOG_DEBUG("Shovel: ignoring a cleared runtime parameter, operating mode: ~ts", [OpMode]) end. %%---------------------------------------------------------------------------- diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl index a82a4539454..c2ee7925070 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl @@ -21,8 +21,8 @@ start_link() -> end. init([Configurations]) -> - IsStandard = rabbit_shovel_operating_mode:is_standard(), - StaticShovelSpecs = make_child_specs(IsStandard, Configurations), + OpMode = rabbit_shovel_operating_mode:operating_mode(), + StaticShovelSpecs = make_child_specs(OpMode, Configurations), Len = dict:size(Configurations), ChildSpecs = [ #{ @@ -46,10 +46,8 @@ init([Configurations]) -> Opts = #{strategy => one_for_one, intensity => 2 * Len, period => 2}, {ok, {Opts, ChildSpecs}}. -make_child_specs(false = _StandardOperatingMode, _Configurations) -> - %% when operating in a non-standard mode, do not start any shovels - []; -make_child_specs(true, Configurations) -> + +make_child_specs(standard, Configurations) -> dict:fold( fun (ShovelName, ShovelConfig, Acc) -> [ @@ -63,7 +61,10 @@ make_child_specs(true, Configurations) -> } | Acc ] - end, [], Configurations). + end, [], Configurations); +make_child_specs(_NonStandardOpMode, _Configurations) -> + %% when operating in a non-standard mode, do not start any shovels + []. parse_configuration(undefined) -> {ok, dict:new()}; From e61eb7a45e305d96a9e22444ede6f9955127b9cc Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 25 Aug 2025 14:41:55 -0400 Subject: [PATCH 3/3] More logging --- deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl index c2ee7925070..7b9142fb3de 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl @@ -8,6 +8,9 @@ -module(rabbit_shovel_sup). -behaviour(supervisor). +-include_lib("kernel/include/logger.hrl"). +-include_lib("logging.hrl"). + -export([start_link/0, init/1]). -import(rabbit_shovel_config, []). @@ -22,6 +25,7 @@ start_link() -> init([Configurations]) -> OpMode = rabbit_shovel_operating_mode:operating_mode(), + ?LOG_DEBUG("Shovel: operating mode set to ~ts", [OpMode]), StaticShovelSpecs = make_child_specs(OpMode, Configurations), Len = dict:size(Configurations), ChildSpecs = [