Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/templates/test.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
run: |
echo "value=bazel-repo-cache-${{ hashFiles('MODULE.bazel') }}" | tee -a $GITHUB_OUTPUT
- name: AUTHENTICATE TO GOOGLE CLOUD
uses: google-github-actions/[email protected].2
uses: google-github-actions/[email protected].3
with:
credentials_json: ${{ secrets.REMOTE_CACHE_CREDENTIALS_JSON }}
- name: REPO CACHE
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ _APP_ENV = """[
{dead_letter_worker_publisher_confirm_timeout, 180000},

%% EOL date for the current release series, if known/announced
{release_series_eol_date, none}
{release_series_eol_date, none},

{vhost_process_reconciliation_run_interval, 30},
%% for testing
{vhost_process_reconciliation_enabled, true}
]
"""

Expand Down
15 changes: 9 additions & 6 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ define PROJECT_ENV
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false},
{credentials_obfuscation_fallback_secret, <<"nocookie">>},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000},
%% EOL date for the current release series, if known/announced
{release_series_eol_date, none}
{track_auth_attempt_source, false},
{credentials_obfuscation_fallback_secret, <<"nocookie">>},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000},
%% EOL date for the current release series, if known/announced
{release_series_eol_date, none},
{vhost_process_reconciliation_run_interval, 30},
%% for testing
{vhost_process_reconciliation_enabled, true}
]
endef

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_vhost_sup.erl",
"src/rabbit_vhost_sup_sup.erl",
"src/rabbit_vhost_sup_wrapper.erl",
"src/rabbit_vhosts.erl",
"src/rabbit_vm.erl",
"src/supervised_lifecycle.erl",
"src/tcp_listener.erl",
Expand Down Expand Up @@ -499,6 +500,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_vhost_sup.erl",
"src/rabbit_vhost_sup_sup.erl",
"src/rabbit_vhost_sup_wrapper.erl",
"src/rabbit_vhosts.erl",
"src/rabbit_vm.erl",
"src/supervised_lifecycle.erl",
"src/tcp_listener.erl",
Expand Down Expand Up @@ -783,6 +785,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_vhost_sup.erl",
"src/rabbit_vhost_sup_sup.erl",
"src/rabbit_vhost_sup_wrapper.erl",
"src/rabbit_vhosts.erl",
"src/rabbit_vm.erl",
"src/supervised_lifecycle.erl",
"src/tcp_listener.erl",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,15 @@
{mfa, {logger, debug, ["'networking' boot step skipped and moved to end of startup", [], #{domain => ?RMQLOG_DOMAIN_GLOBAL}]}},
{requires, notify_cluster}]}).

%% This mechanism is necessary in environments where a cluster is formed in parallel,
%% which is the case with many container orchestration tools.
%% In such scenarios, a virtual host can be declared before the cluster is formed and all
%% cluster members are known, e.g. via definition import.
-rabbit_boot_step({virtual_host_reconciliation,
[{description, "makes sure all virtual host have running processes on all nodes"},
{mfa, {rabbit_vhosts, boot, []}},
{requires, notify_cluster}]}).

%%---------------------------------------------------------------------------

-include_lib("rabbit_common/include/rabbit_framing.hrl").
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ handle_live_rabbit(Node) ->
true -> ok;
false -> on_node_up_using_mnesia(Node)
end,
ok = rabbit_vhosts:on_node_up(Node),
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_up(Node).

on_node_up_using_mnesia(Node) ->
Expand Down
166 changes: 166 additions & 0 deletions deps/rabbit/src/rabbit_vhosts.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% This module exists to avoid circular module dependencies between
%% several other virtual host-related modules.
-module(rabbit_vhosts).

%% persistent_term key under which the number of reconciliation runs is
%% stored (read by get_run_counter/0, written by increment_run_counter/0
%% and reset by enable_reconciliation/0).
-define(PERSISTENT_TERM_COUNTER_KEY, rabbit_vhosts_reconciliation_run_counter).

