@@ -168,7 +168,7 @@ type allocatorState struct {
 // Option 1 Release mu frequently:
 //
 // The allocatorState is deliberately tightly coupled and we want it to be
-// internally consistent. We expect allocatorState.rebalanceStores to be the
+// internally consistent. We expect clusterState.rebalanceStores to be the
 // longest lived holder of mu. We could modify it so that after it computes
 // the cluster means and computes the overloaded stores it releases mu, and
 // then reacquires it when trying to shed for each overloaded store. Note
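As a rough illustration of "Option 1" discussed in the comment above, the loop over overloaded stores would drop and retake mu per store and revalidate anything computed under the previous critical section. This is only a sketch under assumed names (allocSketch and all of its methods are hypothetical placeholders, not code from this change):

package lockingsketch

import (
	"context"
	"sync"
)

// allocSketch stands in for allocatorState/clusterState; it only exists to
// show the lock discipline, not the real fields.
type allocSketch struct {
	mu sync.Mutex
}

func (a *allocSketch) computeClusterMeansLocked() int              { return 0 }
func (a *allocSketch) overloadedStoresLocked(means int) []int      { return nil }
func (a *allocSketch) stillOverloadedLocked(id, means int) bool    { return false }
func (a *allocSketch) shedFromStoreLocked(ctx context.Context, id int) {}

func (a *allocSketch) rebalanceStoresSketch(ctx context.Context) {
	// Short critical section: compute means and pick overloaded stores.
	a.mu.Lock()
	means := a.computeClusterMeansLocked()
	overloaded := a.overloadedStoresLocked(means)
	a.mu.Unlock()

	for _, storeID := range overloaded {
		a.mu.Lock()
		// mu was released above, so the state used to pick this store may be
		// stale; revalidate before shedding from it.
		if a.stillOverloadedLocked(storeID, means) {
			a.shedFromStoreLocked(ctx, storeID)
		}
		a.mu.Unlock()
	}
}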
@@ -252,10 +252,10 @@ var mmaid = atomic.Int64{}
 //
 // We do not want to shed replicas for CPU from a remote store until its had a
 // chance to shed leases.
-func (a *allocatorState) rebalanceStores(
-	ctx context.Context, localStoreID roachpb.StoreID,
+func (cs *clusterState) rebalanceStores(
+	ctx context.Context, localStoreID roachpb.StoreID, rng *rand.Rand, dsm *diversityScoringMemo,
 ) []PendingRangeChange {
-	now := a.cs.ts.Now()
+	now := cs.ts.Now()
 	ctx = logtags.AddTag(ctx, "mmaid", mmaid.Add(1))
 	log.KvDistribution.VInfof(ctx, 2, "rebalanceStores begins")
 	// To select which stores are overloaded, we use a notion of overload that
@@ -276,7 +276,7 @@ func (a *allocatorState) rebalanceStores(
 	// responsible for equalizing load across two nodes that have 30% and 50%
 	// cpu utilization while the cluster mean is 70% utilization (as an
 	// example).
-	clusterMeans := a.cs.meansMemo.getMeans(nil)
+	clusterMeans := cs.meansMemo.getMeans(nil)
 	type sheddingStore struct {
 		roachpb.StoreID
 		storeLoadSummary
@@ -290,8 +290,8 @@ func (a *allocatorState) rebalanceStores(
 	// fdDrain or fdDead, nor do we attempt to shed replicas from a store which
 	// is storeMembershipRemoving (decommissioning). These are currently handled
 	// via replicate_queue.go.
-	for storeID, ss := range a.cs.stores {
-		sls := a.cs.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
+	for storeID, ss := range cs.stores {
+		sls := cs.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
 		log.KvDistribution.VInfof(ctx, 2, "evaluating s%d: node load %s, store load %s, worst dim %s",
 			storeID, sls.nls, sls.sls, sls.worstDim)
 
@@ -374,7 +374,7 @@ func (a *allocatorState) rebalanceStores(
 	for idx /*logging only*/, store := range sheddingStores {
 		log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s",
 			store.StoreID, store.nls, store.sls, store.worstDim)
-		ss := a.cs.stores[store.StoreID]
+		ss := cs.stores[store.StoreID]
 
 		doneShedding := false
 		if true {
@@ -385,7 +385,7 @@ func (a *allocatorState) rebalanceStores(
 			var b strings.Builder
 			for i := 0; i < n; i++ {
 				rangeID := topKRanges.index(i)
-				rstate := a.cs.ranges[rangeID]
+				rstate := cs.ranges[rangeID]
 				load := rstate.load.Load
 				if !ss.adjusted.replicas[rangeID].IsLeaseholder {
 					load[CPURate] = rstate.load.RaftCPU
@@ -416,7 +416,7 @@ func (a *allocatorState) rebalanceStores(
 			n := topKRanges.len()
 			for i := 0; i < n; i++ {
 				rangeID := topKRanges.index(i)
-				rstate := a.cs.ranges[rangeID]
+				rstate := cs.ranges[rangeID]
 				if len(rstate.pendingChanges) > 0 {
 					// If the range has pending changes, don't make more changes.
 					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
@@ -442,7 +442,7 @@ func (a *allocatorState) rebalanceStores(
 					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
 					continue
 				}
-				if !a.ensureAnalyzedConstraints(rstate) {
+				if !cs.ensureAnalyzedConstraints(rstate) {
 					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
 					continue
 				}
@@ -475,8 +475,8 @@ func (a *allocatorState) rebalanceStores(
 					continue // leaseholder is the only candidate
 				}
 				clear(scratchNodes)
-				means := computeMeansForStoreSet(a.cs, candsPL, scratchNodes, scratchStores)
-				sls := a.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
+				means := computeMeansForStoreSet(cs, candsPL, scratchNodes, scratchStores)
+				sls := cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
 				log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
 				if sls.dimSummary[CPURate] < overloadSlow {
 					// This store is not cpu overloaded relative to these candidates for
@@ -486,13 +486,13 @@ func (a *allocatorState) rebalanceStores(
 				}
 				var candsSet candidateSet
 				for _, cand := range cands {
-					if disp := a.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
+					if disp := cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
 						// Don't transfer lease to a store that is lagging.
 						log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
 							cand.storeID, disp)
 						continue
 					}
-					candSls := a.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
+					candSls := cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
 					candsSet.candidates = append(candsSet.candidates, candidateInfo{
 						StoreID:          cand.storeID,
 						storeLoadSummary: candSls,
@@ -513,15 +513,15 @@ func (a *allocatorState) rebalanceStores(
 				// will only add CPU to the target store (so it is ok to ignore other
 				// dimensions on the target).
 				targetStoreID := sortTargetCandidateSetAndPick(
-					ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, a.rand)
+					ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rng)
 				if targetStoreID == 0 {
 					log.KvDistribution.Infof(
 						ctx,
 						"result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick",
 						ss.NodeID, ss.StoreID, rangeID)
 					continue
 				}
-				targetSS := a.cs.stores[targetStoreID]
+				targetSS := cs.stores[targetStoreID]
 				var addedLoad LoadVector
 				// Only adding leaseholder CPU.
 				addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
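To make the CPU accounting in the hunk above concrete, here is a tiny illustration with made-up numbers (simplified to plain floats rather than LoadVector; not code from this change): a lease transfer moves only the request-serving CPU, i.e. the range's total CPU minus its Raft CPU, while the Raft CPU stays with the replica.

// leaseTransferCPUDelta mirrors addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU.
func leaseTransferCPUDelta(totalCPU, raftCPU float64) float64 {
	return totalCPU - raftCPU
}

// Example with hypothetical numbers: leaseTransferCPUDelta(500, 200) == 300,
// so only 300 units of CPU follow the lease to the target store; the 200
// units of Raft work remain on the shedding store's replica.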
@@ -531,7 +531,7 @@ func (a *allocatorState) rebalanceStores(
 					addedLoad[CPURate] = 0
 					panic("raft cpu higher than total cpu")
 				}
-				if !a.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
+				if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
 					log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
 						store.StoreID, targetStoreID, rangeID, addedLoad)
 					continue
@@ -546,10 +546,10 @@ func (a *allocatorState) rebalanceStores(
 				}
 				leaseChanges := MakeLeaseTransferChanges(
 					rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-				if err := a.cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); err != nil {
+				if err := cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); err != nil {
 					panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChanges))
 				}
-				pendingChanges := a.cs.createPendingChanges(leaseChanges[:]...)
+				pendingChanges := cs.createPendingChanges(leaseChanges[:]...)
 				changes = append(changes, PendingRangeChange{
 					RangeID:               rangeID,
 					pendingReplicaChanges: pendingChanges[:],
@@ -607,7 +607,7 @@ func (a *allocatorState) rebalanceStores(
 			storesToExclude = storesToExclude[:0]
 			if excludeStoresOnNode {
 				nodeID := ss.NodeID
-				for _, storeID := range a.cs.nodes[nodeID].stores {
+				for _, storeID := range cs.nodes[nodeID].stores {
 					storesToExclude.insert(storeID)
 				}
 				log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID)
@@ -624,7 +624,7 @@ func (a *allocatorState) rebalanceStores(
 			rangeID := topKRanges.index(i)
 			// TODO(sumeer): the following code belongs in a closure, since we will
 			// repeat it for some random selection of non topKRanges.
-			rstate := a.cs.ranges[rangeID]
+			rstate := cs.ranges[rangeID]
 			if len(rstate.pendingChanges) > 0 {
 				// If the range has pending changes, don't make more changes.
 				log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
@@ -634,7 +634,7 @@ func (a *allocatorState) rebalanceStores(
 				log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
 				continue
 			}
-			if !a.ensureAnalyzedConstraints(rstate) {
+			if !cs.ensureAnalyzedConstraints(rstate) {
 				log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
 				continue
 			}
@@ -673,13 +673,13 @@ func (a *allocatorState) rebalanceStores(
 					// we have already excluded those stores above.
 					continue
 				}
-				nodeID := a.cs.stores[storeID].NodeID
-				for _, storeID := range a.cs.nodes[nodeID].stores {
+				nodeID := cs.stores[storeID].NodeID
+				for _, storeID := range cs.nodes[nodeID].stores {
 					storesToExcludeForRange.insert(storeID)
 				}
 			}
 			// TODO(sumeer): eliminate cands allocations by passing a scratch slice.
-			cands, ssSLS := a.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID)
+			cands, ssSLS := cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID)
 			log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
 				rangeID, store.StoreID, ss.adjusted.load)
 			if log.V(2) {
@@ -699,15 +699,15 @@ func (a *allocatorState) rebalanceStores(
 			} else {
 				rlocalities = rstate.constraints.replicaLocalityTiers
 			}
-			localities := a.diversityScoringMemo.getExistingReplicaLocalities(rlocalities)
+			localities := dsm.getExistingReplicaLocalities(rlocalities)
 			isLeaseholder := rstate.constraints.leaseholderID == store.StoreID
 			// Set the diversity score and lease preference index of the candidates.
 			for _, cand := range cands.candidates {
 				cand.diversityScore = localities.getScoreChangeForRebalance(
-					ss.localityTiers, a.cs.stores[cand.StoreID].localityTiers)
+					ss.localityTiers, cs.stores[cand.StoreID].localityTiers)
 				if isLeaseholder {
 					cand.leasePreferenceIndex = matchedLeasePreferenceIndex(
-						cand.StoreID, rstate.constraints.spanConfig.leasePreferences, a.cs.constraintMatcher)
+						cand.StoreID, rstate.constraints.spanConfig.leasePreferences, cs.constraintMatcher)
 				}
 			}
 			// Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and
@@ -732,18 +732,18 @@ func (a *allocatorState) rebalanceStores(
 					ignoreLevel, ssSLS.sls, rangeID, overloadDur)
 			}
 			targetStoreID := sortTargetCandidateSetAndPick(
-				ctx, cands, ssSLS.sls, ignoreLevel, loadDim, a.rand)
+				ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rng)
 			if targetStoreID == 0 {
 				log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+
 					"(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
 				continue
 			}
-			targetSS := a.cs.stores[targetStoreID]
+			targetSS := cs.stores[targetStoreID]
 			addedLoad := rstate.load.Load
 			if !isLeaseholder {
 				addedLoad[CPURate] = rstate.load.RaftCPU
 			}
-			if !a.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
+			if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
 				log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
 					store.StoreID, targetStoreID, rangeID, addedLoad)
 				continue
@@ -763,11 +763,11 @@ func (a *allocatorState) rebalanceStores(
763763 }
764764 replicaChanges := makeRebalanceReplicaChanges (
765765 rangeID , rstate .replicas , rstate .load , addTarget , removeTarget )
766- if err = a . cs .preCheckOnApplyReplicaChanges (replicaChanges [:]); err != nil {
766+ if err = cs .preCheckOnApplyReplicaChanges (replicaChanges [:]); err != nil {
767767 panic (errors .Wrapf (err , "pre-check failed for replica changes: %v for %v" ,
768768 replicaChanges , rangeID ))
769769 }
770- pendingChanges := a . cs .createPendingChanges (replicaChanges [:]... )
770+ pendingChanges := cs .createPendingChanges (replicaChanges [:]... )
771771 changes = append (changes , PendingRangeChange {
772772 RangeID : rangeID ,
773773 pendingReplicaChanges : pendingChanges [:],
@@ -906,7 +906,7 @@ func (a *allocatorState) ComputeChanges(
 		panic(fmt.Sprintf("ComputeChanges: expected StoreID %d, got %d", opts.LocalStoreID, msg.StoreID))
 	}
 	a.cs.processStoreLeaseholderMsg(ctx, msg, a.mmaMetrics)
-	return a.rebalanceStores(ctx, opts.LocalStoreID)
+	return a.cs.rebalanceStores(ctx, opts.LocalStoreID, a.rand, a.diversityScoringMemo)
 }
 
 // AdminRelocateOne implements the Allocator interface.
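The call-site change above shows the shape of the whole refactor: rebalanceStores now receives its randomness source and diversity memo as explicit arguments rather than reading them off allocatorState, which keeps the allocator wrapper a thin adapter over clusterState. A minimal, self-contained sketch of that dependency-passing pattern (hypothetical types, not this package's API):

package depsketch

import "math/rand"

// worker stands in for clusterState: it does the work but owns no policy
// dependencies of its own.
type worker struct{}

// run takes its dependencies as arguments, so it is callable without
// constructing the wrapper below (e.g., directly from a test), mirroring
// cs.rebalanceStores(ctx, localStoreID, rng, dsm).
func (w *worker) run(rng *rand.Rand, memo *scoreMemo) int {
	return memo.lookup(rng.Intn(10))
}

type scoreMemo struct{ cache map[int]int }

func (m *scoreMemo) lookup(k int) int { return m.cache[k] }

// wrapper stands in for allocatorState: it owns the long-lived dependencies
// and forwards them explicitly at the call site.
type wrapper struct {
	w    worker
	rng  *rand.Rand
	memo *scoreMemo
}

func (a *wrapper) compute() int { return a.w.run(a.rng, a.memo) }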
@@ -1276,7 +1276,7 @@ func sortTargetCandidateSetAndPick(
 	return cands.candidates[j].StoreID
 }
 
-func (a *allocatorState) ensureAnalyzedConstraints(rstate *rangeState) bool {
+func (cs *clusterState) ensureAnalyzedConstraints(rstate *rangeState) bool {
 	if rstate.constraints != nil {
 		return true
 	}
@@ -1286,7 +1286,7 @@ func (a *allocatorState) ensureAnalyzedConstraints(rstate *rangeState) bool {
 	leaseholder := roachpb.StoreID(-1)
 	for _, replica := range rstate.replicas {
 		buf.tryAddingStore(replica.StoreID, replica.ReplicaIDAndType.ReplicaType.ReplicaType,
-			a.cs.stores[replica.StoreID].localityTiers)
+			cs.stores[replica.StoreID].localityTiers)
 		if replica.IsLeaseholder {
 			leaseholder = replica.StoreID
 		}
@@ -1298,7 +1298,7 @@ func (a *allocatorState) ensureAnalyzedConstraints(rstate *rangeState) bool {
 		releaseRangeAnalyzedConstraints(rac)
 		return false
 	}
-	rac.finishInit(rstate.conf, a.cs.constraintMatcher, leaseholder)
+	rac.finishInit(rstate.conf, cs.constraintMatcher, leaseholder)
 	rstate.constraints = rac
 	return true
 }
@@ -1365,16 +1365,16 @@ func (a *allocatorState) ensureAnalyzedConstraints(rstate *rangeState) bool {
 
 // loadSheddingStore is only specified if this candidate computation is
 // happening because of overload.
-func (a *allocatorState) computeCandidatesForRange(
+func (cs *clusterState) computeCandidatesForRange(
 	ctx context.Context,
 	expr constraintsDisj,
 	storesToExclude storeSet,
 	loadSheddingStore roachpb.StoreID,
 ) (_ candidateSet, sheddingSLS storeLoadSummary) {
-	means := a.cs.meansMemo.getMeans(expr)
+	means := cs.meansMemo.getMeans(expr)
 	if loadSheddingStore > 0 {
-		sheddingSS := a.cs.stores[loadSheddingStore]
-		sheddingSLS = a.cs.meansMemo.getStoreLoadSummary(ctx, means, loadSheddingStore, sheddingSS.loadSeqNum)
+		sheddingSS := cs.stores[loadSheddingStore]
+		sheddingSLS = cs.meansMemo.getStoreLoadSummary(ctx, means, loadSheddingStore, sheddingSS.loadSeqNum)
 		if sheddingSLS.sls <= loadNoChange && sheddingSLS.nls <= loadNoChange {
 			// In this set of stores, this store no longer looks overloaded.
 			return candidateSet{}, sheddingSLS
@@ -1387,8 +1387,8 @@ func (a *allocatorState) computeCandidatesForRange(
 		if storesToExclude.contains(storeID) {
 			continue
 		}
-		ss := a.cs.stores[storeID]
-		csls := a.cs.meansMemo.getStoreLoadSummary(ctx, means, storeID, ss.loadSeqNum)
+		ss := cs.stores[storeID]
+		csls := cs.meansMemo.getStoreLoadSummary(ctx, means, storeID, ss.loadSeqNum)
 		cset.candidates = append(cset.candidates, candidateInfo{
 			StoreID:          storeID,
 			storeLoadSummary: csls,