@@ -23,6 +23,27 @@ import (
 
 var mmaid = atomic.Int64{}
 
+// rebalanceState tracks the state and outcomes of a rebalanceStores invocation.
+type rebalanceState struct {
+	cs *clusterState
+	// changes accumulates the pending range changes made during rebalancing.
+	changes []PendingRangeChange
+	// rangeMoveCount tracks the number of range moves made.
+	rangeMoveCount int
+	// leaseTransferCount tracks the number of lease transfers made.
+	leaseTransferCount int
+	// shouldReturnEarly indicates the outer loop should return immediately.
+	shouldReturnEarly bool
+	// shouldContinue indicates the outer loop should continue to the next iteration.
+	shouldContinue bool
+	// maxRangeMoveCount is the maximum number of range moves allowed.
+	maxRangeMoveCount int
+	// maxLeaseTransferCount is the maximum number of lease transfers allowed.
+	maxLeaseTransferCount int
+	// lastFailedChangeDelayDuration is the delay after a failed change before retrying.
+	lastFailedChangeDelayDuration time.Duration
+}
+
 // Called periodically, say every 10s.
 //
 // We do not want to shed replicas for CPU from a remote store until its had a
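Note on the two `should*` flags above: the per-store body below runs inside an anonymous function, and a `return` there only exits that closure, so the old code returned `(shouldReturnEarly, shouldContinue)` and let the caller translate the pair into `return`/`continue`. This patch records the same signal on `rebalanceState` instead. Below is a minimal standalone sketch of that handshake, using hypothetical names (`loopState`, `processAll`) rather than the mma types.

```go
package main

import "fmt"

// loopState is a simplified stand-in for rebalanceState: the per-item closure
// records how the enclosing loop should proceed instead of returning (bool, bool).
type loopState struct {
	processed         int
	budget            int
	shouldReturnEarly bool
	shouldContinue    bool
}

func processAll(items []string) int {
	ls := &loopState{budget: 2}
	for _, item := range items {
		func() {
			if item == "" {
				ls.shouldContinue = true // nothing to do for this item
				return                   // exits only the closure
			}
			ls.processed++
			if ls.processed >= ls.budget {
				ls.shouldReturnEarly = true // budget exhausted, stop all work
				return
			}
		}()
		if ls.shouldReturnEarly {
			return ls.processed
		}
		if ls.shouldContinue {
			ls.shouldContinue = false // reset so the flag cannot leak into the next iteration
			continue
		}
	}
	return ls.processed
}

func main() {
	fmt.Println(processAll([]string{"a", "", "b", "c"})) // prints 2
}
```

The reset of `shouldContinue` after it is acted on mirrors the `rs.shouldContinue = false` added near the end of this patch; `shouldReturnEarly` needs no reset because the outer function returns immediately.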
@@ -126,7 +147,6 @@ func (cs *clusterState) rebalanceStores(
 		}
 	}
 
-	var changes []PendingRangeChange
 	var disj [1]constraintsConj
 	var storesToExclude storeSet
 	var storesToExcludeForRange storeSet
@@ -144,13 +164,22 @@ func (cs *clusterState) rebalanceStores(
 	const maxLeaseTransferCount = 8
 	// See the long comment where rangeState.lastFailedChange is declared.
 	const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
-	rangeMoveCount := 0
-	leaseTransferCount := 0
+	rs := &rebalanceState{
+		cs:                            cs,
+		changes:                       []PendingRangeChange{},
+		rangeMoveCount:                0,
+		leaseTransferCount:            0,
+		shouldReturnEarly:             false,
+		shouldContinue:                false,
+		maxRangeMoveCount:             maxRangeMoveCount,
+		maxLeaseTransferCount:         maxLeaseTransferCount,
+		lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
+	}
 	for idx /*logging only*/, store := range sheddingStores {
-		shouldReturnEarly, shouldContinue := func() (bool, bool) {
+		func() {
 			log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s",
 				store.StoreID, store.nls, store.sls, store.worstDim)
-			ss := cs.stores[store.StoreID]
+			ss := rs.cs.stores[store.StoreID]
 
 			doneShedding := false
 			if true {
@@ -161,7 +190,7 @@ func (cs *clusterState) rebalanceStores(
 				var b strings.Builder
 				for i := 0; i < n; i++ {
 					rangeID := topKRanges.index(i)
-					rstate := cs.ranges[rangeID]
+					rstate := rs.cs.ranges[rangeID]
 					load := rstate.load.Load
 					if !ss.adjusted.replicas[rangeID].IsLeaseholder {
 						load[CPURate] = rstate.load.RaftCPU
@@ -192,7 +221,7 @@ func (cs *clusterState) rebalanceStores(
 				n := topKRanges.len()
 				for i := 0; i < n; i++ {
 					rangeID := topKRanges.index(i)
-					rstate := cs.ranges[rangeID]
+					rstate := rs.cs.ranges[rangeID]
 					if len(rstate.pendingChanges) > 0 {
 						// If the range has pending changes, don't make more changes.
 						log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
@@ -214,11 +243,11 @@ func (cs *clusterState) rebalanceStores(
 								" changes but is not leaseholder: %+v", rstate)
 						}
 					}
-					if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration {
+					if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
 						log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
 						continue
 					}
-					if !cs.ensureAnalyzedConstraints(rstate) {
+					if !rs.cs.ensureAnalyzedConstraints(rstate) {
 						log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
 						continue
 					}
@@ -251,8 +280,8 @@ func (cs *clusterState) rebalanceStores(
 						continue // leaseholder is the only candidate
 					}
 					clear(scratchNodes)
-					means := computeMeansForStoreSet(cs, candsPL, scratchNodes, scratchStores)
-					sls := cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
+					means := computeMeansForStoreSet(rs.cs, candsPL, scratchNodes, scratchStores)
+					sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
 					log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
 					if sls.dimSummary[CPURate] < overloadSlow {
 						// This store is not cpu overloaded relative to these candidates for
@@ -262,13 +291,13 @@ func (cs *clusterState) rebalanceStores(
 					}
 					var candsSet candidateSet
 					for _, cand := range cands {
-						if disp := cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
+						if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
 							// Don't transfer lease to a store that is lagging.
 							log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
 								cand.storeID, disp)
 							continue
 						}
-						candSls := cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
+						candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
 						candsSet.candidates = append(candsSet.candidates, candidateInfo{
 							StoreID:          cand.storeID,
 							storeLoadSummary: candSls,
@@ -297,7 +326,7 @@ func (cs *clusterState) rebalanceStores(
 							ss.NodeID, ss.StoreID, rangeID)
 						continue
 					}
-					targetSS := cs.stores[targetStoreID]
+					targetSS := rs.cs.stores[targetStoreID]
 					var addedLoad LoadVector
 					// Only adding leaseholder CPU.
 					addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
@@ -307,7 +336,7 @@ func (cs *clusterState) rebalanceStores(
 						addedLoad[CPURate] = 0
 						panic("raft cpu higher than total cpu")
 					}
-					if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
+					if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
 						log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
 							store.StoreID, targetStoreID, rangeID, addedLoad)
 						continue
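The lease-transfer path in the two hunks above moves only the leaseholder's share of CPU: the delta added to the target is the range's total CPU minus its Raft CPU, and a negative delta means the two measurements are inconsistent, which this code treats as a panic. Below is a minimal standalone sketch of that accounting, using a hypothetical `leaseholderCPU` helper (not an mma function) that returns an error where the real code panics.

```go
package main

import "fmt"

// leaseholderCPU returns the CPU attributable to holding the lease, i.e. the
// portion of load that moves with a lease transfer; Raft CPU stays with the
// replica. A negative delta indicates inconsistent stats.
func leaseholderCPU(totalCPU, raftCPU float64) (float64, error) {
	delta := totalCPU - raftCPU
	if delta < 0 {
		return 0, fmt.Errorf("raft cpu %.1f higher than total cpu %.1f", raftCPU, totalCPU)
	}
	return delta, nil
}

func main() {
	if delta, err := leaseholderCPU(10, 3); err == nil {
		fmt.Println(delta) // 7: the load a lease transfer would shift to the target
	}
}
```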
@@ -323,25 +352,26 @@ func (cs *clusterState) rebalanceStores(
 					replicaChanges := MakeLeaseTransferChanges(
 						rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
 					leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-					if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
+					if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
 						panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
 					}
-					cs.addPendingRangeChange(leaseChange)
-					changes = append(changes, leaseChange)
-					leaseTransferCount++
-					if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() {
-						panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1]))
+					rs.cs.addPendingRangeChange(leaseChange)
+					rs.changes = append(rs.changes, leaseChange)
+					rs.leaseTransferCount++
+					if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() {
+						panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1]))
 					}
 					log.KvDistribution.Infof(ctx,
 						"result(success): shedding r%v lease from s%v to s%v [change:%v] with "+
 							"resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))",
-						rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1],
+						rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1],
 						ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
 						ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
 						targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
-					if leaseTransferCount >= maxLeaseTransferCount {
-						log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", maxLeaseTransferCount)
-						return true, false
+					if rs.leaseTransferCount >= rs.maxLeaseTransferCount {
+						log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount)
+						rs.shouldReturnEarly = true
+						return
 					}
 					doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
 					if doneShedding {
@@ -350,7 +380,7 @@ func (cs *clusterState) rebalanceStores(
 						break
 					}
 				}
-				if doneShedding || leaseTransferCount > 0 {
+				if doneShedding || rs.leaseTransferCount > 0 {
 					// If managed to transfer a lease, wait for it to be done, before
 					// shedding replicas from this store (which is more costly). Otherwise
 					// we may needlessly start moving replicas. Note that the store
@@ -359,8 +389,9 @@ func (cs *clusterState) rebalanceStores(
 					// pending from a load perspective, so we *may* not be able to do more
 					// lease transfers -- so be it.
 					log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
-						store.StoreID, doneShedding, leaseTransferCount)
-					return false, true
+						store.StoreID, doneShedding, rs.leaseTransferCount)
+					rs.shouldContinue = true
+					return
 				}
 			} else {
 				log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v",
@@ -372,7 +403,8 @@ func (cs *clusterState) rebalanceStores(
 			if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
 				now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
 				log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
-				return false, true
+				rs.shouldContinue = true
+				return
 			}
 			// If the node is cpu overloaded, or the store/node is not fdOK, exclude
 			// the other stores on this node from receiving replicas shed by this
@@ -381,7 +413,7 @@ func (cs *clusterState) rebalanceStores(
 			storesToExclude = storesToExclude[:0]
 			if excludeStoresOnNode {
 				nodeID := ss.NodeID
-				for _, storeID := range cs.nodes[nodeID].stores {
+				for _, storeID := range rs.cs.nodes[nodeID].stores {
 					storesToExclude.insert(storeID)
 				}
 				log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID)
@@ -404,11 +436,11 @@ func (cs *clusterState) rebalanceStores(
 					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
 					continue
 				}
-				if now.Sub(rstate.lastFailedChange) < lastFailedChangeDelayDuration {
+				if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
 					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
 					continue
 				}
-				if !cs.ensureAnalyzedConstraints(rstate) {
+				if !rs.cs.ensureAnalyzedConstraints(rstate) {
 					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
 					continue
 				}
@@ -447,13 +479,13 @@ func (cs *clusterState) rebalanceStores(
 						// we have already excluded those stores above.
 						continue
 					}
-					nodeID := cs.stores[storeID].NodeID
-					for _, storeID := range cs.nodes[nodeID].stores {
+					nodeID := rs.cs.stores[storeID].NodeID
+					for _, storeID := range rs.cs.nodes[nodeID].stores {
 						storesToExcludeForRange.insert(storeID)
 					}
 				}
 				// TODO(sumeer): eliminate cands allocations by passing a scratch slice.
-				cands, ssSLS := cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID)
+				cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID)
 				log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
 					rangeID, store.StoreID, ss.adjusted.load)
 				if log.V(2) {
@@ -478,10 +510,10 @@ func (cs *clusterState) rebalanceStores(
 				// Set the diversity score and lease preference index of the candidates.
 				for _, cand := range cands.candidates {
 					cand.diversityScore = localities.getScoreChangeForRebalance(
-						ss.localityTiers, cs.stores[cand.StoreID].localityTiers)
+						ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers)
 					if isLeaseholder {
 						cand.leasePreferenceIndex = matchedLeasePreferenceIndex(
-							cand.StoreID, rstate.constraints.spanConfig.leasePreferences, cs.constraintMatcher)
+							cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher)
 					}
 				}
 				// Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and
@@ -512,12 +544,12 @@ func (cs *clusterState) rebalanceStores(
 						"(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
 					continue
 				}
-				targetSS := cs.stores[targetStoreID]
+				targetSS := rs.cs.stores[targetStoreID]
 				addedLoad := rstate.load.Load
 				if !isLeaseholder {
 					addedLoad[CPURate] = rstate.load.RaftCPU
 				}
-				if !cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
+				if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
 					log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
 						store.StoreID, targetStoreID, rangeID, addedLoad)
 					continue
@@ -538,19 +570,20 @@ func (cs *clusterState) rebalanceStores(
 				replicaChanges := makeRebalanceReplicaChanges(
 					rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
 				rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-				if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
+				if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
 					panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
 						replicaChanges, rangeID))
 				}
-				cs.addPendingRangeChange(rangeChange)
-				changes = append(changes, rangeChange)
-				rangeMoveCount++
+				rs.cs.addPendingRangeChange(rangeChange)
+				rs.changes = append(rs.changes, rangeChange)
+				rs.rangeMoveCount++
 				log.KvDistribution.VInfof(ctx, 2,
 					"result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
-					rangeID, removeTarget.StoreID, addTarget.StoreID, changes[len(changes)-1], ss.adjusted.load, targetSS.adjusted.load)
-				if rangeMoveCount >= maxRangeMoveCount {
-					log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, maxRangeMoveCount, len(sheddingStores)-(idx+1))
-					return true, false
+					rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
+				if rs.rangeMoveCount >= rs.maxRangeMoveCount {
+					log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning with %d stores left in shedding stores", store.StoreID, rs.maxRangeMoveCount, len(sheddingStores)-(idx+1))
+					rs.shouldReturnEarly = true
+					return
 				}
 				doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
 				if doneShedding {
@@ -566,16 +599,17 @@ func (cs *clusterState) rebalanceStores(
 			// rebalancing to work well is not in scope.
 			if doneShedding {
 				log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID)
-				return false, true
+				rs.shouldContinue = true
+				return
 			}
-			return false, false
 		}()
-		if shouldReturnEarly {
-			return changes
+		if rs.shouldReturnEarly {
+			return rs.changes
 		}
-		if shouldContinue {
+		if rs.shouldContinue {
+			rs.shouldContinue = false
 			continue
 		}
 	}
-	return changes
+	return rs.changes
 }