%% API

-export([
list_names/0,
exists/1,
boot/0,
reconcile/0,
reconcile_once/0,
is_reconciliation_enabled/0,
disable_reconciliation/0,
enable_reconciliation/0,
start_processes_for_all/0,
start_on_all_nodes/2,
on_node_up/1
]).

%% Same as rabbit_vhost:exists/1.
%% Delegates directly to rabbit_db_vhost:exists/1 and returns its result.
-spec exists(vhost:name()) -> boolean().
exists(VirtualHost) ->
rabbit_db_vhost:exists(VirtualHost).

%% Same as rabbit_vhost:list_names/0.
%% Delegates directly to rabbit_db_vhost:list/0 and returns its result.
-spec list_names() -> [vhost:name()].
list_names() -> rabbit_db_vhost:list().

%% Runs an initial reconciliation pass over all virtual hosts, counts it as
%% a run and, when reconciliation is enabled, asks maybe_start_timer/1 to
%% schedule reconcile/0. Results of the individual steps are deliberately
%% ignored; this function always returns 'ok'.
-spec boot() -> 'ok'.
boot() ->
    _ = start_processes_for_all(),
    _ = increment_run_counter(),
    %% Only arm the timer when the feature is switched on.
    _ = is_reconciliation_enabled() andalso maybe_start_timer(reconcile),
    ok.

%% Runs one round of virtual host process reconciliation and then lets
%% maybe_start_timer/1 decide whether to schedule this function again
%% (it stops re-arming the timer once the run counter has reached 10).
%% Does nothing when reconciliation is disabled.
%% See start_processes_for_all/1.
-spec reconcile() -> 'ok'.
reconcile() ->
    case is_reconciliation_enabled() of
        true ->
            _ = reconcile_once(),
            %% Re-arm the timer with this very function as the callback.
            _ = maybe_start_timer(reconcile),
            ok;
        false ->
            ok
    end.

%% Runs a single round of virtual host process reconciliation without
%% scheduling any follow-up run. The run counter is incremented and the
%% resulting value is included in the closing debug message.
%% See start_processes_for_all/1.
-spec reconcile_once() -> 'ok'.
reconcile_once() ->
    rabbit_log:debug("Will reconcile virtual host processes on all cluster members..."),
    _ = start_processes_for_all(),
    _ = increment_run_counter(),
    rabbit_log:debug("Done with virtual host processes reconciliation (run ~tp)",
                     [get_run_counter()]),
    ok.

%% Node-up hook: when reconciliation is enabled, schedules a single
%% reconcile_once/0 run 10 seconds from now. The node argument is not used.
%% Always returns 'ok'.
-spec on_node_up(Node :: node()) -> 'ok'.
on_node_up(_Node) ->
    case is_reconciliation_enabled() of
        true ->
            DelayInSeconds = 10,
            rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [DelayInSeconds]),
            %% timer:apply_after/4 expects milliseconds.
            _ = timer:apply_after(timer:seconds(DelayInSeconds), ?MODULE, reconcile_once, []),
            ok;
        false ->
            ok
    end.

%% Reads the vhost_process_reconciliation_enabled key from the rabbit
%% application environment; defaults to true when the key is not set.
-spec is_reconciliation_enabled() -> boolean().
is_reconciliation_enabled() ->
application:get_env(rabbit, vhost_process_reconciliation_enabled, true).

%% Resets the run counter to 0 and sets vhost_process_reconciliation_enabled
%% to true in the rabbit application environment.
%% Note: this function does not start a timer itself.
-spec enable_reconciliation() -> 'ok'.
enable_reconciliation() ->
%% reset the auto-stop counter
persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, 0),
application:set_env(rabbit, vhost_process_reconciliation_enabled, true).

