1919-behavior (gen_server ).
2020
2121-include (" ns_common.hrl" ).
22+ -include (" cut.hrl" ).
2223
2324-export ([start_link /3 ,
2425 get_detailed_progress /0 ,
3435-define (SERVER , {via , leader_registry , ? MODULE }).
3536-define (DOCS_LEFT_REFRESH_INTERVAL , 5000 ).
3637
38+ -record (stat_info , {start_time = false ,
39+ end_time = false }).
40+
3741-record (replica_building_stats , {node :: node (),
3842 docs_total :: non_neg_integer (),
3943 docs_left :: non_neg_integer ()}).
4044
41- -record (vbucket_info , {vbucket :: vbucket_id (),
42- before_chain :: [node ()],
45+ -record (vbucket_info , {before_chain :: [node ()],
4346 after_chain :: [node ()],
44- stats :: [# replica_building_stats {}]}).
47+ stats :: [# replica_building_stats {}],
48+ move = # stat_info {}}).
4549
4650-record (state , {bucket :: bucket_name () | undefined ,
4751 buckets_count :: pos_integer (),
4852 bucket_number :: non_neg_integer (),
4953 stage_info :: rebalance_progress :stage_info (),
5054 nodes_info :: [{atom (), [node ()]}],
5155 type :: atom (),
52- done_moves :: [# vbucket_info {}],
53- current_moves :: [# vbucket_info {}],
54- pending_moves :: [# vbucket_info {}]
55- }).
56+ moves :: dict :dict ()}).
5657
5758start_link (Stages , NodesInfo , Type ) ->
5859 gen_server :start_link (? SERVER , ? MODULE , {Stages , NodesInfo , Type }, []).
@@ -135,7 +136,7 @@ init({Services, NodesInfo, Type}) ->
135136 end , []),
136137
137138 StageInfo = rebalance_stage_info :init (get_stage_nodes (Services , NodesInfo )),
138- BucketsCount = length (ns_bucket :get_buckets ()),
139+ BucketsCount = length (ns_bucket :get_bucket_names ()),
139140 proc_lib :spawn_link (erlang , apply , [fun docs_left_updater_init /1 , [Self ]]),
140141
141142 {ok , # state {bucket = undefined ,
@@ -144,9 +145,7 @@ init({Services, NodesInfo, Type}) ->
144145 stage_info = StageInfo ,
145146 nodes_info = NodesInfo ,
146147 type = Type ,
147- done_moves = [],
148- current_moves = [],
149- pending_moves = []}}.
148+ moves = dict :new ()}}.
150149
151150handle_call (get , _From , State ) ->
152151 {reply , State , State };
@@ -296,18 +295,14 @@ initiate_bucket_rebalance(BucketName, OldState) ->
296295 end || Replica <- ChainAfter ,
297296 Replica =/= undefined ,
298297 Replica =/= MasterNode ],
299- # vbucket_info {vbucket = VB ,
300- before_chain = ChainBefore ,
301- after_chain = ChainAfter ,
302- stats = RBStats }
298+ {VB , # vbucket_info {before_chain = ChainBefore ,
299+ after_chain = ChainAfter ,
300+ stats = RBStats }}
303301 end || {VB , [MasterNode |_ ] = ChainBefore , ChainAfter } <- Diff ],
304302
305303 ? log_debug (" Moves:~n~p " , [Moves ]),
306-
307304 OldState # state {bucket = BucketName ,
308- done_moves = [],
309- current_moves = [],
310- pending_moves = Moves }.
305+ moves = dict :from_list (Moves )}.
311306
312307handle_rebalance_stage_started ({TS , rebalance_stage_started , Stage },
313308 # state {stage_info = Old } = State ) ->
@@ -334,57 +329,27 @@ handle_bucket_rebalance_started({_, bucket_rebalance_started, _BucketName, _Pid}
334329handle_set_ff_map ({_ , set_ff_map , BucketName , _Diff }, State ) ->
335330 {noreply , initiate_bucket_rebalance (BucketName , State )}.
336331
337- handle_vbucket_move_start ({_ , vbucket_move_start , _Pid , _BucketName , _Node , VBucketId , _ , _ } = Ev , State ) ->
338- case ensure_not_pending (State , VBucketId ) of
339- State ->
340- ? log_error (" Weird vbucket move start for move not in pending moves: ~p " , [Ev ]),
341- {noreply , State };
342- NewState ->
343- ? log_debug (" Noted vbucket move start (vbucket ~p )" , [VBucketId ]),
344- {noreply , NewState }
345- end .
332+ handle_vbucket_move_start ({TS , vbucket_move_start , _Pid , _BucketName ,
333+ _Node , VBucketId , _ , _ },
334+ State ) ->
335+ ? log_debug (" Noted vbucket move start (vbucket ~p )" , [VBucketId ]),
336+ {noreply , update_info (vbucket_move_start , State ,
337+ {TS , VBucketId })}.
346338
347- handle_vbucket_move_done ({_ , vbucket_move_done , _BucketName , VBucket } = Ev , State ) ->
339+ handle_vbucket_move_done ({TS , vbucket_move_done , _BucketName , VBucket },
340+ State ) ->
348341 State1 = update_move (State , VBucket ,
349342 fun (# vbucket_info {stats = Stats } = Move ) ->
350343 Stats1 = [S # replica_building_stats {docs_left = 0 } ||
351344 S <- Stats ],
352345 Move # vbucket_info {stats = Stats1 }
353346 end ),
354- case ensure_not_current (State1 , VBucket ) of
355- State1 ->
356- ? log_error (" Weird vbucket_move_done for move not in current_moves: ~p " , [Ev ]),
357- {noreply , State1 };
358- NewState ->
359- ? log_debug (" Noted vbucket move end (vbucket ~p )" , [VBucket ]),
360- {noreply , NewState }
361- end .
362-
363- move_the_move (State , VBucketId , From , To ) ->
364- case lists :keytake (VBucketId , # vbucket_info .vbucket , erlang :element (From , State )) of
365- false ->
366- State ;
367- {value , Move , RestFrom } ->
368- OldTo = erlang :element (To , State ),
369- State1 = erlang :setelement (To , State , [Move | OldTo ]),
370- erlang :setelement (From , State1 , RestFrom )
371- end .
372-
373- ensure_not_pending (State , VBucketId ) ->
374- move_the_move (State , VBucketId , # state .pending_moves , # state .current_moves ).
375-
376- ensure_not_current (State , VBucketId ) ->
377- move_the_move (State , VBucketId , # state .current_moves , # state .done_moves ).
347+ ? log_debug (" Noted vbucket move end (vbucket ~p )" , [VBucket ]),
348+ {noreply , update_info (vbucket_move_done , State1 ,
349+ {TS , VBucket })}.
378350
379- update_move (# state {current_moves = Moves } = State , VBucket , Fun ) ->
380- NewCurrent =
381- [case Move # vbucket_info .vbucket =:= VBucket of
382- false ->
383- Move ;
384- _ ->
385- Fun (Move )
386- end || Move <- Moves ],
387- State # state {current_moves = NewCurrent }.
351+ update_move (State , VBucket , Fun ) ->
352+ update_all_vb_info (State , dict :update (VBucket , Fun , get_all_vb_info (State ))).
388353
389354handle_info (Msg , State ) ->
390355 ? log_error (" Got unexpected message: ~p " , [Msg ]),
@@ -401,31 +366,32 @@ docs_left_updater_init(Parent) ->
401366 docs_left_updater_loop (Parent ).
402367
403368docs_left_updater_loop (Parent ) ->
404- # state {current_moves = CurrentMoves ,
405- bucket = BucketName } = gen_server :call (Parent , get , infinity ),
369+ State = gen_server :call (Parent , get , infinity ),
370+ BucketName = State # state .bucket ,
371+ Moves = dict :to_list (get_all_vb_info (State )),
406372 case BucketName of
407373 undefined ->
408374 ok ;
409375 _ ->
410- ? log_debug (" Starting docs_left_updater_loop:~p~n~p " , [BucketName , CurrentMoves ])
376+ ? log_debug (" Starting docs_left_updater_loop:~p~n~p " ,
377+ [BucketName , Moves ])
411378 end ,
412- [update_docs_left_for_move (Parent , BucketName , M ) || M <- CurrentMoves ],
379+ [update_docs_left_for_move (Parent , BucketName , VB , VBInfo ) ||
380+ {VB , VBInfo } <- Moves ],
413381 receive
414382 refresh ->
415383 _Lost = misc :flush (refresh ),
416384 docs_left_updater_loop (Parent )
417385 end .
418386
419- get_docs_estimate (BucketName , # vbucket_info {vbucket = VBucket ,
420- before_chain = [MasterNode |_ ],
421- stats = RStats }) ->
387+ get_docs_estimate (BucketName , VBucket , # vbucket_info {before_chain = [MasterNode |_ ],
388+ stats = RStats }) ->
422389 ReplicaNodes = [S # replica_building_stats .node || S <- RStats ],
423390 janitor_agent :get_dcp_docs_estimate (BucketName , MasterNode , VBucket , ReplicaNodes ).
424391
425- update_docs_left_for_move (Parent , BucketName ,
426- # vbucket_info {vbucket = VBucket ,
427- stats = RStats } = MoveState ) ->
428- try get_docs_estimate (BucketName , MoveState ) of
392+ update_docs_left_for_move (Parent , BucketName , VBucket ,
393+ # vbucket_info {stats = RStats } = MoveState ) ->
394+ try get_docs_estimate (BucketName , VBucket , MoveState ) of
429395 NewLefts ->
430396 Stuff =
431397 lists :flatmap (
@@ -466,15 +432,26 @@ keygroup_sorted(Items) ->
466432 end , [], Items ).
467433
468434
469- do_get_detailed_progress (# state {bucket = undefined }) ->
435+ do_get_detailed_progress (# state {bucket = undefined }) ->
470436 not_running ;
471- do_get_detailed_progress (# state {bucket = Bucket ,
472- buckets_count = BucketsCount ,
473- bucket_number = BucketNumber ,
474- current_moves = CurrentMoves ,
475- pending_moves = PendingMoves ,
476- done_moves = DoneMoves }) ->
477- AllMoves = lists :append ([CurrentMoves , PendingMoves , DoneMoves ]),
437+ do_get_detailed_progress (# state {bucket = Bucket ,
438+ buckets_count = BucketsCount ,
439+ bucket_number = BucketNumber } = State ) ->
440+ AllMoves = get_all_vb_info (State ),
441+ {CurrentMoves , PendingMoves } =
442+ dict :fold (
443+ fun (_ , # vbucket_info {move = MoveStat } = VBInfo , {CM , PM }) ->
444+ case {MoveStat # stat_info .start_time ,
445+ MoveStat # stat_info .end_time } of
446+ {false , _ } ->
447+ {CM , [VBInfo | PM ]};
448+ {_ , false } ->
449+ {[VBInfo | CM ], PM };
450+ {_ , _ } ->
451+ {CM , PM }
452+ end
453+ end , {[], []}, AllMoves ),
454+
478455 {OutMovesStats , InMovesStats } = moves_stats (AllMoves ),
479456
480457 Inc = fun (undefined , Dict ) ->
@@ -547,9 +524,9 @@ do_get_detailed_progress(#state{bucket=Bucket,
547524
548525
549526moves_stats (Moves ) ->
550- lists : foldl (
551- fun (# vbucket_info {stats = Stats ,
552- before_chain = [OldMaster |_ ]}, Acc ) ->
527+ dict : fold (
528+ fun (_ , # vbucket_info {stats = Stats ,
529+ before_chain = [OldMaster |_ ]}, Acc ) ->
553530 true = (OldMaster =/= undefined ),
554531
555532 lists :foldl (
@@ -571,3 +548,20 @@ moves_stats(Moves) ->
571548 {AccOut1 , AccIn1 }
572549 end , Acc , Stats )
573550 end , {dict :new (), dict :new ()}, Moves ).
551+
552+ update_info (vbucket_move_done , State , {TS , VB }) ->
553+ update_move (State , VB ,
554+ fun (# vbucket_info {move = Move } = VBInfo ) ->
555+ VBInfo # vbucket_info {move = Move # stat_info {end_time = TS }}
556+ end );
557+ update_info (vbucket_move_start , State , {TS , VB }) ->
558+ update_move (State , VB ,
559+ fun (VBInfo ) ->
560+ VBInfo # vbucket_info {move = # stat_info {start_time = TS }}
561+ end ).
562+
563+ get_all_vb_info (# state {moves = Moves }) ->
564+ Moves .
565+
566+ update_all_vb_info (State , NewMoves ) ->
567+ State # state {moves = NewMoves }.
0 commit comments