1515% %
1616-module (rebalance_stage_info ).
1717
18- -export ([init /2 , get_progress /1 , update_progress /3 ]).
18+ -export ([init /1 ,
19+ get_stage_info /1 ,
20+ get_progress /1 ,
21+ update_progress /3 ,
22+ update_stage_info /3 ,
23+ diff_timestamp /2 ,
24+ binarify_timestamp /1 ]).
1925-export_type ([stage_info / 0 ]).
2026
27+ -record (stage_details , {
28+ start_time = false ,
29+ complete_time = false ,
30+ sub_stages = [] :: [{atom (), # stage_details {}}],
31+ notable_events = []
32+ }).
33+
2134-record (stage_info , {
2235 per_stage_progress :: dict :dict (),
23- aggregated :: dict :dict ()
36+ % % aggregated needed for backward compatibility.
37+ aggregated :: dict :dict (),
38+ per_stage_info :: [{atom (), # stage_details {}}]
2439 }).
2540
2641-type stage_info () :: # stage_info {}.
2742
28- init (LiveNodes , Stages ) ->
29- do_init ([{S , ns_cluster_membership :service_nodes (LiveNodes , S )} ||
30- S <- Stages ]).
31-
32- do_init (Stages ) ->
33- aggregate (init_per_stage (Stages )).
43+ init (Stages ) ->
44+ PerStageProgress = dict :from_list (init_per_stage_progress (Stages )),
45+ Aggregated = aggregate (PerStageProgress ),
46+ StageInfo = init_per_stage_info (Stages ),
47+ # stage_info {per_stage_progress = PerStageProgress ,
48+ aggregated = Aggregated ,
49+ per_stage_info = StageInfo }.
3450
35- init_per_stage (Stages ) ->
36- dict :from_list ([{Stage , init_stage (Nodes )} ||
37- {Stage , Nodes } <- Stages ]).
51+ init_per_stage_progress (Stages ) ->
52+ lists :flatten ([init_stage_progress (S , N , SS ) || {S , N , SS } <- Stages ]).
3853
39- init_stage (Nodes ) ->
40- dict :from_list ([{N , 0 } || N <- Nodes ]).
54+ init_stage_progress (_Stage , [], _SubStage ) ->
55+ [];
56+ init_stage_progress (Stage , Nodes , SubStages ) ->
57+ SubStageNodes = init_per_stage_progress (SubStages ),
58+ [{Stage , dict :from_list ([{N , 0 } || N <- Nodes ])} | SubStageNodes ].
4159
60+ % % For backward compatibility.
4261get_progress (# stage_info {aggregated = Aggregated }) ->
4362 Aggregated .
4463
45- update_progress (Stage , StageProgress ,
46- # stage_info {per_stage_progress = PerStage }) ->
47- aggregate (do_update_progress (Stage , StageProgress , PerStage )).
64+ init_per_stage_info (Stages ) ->
65+ [{Stage , # stage_details {
66+ start_time = false ,
67+ complete_time = false ,
68+ sub_stages = init_per_stage_info (SubStages ),
69+ notable_events = []
70+ }} || {Stage , Nodes , SubStages } <- Stages , Nodes =/= []].
71+
72+ update_progress (
73+ Stage , StageProgress ,
74+ # stage_info {per_stage_progress = OldPerStageProgress } = StageInfo ) ->
75+ NewPerStageProgress = do_update_progress (Stage , StageProgress ,
76+ OldPerStageProgress ),
77+ Aggregated = aggregate (NewPerStageProgress ),
78+ StageInfo # stage_info {
79+ per_stage_progress = NewPerStageProgress ,
80+ aggregated = Aggregated }.
4881
4982do_update_progress (Stage , StageProgress , PerStage ) ->
5083 dict :update (Stage ,
@@ -55,23 +88,150 @@ do_update_progress(Stage, StageProgress, PerStage) ->
5588 end , PerStage ).
5689
5790aggregate (PerStage ) ->
58- Aggregated0 =
59- dict :fold (
60- fun (_ , StageProgress , AggAcc ) ->
61- dict :fold (
62- fun (Node , NodeProgress , Acc ) ->
63- misc :dict_update (
64- Node ,
65- fun ({Count , Sum }) ->
66- {Count + 1 , Sum + NodeProgress }
67- end , {0 , 0 }, Acc )
68- end , AggAcc , StageProgress )
69- end , dict :new (), PerStage ),
70-
71- Aggregated =
72- dict :map (fun (_ , {Count , Sum }) ->
73- Sum / Count
74- end , Aggregated0 ),
75-
76- # stage_info {per_stage_progress = PerStage ,
77- aggregated = Aggregated }.
91+ TmpAggr = dict :fold (
92+ fun (_ , StageProgress , AggAcc ) ->
93+ dict :fold (
94+ fun (Node , NodeProgress , Acc ) ->
95+ misc :dict_update (
96+ Node ,
97+ fun ({Count , Sum }) ->
98+ {Count + 1 , Sum + NodeProgress }
99+ end , {0 , 0 }, Acc )
100+ end , AggAcc , StageProgress )
101+ end , dict :new (), PerStage ),
102+
103+ dict :map (fun (_ , {Count , Sum }) ->
104+ Sum / Count
105+ end , TmpAggr ).
106+
107+ get_stage_info (StageInfo ) ->
108+ {get_per_stage_info (StageInfo )}.
109+
110+ diff_timestamp (false , false ) ->
111+ false ;
112+ diff_timestamp (false , StartTS ) ->
113+ diff_timestamp (os :timestamp (), StartTS );
114+ diff_timestamp (EndTS , StartTS ) ->
115+ round (timer :now_diff (EndTS , StartTS ) / 1000 ).
116+
117+ binarify_timestamp (false ) ->
118+ false ;
119+ binarify_timestamp (Time ) ->
120+ erlang :list_to_binary (misc :timestamp_local_iso8601 (Time )).
121+
122+ get_per_stage_info (# stage_info {
123+ per_stage_progress = PerStageProgress ,
124+ per_stage_info = PerStageInfo }) ->
125+ AllStageProgress = get_per_stage_progress (PerStageProgress ),
126+ lists :map (
127+ fun ({Stage , StageInfo }) ->
128+ construct_per_stage_json (AllStageProgress ,
129+ Stage ,
130+ StageInfo )
131+ end , PerStageInfo ).
132+
133+ construct_per_stage_json (AllStageProgress , Stage , StageInfo ) ->
134+ {ok , PerNodeProgress } = dict :find (Stage , AllStageProgress ),
135+ TotalStageProgress = case lists :foldl (fun ({_ , P }, {Total , Count }) ->
136+ {Total + P , Count + 1 }
137+ end , {0 , 0 }, PerNodeProgress ) of
138+ {_ , 0 } ->
139+ 0 ;
140+ {TotalProgress , NodesCount } ->
141+ TotalProgress * 100.0 / NodesCount
142+ end ,
143+ ProgressInfoJson = construct_per_stage_progress_json (TotalStageProgress ,
144+ PerNodeProgress ,
145+ StageInfo ),
146+ StageInfoJson = construct_per_stage_info_json (StageInfo , AllStageProgress ),
147+ {get_stage_name (Stage ), {ProgressInfoJson ++ StageInfoJson }}.
148+
149+ get_stage_name (Stage ) ->
150+ Stages = [{kv , data },
151+ {n1ql , query },
152+ {index , index },
153+ {fts , search },
154+ {cbas , analytics },
155+ {eventing , eventing },
156+ {kv_delta_recovery , deltaRecovery }],
157+ case lists :keyfind (Stage , 1 , Stages ) of
158+ {Stage , Val } -> Val ;
159+ false -> Stage
160+ end .
161+
162+ construct_per_stage_progress_json (
163+ TSP , _ , # stage_details {complete_time = false }) when TSP == 0 ->
164+ [];
165+ construct_per_stage_progress_json (TotalStageProgress , PerNodeProgress ,
166+ # stage_details {complete_time = false }) ->
167+ [{totalProgress , TotalStageProgress },
168+ {perNodeProgress , {PerNodeProgress }}];
169+ construct_per_stage_progress_json (_ , PerNodeProgress , _StageInfo ) ->
170+ Completed = [{N , 1.0 } || {N , _ } <- PerNodeProgress ],
171+ [{totalProgress , 100.0 },
172+ {perNodeProgress , {Completed }}].
173+
174+ construct_per_stage_info_json (# stage_details {
175+ start_time = StartTime ,
176+ complete_time = EndTime ,
177+ sub_stages = SubStages ,
178+ notable_events = NotableEvents },
179+ AllStageProgress ) ->
180+ SubStagesInfo = case SubStages of
181+ [] -> [];
182+ _ -> [{subStages ,
183+ {[construct_per_stage_json (AllStageProgress ,
184+ SubStage ,
185+ SubStageInfo ) ||
186+ {SubStage , SubStageInfo } <- SubStages ]}}]
187+ end ,
188+ Events = case NotableEvents of
189+ [] -> [];
190+ _ -> [{events , {NotableEvents }}]
191+ end ,
192+
193+ [{startTime , binarify_timestamp (StartTime )},
194+ {completedTime , binarify_timestamp (EndTime )},
195+ {timeTaken , diff_timestamp (EndTime , StartTime )}] ++ SubStagesInfo ++ Events .
196+
197+ get_per_stage_progress (PerStageProgress ) ->
198+ dict :map (fun (_ , StageProgress ) ->
199+ dict :to_list (StageProgress )
200+ end , PerStageProgress ).
201+
202+ update_stage_info ({started , Time }, StageInfo ) ->
203+ StageInfo # stage_details {start_time = Time ,
204+ complete_time = false };
205+ update_stage_info ({completed , Time }, StageInfo ) ->
206+ StageInfo # stage_details {complete_time = Time };
207+ update_stage_info ({notable_event , TS , Text },
208+ # stage_details {notable_events = NotableEvents } = StageInfo ) ->
209+ Time = binarify_timestamp (TS ),
210+ Msg = list_to_binary (Text ),
211+ StageInfo # stage_details {notable_events = [{Time , Msg } | NotableEvents ]}.
212+
213+ update_stage_info (Stage , StageInfoUpdate ,
214+ # stage_info {per_stage_info = PerStageInfo } = StageInfo ) ->
215+ NewPerStageInfo = update_stage_info_rec (Stage , StageInfoUpdate ,
216+ PerStageInfo ),
217+ StageInfo # stage_info {per_stage_info = NewPerStageInfo }.
218+
219+ update_stage_info_rec ([Stage | SubStages ], StageInfoUpdate , AllStageInfo ) ->
220+ case lists :keysearch (Stage , 1 , AllStageInfo ) of
221+ false ->
222+ AllStageInfo ;
223+ {value , {Stage , OldStageInfo }} ->
224+ NewStageInfo =
225+ case SubStages of
226+ [] ->
227+ update_stage_info (StageInfoUpdate , OldStageInfo );
228+ _ ->
229+ NewSubStages = update_stage_info_rec (
230+ SubStages ,
231+ StageInfoUpdate ,
232+ OldStageInfo # stage_details .sub_stages ),
233+
234+ OldStageInfo # stage_details {sub_stages = NewSubStages }
235+ end ,
236+ lists :keyreplace (Stage , 1 , AllStageInfo , {Stage , NewStageInfo })
237+ end .
0 commit comments