%% Sets vhost_process_reconciliation_enabled to false in the rabbit
%% application environment. reconcile/0, on_node_up/1 and maybe_start_timer/1
%% consult this flag; reconcile_once/0 does not, so a run that was already
%% scheduled via on_node_up/1 will still execute.
-spec disable_reconciliation() -> 'ok'.
disable_reconciliation() ->
application:set_env(rabbit, vhost_process_reconciliation_enabled, false).

%% Reads the vhost_process_reconciliation_run_interval key from the rabbit
%% application environment; defaults to 30 when the key is not set.
%% maybe_start_timer/1 treats the value as seconds (it multiplies it by 1000
%% before passing it to timer:apply_after/4).
-spec reconciliation_interval() -> non_neg_integer().
reconciliation_interval() ->
application:get_env(rabbit, vhost_process_reconciliation_run_interval, 30).

%% Starts the processes of every known virtual host on each of the given nodes.
%% Only exists to allow for "virtual host process repair"
%% in clusters where nodes are booted in parallel and seeded
%% (e.g. using definitions) at the same time.
%%
%% In that case, during virtual host insertion into the schema database,
%% some processes predictably won't be started on the yet-to-be-discovered nodes.
%%
%% A failure for one virtual host is logged as an error and does not prevent
%% the remaining virtual hosts from being processed.
-spec start_processes_for_all([node()]) -> 'ok'.
start_processes_for_all(Nodes) ->
    Names = list_names(),
    rabbit_log:debug("Will make sure that processes of ~p virtual hosts are running on all reachable cluster nodes", [length(Names)]),
    %% lists:foreach/2 returns 'ok', which is also this function's result.
    lists:foreach(
      fun(VH) ->
              try
                  start_on_all_nodes(VH, Nodes)
              catch
                  _:Err ->
                      rabbit_log:error("Could not reconcile virtual host ~ts: ~tp", [VH, Err])
              end
      end,
      Names).

%% Convenience variant of start_processes_for_all/1 that targets the nodes
%% returned by rabbit_nodes:list_reachable/0.
-spec start_processes_for_all() -> 'ok'.
start_processes_for_all() ->
    ReachableNodes = rabbit_nodes:list_reachable(),
    start_processes_for_all(ReachableNodes).

%% Calls rabbit_vhost_sup_sup:start_on_all_nodes/2 for the given virtual host
%% and nodes, discards its result and always returns 'ok'.
-spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'.
start_on_all_nodes(VirtualHost, Nodes) ->
_ = rabbit_vhost_sup_sup:start_on_all_nodes(VirtualHost, Nodes),
ok.

%%
%% Implementation
%%

%% Returns the reconciliation run counter stored in persistent_term,
%% or 0 when no value has been stored under the key yet.
-spec get_run_counter() -> non_neg_integer().
get_run_counter() ->
persistent_term:get(?PERSISTENT_TERM_COUNTER_KEY, 0).

%% Increments the reconciliation run counter stored in persistent_term by one
%% and returns the updated value (previously the stale, pre-increment value
%% was returned, which is surprising for an "increment" function).
%% The read and the write are two separate steps, not an atomic update.
-spec increment_run_counter() -> non_neg_integer().
increment_run_counter() ->
    Next = get_run_counter() + 1,
    persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, Next),
    Next.

%% Schedules ?MODULE:FunName/0 to run after the configured reconciliation
%% interval, unless the run counter has already reached 10 or reconciliation
%% is disabled; in both of those cases no timer is started and 'ok' is returned.
%% Otherwise the result of timer:apply_after/4 is returned.
-spec maybe_start_timer(atom()) -> ok | {ok, timer:tref()} | {error, any()}.
maybe_start_timer(FunName) ->
    RunCount = get_run_counter(),
    IntervalInSeconds = reconciliation_interval(),
    if
        RunCount >= 10 ->
            %% Stop after ten runs
            rabbit_log:debug("Will stop virtual host process reconciliation after ~tp runs", [RunCount]),
            ok;
        true ->
            case is_reconciliation_enabled() of
                true ->
                    %% timer:apply_after/4 expects milliseconds.
                    DelayInMs = IntervalInSeconds * 1000,
                    rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [IntervalInSeconds]),
                    timer:apply_after(DelayInMs, ?MODULE, FunName, []);
                false ->
                    ok
            end
    end.
