|
| 1 | +%% This Source Code Form is subject to the terms of the Mozilla Public |
| 2 | +%% License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 | +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. |
| 4 | +%% |
| 5 | +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. |
| 6 | +%% |
| 7 | + |
| 8 | +%% This module exists to avoid circular module dependencies between |
| 9 | +%% several others virtual hosts-related modules. |
| 10 | +-module(rabbit_vhosts). |
| 11 | + |
| 12 | +-define(PERSISTENT_TERM_COUNTER_KEY, rabbit_vhosts_reconciliation_run_counter). |
| 13 | + |
| 14 | +%% API |
| 15 | + |
| 16 | +-export([ |
| 17 | + list_names/0, |
| 18 | + exists/1, |
| 19 | + boot/0, |
| 20 | + reconcile/0, |
| 21 | + reconcile_once/0, |
| 22 | + is_reconciliation_enabled/0, |
| 23 | + disable_reconciliation/0, |
| 24 | + enable_reconciliation/0, |
| 25 | + start_processes_for_all/0, |
| 26 | + start_on_all_nodes/2, |
| 27 | + on_node_up/1 |
| 28 | +]). |
| 29 | + |
| 30 | +%% Same as rabbit_vhost:exists/1. |
| 31 | +-spec exists(vhost:name()) -> boolean(). |
| 32 | +exists(VirtualHost) -> |
| 33 | + rabbit_db_vhost:exists(VirtualHost). |
| 34 | + |
| 35 | +%% Same as rabbit_vhost:list_names/0. |
| 36 | +-spec list_names() -> [vhost:name()]. |
| 37 | +list_names() -> rabbit_db_vhost:list(). |
| 38 | + |
| 39 | +-spec boot() -> 'ok'. |
| 40 | +boot() -> |
| 41 | + _ = start_processes_for_all(), |
| 42 | + _ = increment_run_counter(), |
| 43 | + _ = case is_reconciliation_enabled() of |
| 44 | + false -> ok; |
| 45 | + true -> maybe_start_timer(reconcile) |
| 46 | + end, |
| 47 | + ok. |
| 48 | + |
| 49 | +%% Performs a round of virtual host process reconciliation and sets up a timer to |
| 50 | +%% re-run this operation again unless it has been run 10 or more times since cluster boot. |
| 51 | +%% See start_processes_for_all/1. |
| 52 | +-spec reconcile() -> 'ok'. |
| 53 | +reconcile() -> |
| 54 | + case is_reconciliation_enabled() of |
| 55 | + false -> ok; |
| 56 | + true -> |
| 57 | + _ = reconcile_once(), |
| 58 | + _ = maybe_start_timer(?FUNCTION_NAME), |
| 59 | + ok |
| 60 | + end. |
| 61 | + |
| 62 | +%% Performs a round of virtual host process reconciliation but does not schedule any future runs. |
| 63 | +%% See start_processes_for_all/1. |
| 64 | +-spec reconcile_once() -> 'ok'. |
| 65 | +reconcile_once() -> |
| 66 | + rabbit_log:debug("Will reconcile virtual host processes on all cluster members..."), |
| 67 | + _ = start_processes_for_all(), |
| 68 | + _ = increment_run_counter(), |
| 69 | + N = get_run_counter(), |
| 70 | + rabbit_log:debug("Done with virtual host processes reconciliation (run ~tp)", [N]), |
| 71 | + ok. |
| 72 | + |
| 73 | +-spec on_node_up(Node :: node()) -> 'ok'. |
| 74 | +on_node_up(_Node) -> |
| 75 | + case is_reconciliation_enabled() of |
| 76 | + false -> ok; |
| 77 | + true -> |
| 78 | + DelayInSeconds = 10, |
| 79 | + Delay = DelayInSeconds * 1000, |
| 80 | + rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [DelayInSeconds]), |
| 81 | + _ = timer:apply_after(Delay, ?MODULE, reconcile_once, []), |
| 82 | + ok |
| 83 | + end. |
| 84 | + |
| 85 | +-spec is_reconciliation_enabled() -> boolean(). |
| 86 | +is_reconciliation_enabled() -> |
| 87 | + application:get_env(rabbit, vhost_process_reconciliation_enabled, true). |
| 88 | + |
| 89 | +-spec enable_reconciliation() -> 'ok'. |
| 90 | +enable_reconciliation() -> |
| 91 | + %% reset the auto-stop counter |
| 92 | + persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, 0), |
| 93 | + application:set_env(rabbit, vhost_process_reconciliation_enabled, true). |
| 94 | + |
| 95 | +-spec disable_reconciliation() -> 'ok'. |
| 96 | +disable_reconciliation() -> |
| 97 | + application:set_env(rabbit, vhost_process_reconciliation_enabled, false). |
| 98 | + |
| 99 | +-spec reconciliation_interval() -> non_neg_integer(). |
| 100 | +reconciliation_interval() -> |
| 101 | + application:get_env(rabbit, vhost_process_reconciliation_run_interval, 30). |
| 102 | + |
| 103 | +%% Starts a virtual host process on every specified nodes. |
| 104 | +%% Only exists to allow for "virtual host process repair" |
| 105 | +%% in clusters where nodes a booted in parallel and seeded |
| 106 | +%% (e.g. using definitions) at the same time. |
| 107 | +%% |
| 108 | +%% In that case, during virtual host insertion into the schema database, |
| 109 | +%% some processes predictably won't be started on the yet-to-be-discovered nodes. |
| 110 | +-spec start_processes_for_all([node()]) -> 'ok'. |
| 111 | +start_processes_for_all(Nodes) -> |
| 112 | + Names = list_names(), |
| 113 | + N = length(Names), |
| 114 | + rabbit_log:debug("Will make sure that processes of ~p virtual hosts are running on all reachable cluster nodes", [N]), |
| 115 | + [begin |
| 116 | + try |
| 117 | + start_on_all_nodes(VH, Nodes) |
| 118 | + catch |
| 119 | + _:Err:_Stacktrace -> |
| 120 | + rabbit_log:error("Could not reconcile virtual host ~ts: ~tp", [VH, Err]) |
| 121 | + end |
| 122 | + end || VH <- Names], |
| 123 | + ok. |
| 124 | + |
| 125 | +-spec start_processes_for_all() -> 'ok'. |
| 126 | +start_processes_for_all() -> |
| 127 | + start_processes_for_all(rabbit_nodes:list_reachable()). |
| 128 | + |
| 129 | +%% Same as rabbit_vhost_sup_sup:start_on_all_nodes/0. |
| 130 | +-spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'. |
| 131 | +start_on_all_nodes(VirtualHost, Nodes) -> |
| 132 | + _ = rabbit_vhost_sup_sup:start_on_all_nodes(VirtualHost, Nodes), |
| 133 | + ok. |
| 134 | + |
| 135 | +%% |
| 136 | +%% Implementation |
| 137 | +%% |
| 138 | + |
| 139 | +-spec get_run_counter() -> non_neg_integer(). |
| 140 | +get_run_counter() -> |
| 141 | + persistent_term:get(?PERSISTENT_TERM_COUNTER_KEY, 0). |
| 142 | + |
| 143 | +-spec increment_run_counter() -> non_neg_integer(). |
| 144 | +increment_run_counter() -> |
| 145 | + N = get_run_counter(), |
| 146 | + persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, N + 1), |
| 147 | + N. |
| 148 | + |
| 149 | +-spec maybe_start_timer(atom()) -> ok | {ok, timer:tref()} | {error, any()}. |
| 150 | +maybe_start_timer(FunName) -> |
| 151 | + N = get_run_counter(), |
| 152 | + DelayInSeconds = reconciliation_interval(), |
| 153 | + case N >= 10 of |
| 154 | + true -> |
| 155 | + %% Stop after ten runs |
| 156 | + rabbit_log:debug("Will stop virtual host process reconciliation after ~tp runs", [N]), |
| 157 | + ok; |
| 158 | + false -> |
| 159 | + case is_reconciliation_enabled() of |
| 160 | + false -> ok; |
| 161 | + true -> |
| 162 | + Delay = DelayInSeconds * 1000, |
| 163 | + rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [DelayInSeconds]), |
| 164 | + timer:apply_after(Delay, ?MODULE, FunName, []) |
| 165 | + end |
| 166 | + end. |
0 commit comments