Skip to content

Commit cb183d9

Browse files
committed
Add auto reconciliation feature to stream queue type
1 parent a66c716 commit cb183d9

File tree

5 files changed

+634
-0
lines changed

5 files changed

+634
-0
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2688,6 +2688,29 @@ end}.
26882688
]}.
26892689

26902690

2691+
%%
2692+
%% Stream membership reconciliation
2693+
%%
2694+
2695+
{mapping, "stream.continuous_membership_reconciliation.enabled", "rabbit.stream_membership_reconciliation_enabled", [
2696+
{datatype, {enum, [true, false]}}]}.
2697+
2698+
{mapping, "stream.continuous_membership_reconciliation.auto_remove", "rabbit.stream_membership_reconciliation_auto_remove", [
2699+
{datatype, {enum, [true, false]}}]}.
2700+
2701+
{mapping, "stream.continuous_membership_reconciliation.interval", "rabbit.stream_membership_reconciliation_interval", [
2702+
{datatype, integer}, {validators, ["non_negative_integer"]}
2703+
]}.
2704+
2705+
{mapping, "stream.continuous_membership_reconciliation.trigger_interval", "rabbit.stream_membership_reconciliation_trigger_interval", [
2706+
{datatype, integer}, {validators, ["non_negative_integer"]}
2707+
]}.
2708+
2709+
{mapping, "stream.continuous_membership_reconciliation.target_group_size", "rabbit.stream_membership_reconciliation_target_group_size", [
2710+
{datatype, integer}, {validators, ["non_negative_integer"]}
2711+
]}.
2712+
2713+
26912714
%%
26922715
%% Runtime parameters
26932716
%%

deps/rabbit/src/rabbit.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,12 @@
183183
[rabbit_quorum_queue_periodic_membership_reconciliation]}},
184184
{requires, [database]}]}).
185185