38 changes: 18 additions & 20 deletions deps/rabbit/test/rabbitmqctl_integration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ create_n_node_cluster(Config0, NumNodes) ->
Config1 = rabbit_ct_helpers:set_config(
Config0, [{rmq_nodes_count, NumNodes},
{rmq_nodes_clustered, true}]),
rabbit_ct_helpers:run_steps(Config1,
Config2 = rabbit_ct_helpers:merge_app_env(
Config1, {rabbit, [
{vhost_process_reconciliation_enabled, false}
]}),
rabbit_ct_helpers:run_steps(Config2,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

Expand Down Expand Up @@ -100,9 +104,12 @@ end_per_group(_, Config) ->
Config.

init_per_testcase(list_queues_stopped, Config0) ->
%% Start node 3 to crash it's queues
%% Start node 3 to kill a few virtual hosts on it
rabbit_ct_broker_helpers:start_node(Config0, 2),
%% Make vhost "down" on nodes 2 and 3
%% Disable virtual host reconciliation
rabbit_ct_broker_helpers:rpc(Config0, 1, rabbit_vhosts, disable_reconciliation, []),
rabbit_ct_broker_helpers:rpc(Config0, 2, rabbit_vhosts, disable_reconciliation, []),
%% Terminate default virtual host's processes on nodes 2 and 3
ok = rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>),
ok = rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>),

Expand All @@ -118,6 +125,7 @@ end_per_testcase(Testcase, Config0) ->
%%----------------------------------------------------------------------------
%% Test cases
%%----------------------------------------------------------------------------

list_queues_local(Config) ->
Node1Queues = lists:nth(1, ?config(per_node_queues, Config)),
Node2Queues = lists:nth(2, ?config(per_node_queues, Config)),
Expand All @@ -141,23 +149,13 @@ list_queues_offline(Config) ->
ok.

list_queues_stopped(Config) ->
Node1Queues = lists:nth(1, ?config(per_node_queues, Config)),
Node2Queues = lists:nth(2, ?config(per_node_queues, Config)),
Node3Queues = lists:nth(3, ?config(per_node_queues, Config)),

Expected =
lists:sort([ {Q, <<"running">>} || Q <- Node1Queues ] ++
%% Node is running. Vhost is down
[ {Q, <<"stopped">>} || Q <- Node2Queues ] ++
%% Node is not running. Vhost is down
[ {Q, <<"down">>} || Q <- Node3Queues ]),

?awaitMatch(
Expected,
lists:sort(
[ {Name, State}
|| [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "state", "--no-table-headers"]) ]),
30_000).
rabbit_ct_helpers:await_condition(fun() ->
Listed = rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "state", "--no-table-headers"]),
%% We expect some queue replicas to be reported as running, some as down and some as stopped,
%% and that CLI tools are capable of handling and formatting such rows. MK.
ReplicaStates = lists:usort([State|| [_Name, State] <- Listed]),
ReplicaStates =:= [<<"down">>, <<"running">>, <<"stopped">>]
end, 30_000).

%%----------------------------------------------------------------------------
%% Helpers
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbitmq_auth_backend_http/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ all_srcs(name = "all_srcs")

test_suite_beam_files(name = "test_suite_beam_files")

# gazelle:erlang_app_extra_app crypto
# gazelle:erlang_app_extra_app inets
# gazelle:erlang_app_extra_app ssl
# gazelle:erlang_app_extra_app public_key
# gazelle:erlang_app_dep rabbit

Expand Down Expand Up @@ -106,7 +108,7 @@ rabbitmq_integration_suite(
"test/auth_http_mock.beam",
],
deps = [
"@cowboy//:erlang_app"
"@cowboy//:erlang_app",
],
)

Expand Down
Loading