Skip to content

Commit 6a85a17

Browse files
committed
Move rebalance progress from ns_orchestrator ...
... to ns_rebalance_observer, so that we can have all rebalance related tracking in one place. Change-Id: Icc2a4ad219770ba794b46c904924c10a1552b907 Reviewed-on: http://review.couchbase.org/101815 Reviewed-by: Aliaksey Artamonau <[email protected]> Tested-by: Abhijeeth Nuthan <[email protected]> Well-Formed: Build Bot <[email protected]>
1 parent 7a46058 commit 6a85a17

File tree

4 files changed

+122
-49
lines changed

4 files changed

+122
-49
lines changed

src/ns_orchestrator.erl

Lines changed: 79 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
-record(janitor_state, {cleanup_id = undefined :: undefined | pid()}).
2626

2727
-record(rebalancing_state, {rebalancer,
28-
progress,
28+
rebalance_observer = undefined,
2929
keep_nodes,
3030
eject_nodes,
3131
failed_nodes,
@@ -53,7 +53,6 @@
5353
start_rebalance/3,
5454
start_rebalance/4,
5555
stop_rebalance/0,
56-
update_progress/2,
5756
is_rebalance_running/0,
5857
start_recovery/1,
5958
stop_recovery/2,
@@ -226,7 +225,20 @@ rebalance_progress_full() ->
226225
{running, [{atom(), float()}]} |
227226
not_running.
228227
rebalance_progress_full(Timeout) ->
229-
gen_statem:call(?SERVER, rebalance_progress, Timeout).
228+
case ns_config:search(rebalancer_pid) of
229+
false ->
230+
not_running;
231+
{value, undefined} ->
232+
not_running;
233+
{value, Pid} when is_pid(Pid) ->
234+
case ns_rebalance_observer:get_aggregated_progress(Timeout) of
235+
not_running ->
236+
?log_error("Couldn't reach ns_rebalance_observer"),
237+
not_running;
238+
Aggr ->
239+
{running, Aggr}
240+
end
241+
end.
230242

231243
-spec rebalance_progress() -> {running, [{atom(), float()}]} | not_running.
232244
rebalance_progress() ->
@@ -463,6 +475,10 @@ handle_info({'EXIT', Pid, Reason}, rebalancing,
463475
#rebalancing_state{rebalancer = Pid} = State) ->
464476
handle_rebalance_completion(Reason, State);
465477

478+
handle_info({'EXIT', ObserverPid, Reason}, rebalancing,
479+
#rebalancing_state{rebalance_observer = ObserverPid} = State) ->
480+
{keep_state, stop_rebalance(State, {rebalance_observer_terminated, Reason})};
481+
466482
handle_info({'EXIT', Pid, Reason}, recovery, #recovery_state{pid = Pid}) ->
467483
ale:error(?USER_LOGGER,
468484
"Recovery process ~p terminated unexpectedly: ~p", [Pid, Reason]),
@@ -671,6 +687,15 @@ idle({start_graceful_failover, Node}, From, _State) when is_atom(Node) ->
671687
{keep_state_and_data,
672688
[{next_event, {call, From}, {start_graceful_failover, [Node]}}]};
673689
idle({start_graceful_failover, Nodes}, From, _State) ->
690+
ActiveNodes = ns_cluster_membership:active_nodes(),
691+
NodesInfo = [{active_nodes, ActiveNodes}],
692+
Services = [kv],
693+
Type = graceful_failover,
694+
{ok, ObserverPid} = ns_rebalance_observer:start_link(
695+
Services,
696+
NodesInfo,
697+
Type),
698+
674699
case ns_rebalancer:start_link_graceful_failover(Nodes) of
675700
{ok, Pid} ->
676701
Id = couch_uuids:random(),
@@ -681,27 +706,36 @@ idle({start_graceful_failover, Nodes}, From, _State) ->
681706
ns_cluster:counter_inc(Type, start),
682707
set_rebalance_status(Type, running, Pid),
683708

684-
ActiveNodes = ns_cluster_membership:active_nodes(),
685-
Progress = rebalance_progress:init(ActiveNodes, [kv]),
686-
687709
{next_state, rebalancing,
688710
#rebalancing_state{rebalancer = Pid,
711+
rebalance_observer = ObserverPid,
689712
eject_nodes = [],
690713
keep_nodes = [],
691714
failed_nodes = [],
692715
abort_reason = undefined,
693-
progress = Progress,
694716
type = Type,
695717
rebalance_id = Id},
696718
[{reply, From, ok}]};
697719
{error, RV} ->
720+
misc:unlink_terminate_and_wait(ObserverPid, kill),
698721
{keep_state_and_data, [{reply, From, RV}]}
699722
end;
700723
idle(rebalance_progress, From, _State) ->
701724
{keep_state_and_data, [{reply, From, not_running}]};
702725
%% NOTE: this is not remotely called but is used by maybe_start_rebalance
703726
idle({start_rebalance, KeepNodes, EjectNodes, FailedNodes, DeltaNodes,
704727
DeltaRecoveryBuckets, RebalanceId}, From, _State) ->
728+
NodesInfo = [{active_nodes, KeepNodes ++ EjectNodes},
729+
{keep_nodes, KeepNodes},
730+
{eject_nodes, EjectNodes},
731+
{delta_nodes, DeltaNodes},
732+
{failed_nodes, FailedNodes}],
733+
Type = rebalance,
734+
Services = [kv] ++ ns_cluster_membership:topology_aware_services(),
735+
{ok, ObserverPid} = ns_rebalance_observer:start_link(
736+
Services,
737+
NodesInfo,
738+
Type),
705739
case ns_rebalancer:start_link_rebalance(KeepNodes, EjectNodes,
706740
FailedNodes, DeltaNodes,
707741
DeltaRecoveryBuckets) of
@@ -725,14 +759,12 @@ idle({start_rebalance, KeepNodes, EjectNodes, FailedNodes, DeltaNodes,
725759
[KeepNodes, EjectNodes, FailedNodes, RebalanceId])
726760
end,
727761

728-
Type = rebalance,
729762
ns_cluster:counter_inc(Type, start),
730763
set_rebalance_status(Type, running, Pid),
731764

732765
{next_state, rebalancing,
733766
#rebalancing_state{rebalancer = Pid,
734-
progress = rebalance_progress:init(
735-
KeepNodes ++ EjectNodes),
767+
rebalance_observer = ObserverPid,
736768
keep_nodes = KeepNodes,
737769
eject_nodes = EjectNodes,
738770
failed_nodes = FailedNodes,
@@ -741,11 +773,22 @@ idle({start_rebalance, KeepNodes, EjectNodes, FailedNodes, DeltaNodes,
741773
rebalance_id = RebalanceId},
742774
[{reply, From, ok}]};
743775
{error, no_kv_nodes_left} ->
776+
misc:unlink_terminate_and_wait(ObserverPid, kill),
744777
{keep_state_and_data, [{reply, From, no_kv_nodes_left}]};
745778
{error, delta_recovery_not_possible} ->
779+
misc:unlink_terminate_and_wait(ObserverPid, kill),
746780
{keep_state_and_data, [{reply, From, delta_recovery_not_possible}]}
747781
end;
748782
idle({move_vbuckets, Bucket, Moves}, From, _State) ->
783+
KeepNodes = ns_node_disco:nodes_wanted(),
784+
Type = move_vbuckets,
785+
NodesInfo = [{active_nodes, ns_cluster_membership:active_nodes()},
786+
{keep_nodes, KeepNodes}],
787+
Services = [kv],
788+
{ok, ObserverPid} = ns_rebalance_observer:start_link(
789+
Services,
790+
NodesInfo,
791+
Type),
749792
Pid = spawn_link(
750793
fun () ->
751794
ns_rebalancer:move_vbuckets(Bucket, Moves)
@@ -754,16 +797,12 @@ idle({move_vbuckets, Bucket, Moves}, From, _State) ->
754797
Id = couch_uuids:random(),
755798
?log_debug("Moving vBuckets in bucket ~p. Moves ~p. "
756799
"Operation Id = ~s", [Bucket, Moves, Id]),
757-
Type = move_vbuckets,
758800
ns_cluster:counter_inc(Type, start),
759801
set_rebalance_status(Type, running, Pid),
760802

761-
Nodes = ns_cluster_membership:active_nodes(),
762-
Progress = rebalance_progress:init(Nodes, [kv]),
763-
764803
{next_state, rebalancing,
765804
#rebalancing_state{rebalancer = Pid,
766-
progress = Progress,
805+
rebalance_observer = ObserverPid,
767806
keep_nodes = ns_node_disco:nodes_wanted(),
768807
eject_nodes = [],
769808
failed_nodes = [],
@@ -821,11 +860,6 @@ janitor_running(Msg, From, #janitor_state{cleanup_id = ID})
821860
{next_state, idle, #idle_state{}, [{next_event, {call, From}, Msg}]}.
822861

823862
%% Asynchronous rebalancing events
824-
rebalancing({update_progress, Service, ServiceProgress},
825-
#rebalancing_state{progress = Old} = State) ->
826-
NewProgress = rebalance_progress:update(Service, ServiceProgress, Old),
827-
{next_state, rebalancing,
828-
State#rebalancing_state{progress = NewProgress}};
829863
rebalancing({timeout, _Tref, stop_timeout},
830864
#rebalancing_state{rebalancer = Pid} = State) ->
831865
?log_debug("Stop rebalance timeout, brutal kill pid = ~p", [Pid]),
@@ -870,11 +904,12 @@ rebalancing(stop_rebalance, From,
870904
#rebalancing_state{rebalancer = Pid} = State) ->
871905
?log_debug("Sending stop to rebalancer: ~p", [Pid]),
872906
{keep_state, stop_rebalance(State, user_stop), [{reply, From, ok}]};
873-
rebalancing(rebalance_progress, From,
874-
#rebalancing_state{progress = Progress}) ->
875-
AggregatedProgress =
876-
dict:to_list(rebalance_progress:get_progress(Progress)),
877-
{keep_state_and_data, [{reply, From, {running, AggregatedProgress}}]};
907+
rebalancing(rebalance_progress, From, _State) ->
908+
%% Only expect this call if we are pre-madhatter.
909+
false = cluster_compat_mode:is_cluster_madhatter(),
910+
{keep_state_and_data,
911+
[{reply, From,
912+
{running, ns_rebalance_observer:get_aggregated_progress(2000)}}]};
878913
rebalancing(Event, From, _State) ->
879914
?log_warning("Got event ~p while rebalancing.", [Event]),
880915
{keep_state_and_data, [{reply, From, rebalance_running}]}.
@@ -944,10 +979,6 @@ do_request_janitor_run(Item, Fun, FsmState, State) ->
944979
end,
945980
{next_state, FsmState, State}.
946981

947-
-spec update_progress(service(), dict:dict()) -> ok.
948-
update_progress(Service, ServiceProgress) ->
949-
gen_statem:cast(?SERVER, {update_progress, Service, ServiceProgress}).
950-
951982
wait_for_nodes_loop(Nodes) ->
952983
receive
953984
{done, Node} ->
@@ -1142,8 +1173,14 @@ rebalance_completed_next_state({try_autofailover, From, Nodes}) ->
11421173
rebalance_completed_next_state(_) ->
11431174
{next_state, idle, #idle_state{}}.
11441175

1176+
terminate_observer(#rebalancing_state{rebalance_observer = undefined}) ->
1177+
ok;
1178+
terminate_observer(#rebalancing_state{rebalance_observer = ObserverPid}) ->
1179+
misc:unlink_terminate_and_wait(ObserverPid, kill).
1180+
11451181
handle_rebalance_completion(ExitReason, State) ->
11461182
cancel_stop_timer(State),
1183+
terminate_observer(State),
11471184
maybe_reset_autofailover_count(ExitReason, State),
11481185
maybe_reset_reprovision_count(ExitReason, State),
11491186
log_rebalance_completion(ExitReason, State),
@@ -1214,6 +1251,11 @@ log_abort_reason({try_autofailover, _, Nodes}, Type, Id) ->
12141251
"~s interrupted due to auto-failover of nodes ~p. "
12151252
"Operation Id = ~s",
12161253
[rebalance_type2text(Type), Nodes, Id]);
1254+
log_abort_reason({rebalance_observer_terminated, Reason}, Type, Id) ->
1255+
ale:error(?USER_LOGGER,
1256+
"~s interrupted as observer exited with reason ~p. "
1257+
"Operation Id = ~s",
1258+
[rebalance_type2text(Type), Reason, Id]);
12171259
log_abort_reason(user_stop, Type, Id) ->
12181260
ale:info(?USER_LOGGER,
12191261
"~s stopped by user. Operation Id = ~s",
@@ -1272,15 +1314,19 @@ maybe_start_service_upgrader(normal, {changed, OldVersion, NewVersion},
12721314
ale:info(?USER_LOGGER,
12731315
"Starting upgrade for the following services: ~p",
12741316
[Services]),
1317+
Type = service_upgrade,
1318+
NodesInfo = [{active_nodes, KeepNodes},
1319+
{keep_nodes, KeepNodes}],
1320+
{ok, ObserverPid} = ns_rebalance_observer:start_link(
1321+
Services,
1322+
NodesInfo,
1323+
Type),
12751324
Pid = start_service_upgrader(KeepNodes, Services),
12761325

1277-
Type = service_upgrade,
12781326
set_rebalance_status(Type, running, Pid),
12791327
ns_cluster:counter_inc(Type, start),
1280-
Progress = rebalance_progress:init(KeepNodes, Services),
1281-
12821328
NewState = State#rebalancing_state{type = Type,
1283-
progress = Progress,
1329+
rebalance_observer = ObserverPid,
12841330
rebalancer = Pid},
12851331

12861332
{started, NewState}

src/ns_rebalance_observer.erl

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
-include("ns_common.hrl").
2222

23-
-export([start_link/1, get_detailed_progress/0]).
23+
-export([start_link/3,
24+
get_detailed_progress/0,
25+
get_aggregated_progress/1,
26+
update_progress/2]).
2427

2528
%% gen_server callbacks
2629
-export([code_change/3, init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -41,22 +44,36 @@
4144
-record(state, {bucket :: bucket_name() | undefined,
4245
buckets_count :: pos_integer(),
4346
bucket_number :: non_neg_integer(),
47+
progress :: rebalance_progress:progress(),
48+
nodes_info :: [{atom(), [node()]}],
49+
type :: atom(),
4450
done_moves :: [#move_state{}],
4551
current_moves :: [#move_state{}],
4652
pending_moves :: [#move_state{}]
4753
}).
4854

49-
start_link(BucketsCount) ->
50-
gen_server:start_link(?SERVER, ?MODULE, BucketsCount, []).
55+
start_link(Services, NodesInfo, Type) ->
56+
gen_server:start_link(?SERVER, ?MODULE, {Services, NodesInfo, Type}, []).
5157

52-
get_detailed_progress() ->
58+
generic_get_call(Call) ->
59+
generic_get_call(Call, 10000).
60+
generic_get_call(Call, Timeout) ->
5361
try
54-
gen_server:call(?SERVER, get_detailed_progress, 10000)
62+
gen_server:call(?SERVER, Call, Timeout)
5563
catch
5664
exit:_Reason ->
5765
not_running
5866
end.
5967

68+
get_detailed_progress() ->
69+
generic_get_call(get_detailed_progress).
70+
71+
get_aggregated_progress(Timeout) ->
72+
generic_get_call(get_aggregated_progress, Timeout).
73+
74+
update_progress(Service, ServiceProgress) ->
75+
gen_server:cast(?SERVER, {update_progress, Service, ServiceProgress}).
76+
6077
is_interesting_master_event({_, bucket_rebalance_started, _Bucket, _Pid}) ->
6178
fun handle_bucket_rebalance_started/2;
6279
is_interesting_master_event({_, set_ff_map, _BucketName, _Diff}) ->
@@ -68,8 +85,7 @@ is_interesting_master_event({_, vbucket_move_done, _BucketName, _VBucketId}) ->
6885
is_interesting_master_event(_) ->
6986
undefined.
7087

71-
72-
init(BucketsCount) ->
88+
init({Services, NodesInfo, Type}) ->
7389
Self = self(),
7490
ns_pubsub:subscribe_link(master_activity_events,
7591
fun (Event, _Ignored) ->
@@ -81,11 +97,17 @@ init(BucketsCount) ->
8197
end
8298
end, []),
8399

100+
{active_nodes, ActiveNodes} = lists:keyfind(active_nodes, 1, NodesInfo),
101+
Progress = rebalance_progress:init(ActiveNodes, Services),
102+
BucketsCount = length(ns_bucket:get_buckets()),
84103
proc_lib:spawn_link(erlang, apply, [fun docs_left_updater_init/1, [Self]]),
85104

86105
{ok, #state{bucket = undefined,
87106
buckets_count = BucketsCount,
88107
bucket_number = 0,
108+
progress = Progress,
109+
nodes_info = NodesInfo,
110+
type = Type,
89111
done_moves = [],
90112
current_moves = [],
91113
pending_moves = []}}.
@@ -94,6 +116,9 @@ handle_call(get, _From, State) ->
94116
{reply, State, State};
95117
handle_call(get_detailed_progress, _From, State) ->
96118
{reply, do_get_detailed_progress(State), State};
119+
handle_call(get_aggregated_progress, _From,
120+
#state{progress = Progress} = State) ->
121+
{reply, dict:to_list(rebalance_progress:get_progress(Progress)), State};
97122
handle_call(Req, From, State) ->
98123
?log_error("Got unknown request: ~p from ~p", [Req, From]),
99124
{reply, unknown_request, State}.
@@ -147,6 +172,11 @@ handle_cast({update_stats, VBucket, NodeToDocsLeft}, State) ->
147172
Move#move_state{stats = NewStats}
148173
end)};
149174

175+
handle_cast({update_progress, Service, ServiceProgress},
176+
#state{progress = Old} = State) ->
177+
NewProgress = rebalance_progress:update(Service, ServiceProgress, Old),
178+
{noreply, State#state{progress = NewProgress}};
179+
150180
handle_cast(Req, _State) ->
151181
?log_error("Got unknown cast: ~p", [Req]),
152182
erlang:error({unknown_cast, Req}).

src/ns_rebalancer.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@ rebalance_topology_aware_services(Config, Services, KeepNodesAll, EjectNodesAll)
607607
rebalance_topology_aware_service(Service, KeepNodes, EjectNodes, DeltaNodes) ->
608608
ProgressCallback =
609609
fun (Progress) ->
610-
ns_orchestrator:update_progress(Service, Progress)
610+
ns_rebalance_observer:update_progress(Service, Progress)
611611
end,
612612

613613
misc:with_trap_exit(
@@ -758,7 +758,7 @@ make_progress_fun(BucketCompletion, NumBuckets) ->
758758
end.
759759

760760
update_kv_progress(Progress) ->
761-
ns_orchestrator:update_progress(kv, Progress).
761+
ns_rebalance_observer:update_progress(kv, Progress).
762762

763763
update_kv_progress(Nodes, Progress) ->
764764
update_kv_progress(dict:from_list([{N, Progress} || N <- Nodes])).
@@ -777,8 +777,6 @@ rebalance_kv(KeepNodes, EjectNodes, BucketConfigs, DeltaRecoveryBuckets) ->
777777
exit(Error)
778778
end,
779779

780-
{ok, RebalanceObserver} = ns_rebalance_observer:start_link(length(BucketConfigs)),
781-
782780
lists:foreach(fun ({I, {BucketName, BucketConfig}}) ->
783781
BucketCompletion = I / NumBuckets,
784782
update_kv_progress(LiveKVNodes, BucketCompletion),
@@ -788,8 +786,7 @@ rebalance_kv(KeepNodes, EjectNodes, BucketConfigs, DeltaRecoveryBuckets) ->
788786
KeepKVNodes, EjectNodes, DeltaRecoveryBuckets)
789787
end, misc:enumerate(BucketConfigs, 0)),
790788

791-
update_kv_progress(LiveKVNodes, 1.0),
792-
misc:unlink_terminate_and_wait(RebalanceObserver, shutdown).
789+
update_kv_progress(LiveKVNodes, 1.0).
793790

794791
rebalance_bucket(BucketName, BucketConfig, ProgressFun,
795792
KeepKVNodes, EjectNodes, DeltaRecoveryBuckets) ->

0 commit comments

Comments
 (0)