186+
-rabbit_boot_step({rabbit_stream_periodic_membership_reconciliation,
187+
[{description, "Stream membership reconciliation"},
188+
{mfa, {rabbit_sup, start_restartable_child,
189+
[rabbit_stream_periodic_membership_reconciliation]}},
190+
{requires, [recovery]}]}).
191+
186192
-rabbit_boot_step({rabbit_epmd_monitor,
187193
[{description, "epmd monitor"},
188194
{mfa, {rabbit_sup, start_restartable_child,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_stream_event_subscriber).
9+
10+
-behaviour(gen_event).
11+
12+
-export([init/1, handle_event/2, handle_call/2, handle_info/2]).
13+
-export([register/0, unregister/0]).
14+
15+
-include_lib("rabbit_common/include/rabbit.hrl").
16+
17+
-rabbit_boot_step({rabbit_stream_event_subscriber,
18+
[{description, "stream event subscriber"},
19+
{mfa, {?MODULE, register, []}},
20+
{cleanup, {?MODULE, unregister, []}},
21+
{requires, rabbit_event},
22+
{enables, recovery}]}).
23+
24+
register() ->
25+
gen_event:add_handler(rabbit_alarm, ?MODULE, []),
26+
gen_event:add_handler(rabbit_event, ?MODULE, []).
27+
28+
unregister() ->
29+
gen_event:delete_handler(rabbit_alarm, ?MODULE, []),
30+
gen_event:delete_handler(rabbit_event, ?MODULE, []).
31+
32+
init([]) ->
33+
{ok, []}.
34+
35+
handle_call( _, State) ->
36+
{ok, ok, State}.
37+
38+
handle_event({node_up, Node}, State) ->
39+
rabbit_stream_periodic_membership_reconciliation:on_node_up(Node),
40+
{ok, State};
41+
handle_event({node_down, Node}, State) ->
42+
rabbit_stream_periodic_membership_reconciliation:on_node_down(Node),
43+
{ok, State};
44+
handle_event(#event{type = policy_set}, State) ->
45+
rabbit_stream_periodic_membership_reconciliation:policy_set(),
46+
{ok, State};
47+
handle_event(#event{type = operator_policy_set}, State) ->
48+
rabbit_stream_periodic_membership_reconciliation:policy_set(),
49+
{ok, State};
50+
handle_event(_, State) ->
51+
{ok, State}.
52+
53+
handle_info(_, State) ->
54+
{ok, State}.
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_stream_periodic_membership_reconciliation).
9+
10+
-feature(maybe_expr, enable).
11+
12+
-behaviour(gen_server).
13+
14+
-export([on_node_up/1, on_node_down/1, queue_created/1, policy_set/0]).
15+
16+
-export([start_link/0]).
17+
18+
%% gen_server callbacks
19+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
20+
code_change/3]).
21+
22+
-include_lib("rabbit_common/include/rabbit.hrl").
23+
-include_lib("kernel/include/logger.hrl").
24+
25+
-define(SERVER, ?MODULE).
26+
-define(DEFAULT_INTERVAL, 60_000*60).
27+
-define(DEFAULT_TRIGGER_INTERVAL, 10_000).
28+
-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000).
29+
30+
-define(EVAL_MSG, membership_reconciliation).
31+
32+
-record(state, {timer_ref :: reference() | undefined,
33+
interval :: non_neg_integer(),
34+
trigger_interval :: non_neg_integer(),
35+
target_group_size :: non_neg_integer() | undefined,
36+
enabled :: boolean(),
37+
auto_remove :: boolean()}).
38+
39+
%%----------------------------------------------------------------------------
40+
%% Start
41+
%%----------------------------------------------------------------------------
42+
43+
-spec start_link() -> rabbit_types:ok_pid_or_error().
44+
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
45+
46+
%%----------------------------------------------------------------------------
47+
%% API
48+
%%----------------------------------------------------------------------------
49+
50+
on_node_up(Node) ->
51+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, {node_up, Node}}).
52+
53+
on_node_down(Node) ->
54+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, {node_down, Node}}).
55+
56+
queue_created(Q) ->
57+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, {queue_created, Q}}).
58+
59+
policy_set() ->
60+
gen_server:cast(?SERVER, {membership_reconciliation_trigger, policy_set}).
61+
62+
%%----------------------------------------------------------------------------
63+
%% gen_server callbacks
64+
%%----------------------------------------------------------------------------
65+
66+
init([]) ->
67+
Enabled = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_enabled,
68+
false),
69+
AutoRemove = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_auto_remove,
70+
false),
71+
Interval = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_interval,
72+
?DEFAULT_INTERVAL),
73+
TriggerInterval = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_trigger_interval,
74+
?DEFAULT_TRIGGER_INTERVAL),
75+
TargetGroupSize = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_target_group_size,
76+
undefined),
77+
State = #state{interval = Interval,
78+
trigger_interval = TriggerInterval,
79+
target_group_size = TargetGroupSize,
80+
enabled = Enabled,
81+
auto_remove = AutoRemove},
82+
case Enabled of
83+
true ->
84+
Ref = erlang:send_after(Interval, self(), ?EVAL_MSG),
85+
{ok, State#state{timer_ref = Ref}};
86+
false ->
87+
{ok, State, hibernate}
88+
end.
89+
90+
handle_call(_Request, _From, State) ->
91+
{reply, ok, State}.
92+
93+
handle_cast({membership_reconciliation_trigger, _Reason}, #state{enabled = false} = State) ->
94+
{noreply, State, hibernate};
95+
handle_cast({membership_reconciliation_trigger, Reason}, #state{timer_ref = OldRef,
96+
trigger_interval = Time} = State) ->
97+
?LOG_DEBUG("Stream membership reconciliation scheduled: ~p", [Reason]),
98+
_ = erlang:cancel_timer(OldRef),
99+
Ref = erlang:send_after(Time, self(), ?EVAL_MSG),
100+
{noreply, State#state{timer_ref = Ref}};
101+
handle_cast(_Msg, State) ->
102+
{noreply, State}.
103+
104+
handle_info(?EVAL_MSG, #state{interval = Interval,
105+
trigger_interval = TriggerInterval} = State) ->
106+
Res = reconciliate_stream_membership(State),
107+
NewTimeout = case Res of
108+
noop ->
109+
Interval;
110+
_ ->
111+
TriggerInterval
112+
end,
113+
Ref = erlang:send_after(NewTimeout, self(), ?EVAL_MSG),
114+
{noreply, State#state{timer_ref = Ref}};
115+
handle_info(_Info, #state{enabled = false} = State) ->
116+
{noreply, State, hibernate};
117+
handle_info(_Info, State) ->
118+
{noreply, State}.
119+
120+
terminate(_Reason, _State) ->
121+
ok.
122+
123+
code_change(_OldVsn, State, _Extra) ->
124+
{ok, State}.
125+
126+
%%----------------------------------------------------------------------------
127+
%% Internal functions
128+
%%----------------------------------------------------------------------------
129+
130+
reconciliate_stream_membership(State) ->
131+
LocalStreams = rabbit_amqqueue:list_local_stream_queues(),
132+
LocalLeaders = lists:filter(fun(Q) ->
133+
#{leader_node := LeaderNode} = amqqueue:get_type_state(Q),
134+
LeaderNode =:= node()
135+
end, LocalStreams),
136+
ExpectedNodes = rabbit_nodes:list_members(),
137+
Running = rabbit_nodes:list_running(),
138+
reconciliate_stream_members(ExpectedNodes, Running, LocalLeaders, State, noop).
139+
140+
reconciliate_stream_members([], _Running, _, _State, Result) ->
141+
%% if there are no expected nodes rabbit_nodes:list_running/0 encountered
142+
%% an error during query and returned the empty list which is case we need
143+
%% to handle
144+
Result;
145+
reconciliate_stream_members(_ExpectedNodes, _Running, [], _State, Result) ->
146+
Result;
147+
reconciliate_stream_members(ExpectedNodes, Running, [Q | LocalLeaders],
148+
#state{target_group_size = TargetSize} = State,
149+
OldResult) ->
150+
Result =
151+
maybe
152+
#{name := _StreamId, nodes := MemberNodes, leader_node := LeaderNode} = amqqueue:get_type_state(Q),
153+
%% Check if Leader is indeed this node
154+
LeaderNode ?= node(),
155+
%% And that this node is not in maintenance mode
156+
true ?= not rabbit_maintenance:is_being_drained_local_read(node()),
157+
DanglingNodes = MemberNodes -- ExpectedNodes,
158+
case maybe_remove(DanglingNodes, State) of
159+
false ->
160+
maybe_add_member(Q, Running, MemberNodes, get_target_size(Q, TargetSize));
161+
true ->
162+
remove_members(Q, DanglingNodes)
163+
end
164+
else
165+
_ ->
166+
noop
167+
end,
168+
reconciliate_stream_members(ExpectedNodes, Running, LocalLeaders, State,
169+
update_result(OldResult, Result)).
170+
171+
maybe_remove(_, #state{auto_remove = false}) ->
172+
false;
173+
maybe_remove([], #state{auto_remove = true}) ->
174+
false;
175+
maybe_remove(_Nodes, #state{auto_remove = true}) ->
176+
true.
177+
178+
maybe_add_member(Q, Running, MemberNodes, TargetSize) ->
179+
%% Filter out any new nodes under maintenance
180+
New = rabbit_maintenance:filter_out_drained_nodes_local_read(Running -- MemberNodes),
181+
case should_add_node(MemberNodes, New, TargetSize) of
182+
true ->
183+
%% In the future, sort the list of new nodes based on load,
184+
%% availability zones etc
185+
Node = select_node(New),
186+
QName = #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q),
187+
case rabbit_stream_queue:add_replica(VHost, Name, Node) of
188+
ok ->
189+
?LOG_DEBUG(
190+
"Added node ~ts as a replica to ~ts as "
191+
"the streams target group size(#~w) is not met and "
192+
"there are enough new nodes(#~w) in the cluster",
193+
[Node, rabbit_misc:rs(QName), TargetSize, length(New)]);
194+
{error, Err} ->
195+
?LOG_WARNING(
196+
"~ts: failed to add replica on node ~w, error: ~w",
197+
[rabbit_misc:rs(QName), Node, Err])
198+
end,
199+
ok;
200+
false ->
201+
noop
202+
end.
203+
204+
should_add_node(MemberNodes, New, TargetSize) ->
205+
CurrentSize = length(MemberNodes),
206+
NumberOfNewNodes = length(New),
207+
maybe
208+
true ?= NumberOfNewNodes > 0, %% There are new nodes to grow to
209+
true ?= CurrentSize < TargetSize, %% Target size not reached
210+
true ?= rabbit_misc:is_even(CurrentSize) orelse NumberOfNewNodes > 1, %% Enough nodes to grow to odd member size
211+
true ?= rabbit_nodes:is_running(lists:delete(node(), MemberNodes))
212+
end.
213+
214+
get_target_size(Q, undefined) ->
215+
get_target_size(Q);
216+
get_target_size(Q, N) when N > 0 ->
217+
max(N, get_target_size(Q)).
218+
219+
get_target_size(Q) ->
220+
PolicyValue = case rabbit_policy:get(<<"target-group-size">>, Q) of
221+
undefined ->
222+
0;
223+
PolicyN ->
224+
PolicyN
225+
end,
226+
Arguments = amqqueue:get_arguments(Q),
227+
case rabbit_misc:table_lookup(Arguments, <<"x-stream-target-group-size">>) of
228+
undefined ->
229+
PolicyValue;
230+
ArgN ->
231+
max(ArgN, PolicyValue)
232+
end.
233+
234+
remove_members(_Q, []) ->
235+
ok;
236+
remove_members(Q, [Node | Nodes]) ->
237+
QName = #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q),
238+
case rabbit_stream_queue:delete_replica(VHost, Name, Node) of
239+
ok ->
240+
QName = amqqueue:get_name(Q),
241+
?LOG_DEBUG("~ts: Successfully removed replica on node ~w",
242+
[rabbit_misc:rs(QName), Node]),
243+
ok;
244+
{error, Err} ->
245+
QName = amqqueue:get_name(Q),
246+
?LOG_DEBUG("~ts: failed to remove replica on node "
247+
"~w, error: ~w",
248+
[rabbit_misc:rs(QName), Node, Err])
249+
end,
250+
remove_members(Q, Nodes).
251+
252+
253+
%% Make sure any non-noop result is stored.
254+
update_result(noop, Result) ->
255+
Result;
256+
update_result(Result, noop) ->
257+
Result;
258+
update_result(Result, Result) ->
259+
Result.
260+
261+
select_node([Node]) ->
262+
Node;
263+
select_node(Nodes) ->
264+
lists:nth(rand:uniform(length(Nodes)), Nodes).

0 commit comments

Comments
 (0)