4747 stats :: [# replica_building_stats {}],
4848 move = # stat_info {}}).
4949
50+ -record (total_stat_info , {total_time = 0 ,
51+ completed_count = 0 }).
52+
53+ -record (vbucket_level_info , {move = # total_stat_info {},
54+ vbucket_info = dict :new ()}).
55+
56+ -record (bucket_level_info , {bucket_name ,
57+ vbucket_level_info = # vbucket_level_info {}}).
58+
5059-record (state , {bucket :: bucket_name () | undefined ,
5160 buckets_count :: pos_integer (),
5261 bucket_number :: non_neg_integer (),
5362 stage_info :: rebalance_progress :stage_info (),
5463 nodes_info :: [{atom (), [node ()]}],
5564 type :: atom (),
56- moves :: dict :dict ()}).
65+ bucket_info :: dict :dict ()}).
5766
5867start_link (Stages , NodesInfo , Type ) ->
5968 gen_server :start_link (? SERVER , ? MODULE , {Stages , NodesInfo , Type }, []).
@@ -136,7 +145,11 @@ init({Services, NodesInfo, Type}) ->
136145 end , []),
137146
138147 StageInfo = rebalance_stage_info :init (get_stage_nodes (Services , NodesInfo )),
139- BucketsCount = length (ns_bucket :get_bucket_names ()),
148+ Buckets = ns_bucket :get_bucket_names (),
149+ BucketsCount = length (Buckets ),
150+ BucketLevelInfo = dict :from_list ([{BN ,
151+ # bucket_level_info {bucket_name = BN }} ||
152+ BN <- Buckets ]),
140153 proc_lib :spawn_link (erlang , apply , [fun docs_left_updater_init /1 , [Self ]]),
141154
142155 {ok , # state {bucket = undefined ,
@@ -145,7 +158,7 @@ init({Services, NodesInfo, Type}) ->
145158 stage_info = StageInfo ,
146159 nodes_info = NodesInfo ,
147160 type = Type ,
148- moves = dict : new () }}.
161+ bucket_info = BucketLevelInfo }}.
149162
150163handle_call (get , _From , State ) ->
151164 {reply , State , State };
@@ -165,10 +178,10 @@ handle_cast({note, Fun, Ev}, State) ->
165178 {noreply , NewState } = Fun (Ev , State ),
166179 {noreply , NewState };
167180
168- handle_cast ({update_stats , VBucket , NodeToDocsLeft }, State ) ->
181+ handle_cast ({update_stats , BucketName , VBucket , NodeToDocsLeft }, State ) ->
169182 ? log_debug (" Got update_stats: ~p , ~p " , [VBucket , NodeToDocsLeft ]),
170183 {noreply , update_move (
171- State , VBucket ,
184+ State , BucketName , VBucket ,
172185 fun (Move ) ->
173186 NewStats =
174187 [case lists :keyfind (Stat # replica_building_stats .node , 1 , NodeToDocsLeft ) of
@@ -301,8 +314,8 @@ initiate_bucket_rebalance(BucketName, OldState) ->
301314 end || {VB , [MasterNode |_ ] = ChainBefore , ChainAfter } <- Diff ],
302315
303316 ? log_debug (" Moves:~n~p " , [Moves ]),
304- OldState # state { bucket = BucketName ,
305- moves = dict : from_list ( Moves ) }.
317+ TmpState = update_all_vb_info ( OldState , BucketName , dict : from_list ( Moves )) ,
318+ TmpState # state { bucket = BucketName }.
306319
307320handle_rebalance_stage_started ({TS , rebalance_stage_started , Stage },
308321 # state {stage_info = Old } = State ) ->
@@ -329,27 +342,29 @@ handle_bucket_rebalance_started({_, bucket_rebalance_started, _BucketName, _Pid}
329342handle_set_ff_map ({_ , set_ff_map , BucketName , _Diff }, State ) ->
330343 {noreply , initiate_bucket_rebalance (BucketName , State )}.
331344
332- handle_vbucket_move_start ({TS , vbucket_move_start , _Pid , _BucketName ,
345+ handle_vbucket_move_start ({TS , vbucket_move_start , _Pid , BucketName ,
333346 _Node , VBucketId , _ , _ },
334347 State ) ->
335348 ? log_debug (" Noted vbucket move start (vbucket ~p )" , [VBucketId ]),
336349 {noreply , update_info (vbucket_move_start , State ,
337- {TS , VBucketId })}.
350+ {TS , BucketName , VBucketId })}.
338351
339- handle_vbucket_move_done ({TS , vbucket_move_done , _BucketName , VBucket },
352+ handle_vbucket_move_done ({TS , vbucket_move_done , BucketName , VBucket },
340353 State ) ->
341- State1 = update_move (State , VBucket ,
354+ State1 = update_move (State , BucketName , VBucket ,
342355 fun (# vbucket_info {stats = Stats } = Move ) ->
343356 Stats1 = [S # replica_building_stats {docs_left = 0 } ||
344357 S <- Stats ],
345358 Move # vbucket_info {stats = Stats1 }
346359 end ),
347360 ? log_debug (" Noted vbucket move end (vbucket ~p )" , [VBucket ]),
348361 {noreply , update_info (vbucket_move_done , State1 ,
349- {TS , VBucket })}.
362+ {TS , BucketName , VBucket })}.
350363
351- update_move (State , VBucket , Fun ) ->
352- update_all_vb_info (State , dict :update (VBucket , Fun , get_all_vb_info (State ))).
364+ update_move (State , BucketName , VBucket , Fun ) ->
365+ update_all_vb_info (State , BucketName ,
366+ dict :update (VBucket , Fun ,
367+ get_all_vb_info (State , BucketName ))).
353368
354369handle_info (Msg , State ) ->
355370 ? log_error (" Got unexpected message: ~p " , [Msg ]),
@@ -368,7 +383,7 @@ docs_left_updater_init(Parent) ->
368383docs_left_updater_loop (Parent ) ->
369384 State = gen_server :call (Parent , get , infinity ),
370385 BucketName = State # state .bucket ,
371- Moves = dict :to_list (get_all_vb_info (State )),
386+ Moves = dict :to_list (get_all_vb_info (State , BucketName )),
372387 case BucketName of
373388 undefined ->
374389 ok ;
@@ -413,7 +428,8 @@ update_docs_left_for_move(Parent, BucketName, VBucket,
413428 [] ->
414429 ok ;
415430 _ ->
416- gen_server :cast (Parent , {update_stats , VBucket , Stuff })
431+ gen_server :cast (Parent ,
432+ {update_stats , BucketName , VBucket , Stuff })
417433 end
418434 catch error :{janitor_agent_servant_died , _ } ->
419435 ? log_debug (" Apparently move of ~p is already done" , [VBucket ]),
@@ -437,7 +453,7 @@ do_get_detailed_progress(#state{bucket = undefined}) ->
437453do_get_detailed_progress (# state {bucket = Bucket ,
438454 buckets_count = BucketsCount ,
439455 bucket_number = BucketNumber } = State ) ->
440- AllMoves = get_all_vb_info (State ),
456+ AllMoves = get_all_vb_info (State , Bucket ),
441457 {CurrentMoves , PendingMoves } =
442458 dict :fold (
443459 fun (_ , # vbucket_info {move = MoveStat } = VBInfo , {CM , PM }) ->
@@ -549,19 +565,98 @@ moves_stats(Moves) ->
549565 end , Acc , Stats )
550566 end , {dict :new (), dict :new ()}, Moves ).
551567
552- update_info (vbucket_move_done , State , {TS , VB }) ->
553- update_move (State , VB ,
554- fun (# vbucket_info {move = Move } = VBInfo ) ->
555- VBInfo # vbucket_info {move = Move # stat_info {end_time = TS }}
556- end );
557- update_info (vbucket_move_start , State , {TS , VB }) ->
558- update_move (State , VB ,
559- fun (VBInfo ) ->
560- VBInfo # vbucket_info {move = # stat_info {start_time = TS }}
561- end ).
562-
563- get_all_vb_info (# state {moves = Moves }) ->
564- Moves .
565-
566- update_all_vb_info (State , NewMoves ) ->
567- State # state {moves = NewMoves }.
568+ update_info (Event ,
569+ # state {bucket_info = OldBucketLevelInfo } = State ,
570+ {_TS , BucketName , _VB } = UpdateArgs ) ->
571+ NewBucketLevelInfo =
572+ dict :update (
573+ BucketName ,
574+ fun (BLI ) ->
575+ update_vbucket_level_info (Event , BLI , UpdateArgs )
576+ end , OldBucketLevelInfo ),
577+ State # state {bucket_info = NewBucketLevelInfo }.
578+
579+ get_all_vb_info (_ , undefined ) ->
580+ dict :new ();
581+ get_all_vb_info (# state {bucket_info = BucketInfo }, BucketName ) ->
582+ {ok , BucketLevelInfo } = dict :find (BucketName , BucketInfo ),
583+ get_all_vb_info (BucketLevelInfo ).
584+
585+ get_all_vb_info (BucketLevelInfo ) ->
586+ BucketLevelInfo # bucket_level_info .vbucket_level_info # vbucket_level_info .vbucket_info .
587+
588+ update_all_vb_info (# state {bucket_info = OldBucketLevelInfo } = State , BucketName ,
589+ NewAllVBInfo ) ->
590+ NewBucketLevelInfo = dict :update (BucketName ,
591+ ? cut (update_all_vb_info (_ , NewAllVBInfo )),
592+ OldBucketLevelInfo ),
593+ State # state {bucket_info = NewBucketLevelInfo }.
594+
595+ update_all_vb_info (# bucket_level_info {
596+ vbucket_level_info = VBLevelInfo } = BucketLevelInfo ,
597+ AllVBInfo ) ->
598+ NewVBLevelInfo = VBLevelInfo # vbucket_level_info {vbucket_info = AllVBInfo },
599+ BucketLevelInfo # bucket_level_info {vbucket_level_info = NewVBLevelInfo }.
600+
601+ update_vbucket_level_info (Event , BucketLevelInfo ,
602+ {_TS , _BucketName , VB } = UpdateArgs ) ->
603+ AllVBInfo = get_all_vb_info (BucketLevelInfo ),
604+ case dict :find (VB , AllVBInfo ) of
605+ {ok , VBInfo } ->
606+ NewVBInfo = update_vbucket_info (Event , VBInfo , UpdateArgs ),
607+ BLI = update_vbucket_level_info_inner (Event , BucketLevelInfo , NewVBInfo ),
608+ update_all_vb_info (BLI , dict :store (VB , NewVBInfo , AllVBInfo ));
609+ _ ->
610+ BucketLevelInfo
611+ end .
612+
613+ find_event_action (Event ) ->
614+ % % {Event, TotalElement, StatElement, stat_op}
615+ EventAction = [
616+ {vbucket_move_start , undefined ,
617+ # vbucket_info .move , start_time },
618+ {vbucket_move_done , # vbucket_level_info .move ,
619+ # vbucket_info .move , end_time }
620+ ],
621+ lists :keyfind (Event , 1 , EventAction ).
622+
623+ update_stat (start_time , _ , TS ) ->
624+ # stat_info {start_time = TS };
625+ update_stat (end_time , Stat , TS ) ->
626+ Stat # stat_info {end_time = TS }.
627+
628+ update_vbucket_info (Event , VBInfo , {TS , _Bucket , _VB }) ->
629+ case find_event_action (Event ) of
630+ false ->
631+ VBInfo ;
632+ {Event , _TotalElement , StatElement , StatOp } ->
633+ misc :update_field (StatElement , VBInfo ,
634+ ? cut (update_stat (StatOp , _ , TS )))
635+ end .
636+
637+ update_total_stat (TotalStat , # stat_info {end_time = false }) ->
638+ TotalStat ;
639+ update_total_stat (# total_stat_info {total_time = TT , completed_count = C },
640+ # stat_info {start_time = ST , end_time = ET }) ->
641+ NewTime = rebalance_stage_info :diff_timestamp (ET , ST ),
642+ # total_stat_info {total_time = TT + NewTime ,
643+ completed_count = C + 1 }.
644+
645+ update_vbucket_level_info_inner (
646+ Event ,
647+ # bucket_level_info {vbucket_level_info = VBLevelInfo } = BucketLevelInfo ,
648+ NewVBInfo ) ->
649+ case find_event_action (Event ) of
650+ false ->
651+ BucketLevelInfo ;
652+ {_ , undefined , _ , _ } ->
653+ BucketLevelInfo ;
654+ {Event , TotalElement , StatElement , _StatOp } ->
655+ Stat = erlang :element (StatElement , NewVBInfo ),
656+ TotalInfo = erlang :element (TotalElement , VBLevelInfo ),
657+ NewTotalInfo = update_total_stat (TotalInfo , Stat ),
658+ NewVBLevelInfo = erlang :setelement (TotalElement , VBLevelInfo ,
659+ NewTotalInfo ),
660+ BucketLevelInfo # bucket_level_info {
661+ vbucket_level_info = NewVBLevelInfo }
662+ end .
0 commit comments