Commit 8b6f928

mmaprototype: rename rebalanceState -> rebalanceEnv
Wenyi correctly pointed out that (rs rebalanceState) is too reminiscent of (rs rangeState); (re rebalanceEnv) has no such problem.

Epic: CRDB-55052
1 parent be8f0ee commit 8b6f928
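
The motivation is easiest to see side by side. The following is a minimal, hypothetical Go sketch (the structs, fields, and methods are invented for illustration and are not the real mmaprototype types): with the old name, both rangeState and rebalanceState would conventionally take an rs receiver, so rs.foo could be read as either type; with rebalanceEnv the natural receiver is re, which is the convention the diff below adopts.

package main

import "fmt"

// rangeState conventionally takes the receiver "rs".
type rangeState struct {
	pendingChanges int // hypothetical field, for illustration only
}

func (rs *rangeState) hasPendingChanges() bool { return rs.pendingChanges > 0 }

// Under the old name rebalanceState, this type's receiver would also have
// been "rs"; as rebalanceEnv, the unambiguous receiver is "re".
type rebalanceEnv struct {
	rangeMoveCount int // hypothetical field, for illustration only
}

func (re *rebalanceEnv) recordRangeMove() { re.rangeMoveCount++ }

func main() {
	rs := rangeState{pendingChanges: 1}
	re := rebalanceEnv{}
	re.recordRangeMove()
	fmt.Println(rs.hasPendingChanges(), re.rangeMoveCount) // true 1
}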

1 file changed (+65, -71 lines)

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 65 additions & 71 deletions
@@ -23,8 +23,8 @@ import (
 
 var mmaid = atomic.Int64{}
 
-// rebalanceState tracks the state and outcomes of a rebalanceStores invocation.
-type rebalanceState struct {
+// rebalanceEnv tracks the state and outcomes of a rebalanceStores invocation.
+type rebalanceEnv struct {
 	cs *clusterState
 	// rng is the random number generator for non-deterministic decisions.
 	rng *rand.Rand
@@ -170,7 +170,7 @@ func (cs *clusterState) rebalanceStores(
 	const maxLeaseTransferCount = 8
 	// See the long comment where rangeState.lastFailedChange is declared.
 	const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
-	rs := &rebalanceState{
+	re := &rebalanceEnv{
 		cs:  cs,
 		rng: rng,
 		dsm: dsm,
@@ -182,23 +182,23 @@ func (cs *clusterState) rebalanceStores(
 		lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
 		now: now,
 	}
-	rs.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
-	rs.scratch.stores = map[roachpb.StoreID]struct{}{}
+	re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
+	re.scratch.stores = map[roachpb.StoreID]struct{}{}
 	for _, store := range sheddingStores {
-		if rs.rangeMoveCount >= rs.maxRangeMoveCount || rs.leaseTransferCount >= rs.maxLeaseTransferCount {
+		if re.rangeMoveCount >= re.maxRangeMoveCount || re.leaseTransferCount >= re.maxLeaseTransferCount {
 			break
 		}
-		rs.rebalanceStore(ctx, store, localStoreID)
+		re.rebalanceStore(ctx, store, localStoreID)
 	}
-	return rs.changes
+	return re.changes
 }
 
-func (rs *rebalanceState) rebalanceStore(
+func (re *rebalanceEnv) rebalanceStore(
 	ctx context.Context, store sheddingStore, localStoreID roachpb.StoreID,
 ) {
 	log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s",
 		store.StoreID, store.nls, store.sls, store.worstDim)
-	ss := rs.cs.stores[store.StoreID]
+	ss := re.cs.stores[store.StoreID]
 
 	if true {
 		// Debug logging.
@@ -208,7 +208,7 @@ func (rs *rebalanceState) rebalanceStore(
 		var b strings.Builder
 		for i := 0; i < n; i++ {
 			rangeID := topKRanges.index(i)
-			rstate := rs.cs.ranges[rangeID]
+			rstate := re.cs.ranges[rangeID]
 			load := rstate.load.Load
 			if !ss.adjusted.replicas[rangeID].IsLeaseholder {
 				load[CPURate] = rstate.load.RaftCPU
@@ -229,7 +229,7 @@ func (rs *rebalanceState) rebalanceStore(
 	// behalf of a particular store (vs. being called on behalf of the set
 	// of local store IDs)?
 	if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow {
-		shouldSkipReplicaMoves := rs.rebalanceLeases(ctx, ss, store, localStoreID)
+		shouldSkipReplicaMoves := re.rebalanceLeases(ctx, ss, store, localStoreID)
 		if shouldSkipReplicaMoves {
 			return
 		}
@@ -239,35 +239,32 @@ func (rs *rebalanceState) rebalanceStore(
 	}
 
 	log.KvDistribution.VInfof(ctx, 2, "attempting to shed replicas next")
-	rs.rebalanceReplicas(ctx, store, ss, localStoreID)
+	re.rebalanceReplicas(ctx, store, ss, localStoreID)
 }
 
-func (rs *rebalanceState) rebalanceReplicas(
-	ctx context.Context,
-	store sheddingStore,
-	ss *storeState,
-	localStoreID roachpb.StoreID,
+func (re *rebalanceEnv) rebalanceReplicas(
+	ctx context.Context, store sheddingStore, ss *storeState, localStoreID roachpb.StoreID,
 ) {
 	doneShedding := false
 	if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
-		rs.now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
+		re.now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
 		log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
 		return
 	}
 	// If the node is cpu overloaded, or the store/node is not fdOK, exclude
 	// the other stores on this node from receiving replicas shed by this
 	// store.
 	excludeStoresOnNode := store.nls > overloadSlow
-	rs.scratch.storesToExclude = rs.scratch.storesToExclude[:0]
+	re.scratch.storesToExclude = re.scratch.storesToExclude[:0]
 	if excludeStoresOnNode {
 		nodeID := ss.NodeID
-		for _, storeID := range rs.cs.nodes[nodeID].stores {
-			rs.scratch.storesToExclude.insert(storeID)
+		for _, storeID := range re.cs.nodes[nodeID].stores {
+			re.scratch.storesToExclude.insert(storeID)
 		}
 		log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID)
 	} else {
 		// This store is excluded of course.
-		rs.scratch.storesToExclude.insert(store.StoreID)
+		re.scratch.storesToExclude.insert(store.StoreID)
 	}
 
 	// Iterate over top-K ranges first and try to move them.
@@ -278,17 +275,17 @@ func (rs *rebalanceState) rebalanceReplicas(
 		rangeID := topKRanges.index(i)
 		// TODO(sumeer): the following code belongs in a closure, since we will
 		// repeat it for some random selection of non topKRanges.
-		rstate := rs.cs.ranges[rangeID]
+		rstate := re.cs.ranges[rangeID]
 		if len(rstate.pendingChanges) > 0 {
 			// If the range has pending changes, don't make more changes.
 			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
 			continue
 		}
-		if rs.now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
+		if re.now.Sub(rstate.lastFailedChange) < re.lastFailedChangeDelayDuration {
 			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
 			continue
 		}
-		if !rs.cs.ensureAnalyzedConstraints(rstate) {
+		if !re.cs.ensureAnalyzedConstraints(rstate) {
 			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
 			continue
 		}
@@ -316,8 +313,8 @@ func (rs *rebalanceState) rebalanceReplicas(
 			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err)
 			continue
 		}
-		rs.scratch.disj[0] = conj
-		rs.scratch.storesToExcludeForRange = append(rs.scratch.storesToExcludeForRange[:0], rs.scratch.storesToExclude...)
+		re.scratch.disj[0] = conj
+		re.scratch.storesToExcludeForRange = append(re.scratch.storesToExcludeForRange[:0], re.scratch.storesToExclude...)
 		// Also exclude all stores on nodes that have existing replicas.
 		for _, replica := range rstate.replicas {
 			storeID := replica.StoreID
@@ -327,13 +324,13 @@ func (rs *rebalanceState) rebalanceReplicas(
 				// we have already excluded those stores above.
 				continue
 			}
-			nodeID := rs.cs.stores[storeID].NodeID
-			for _, storeID := range rs.cs.nodes[nodeID].stores {
-				rs.scratch.storesToExcludeForRange.insert(storeID)
+			nodeID := re.cs.stores[storeID].NodeID
+			for _, storeID := range re.cs.nodes[nodeID].stores {
+				re.scratch.storesToExcludeForRange.insert(storeID)
			}
		}
		// TODO(sumeer): eliminate cands allocations by passing a scratch slice.
-		cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, rs.scratch.disj[:], rs.scratch.storesToExcludeForRange, store.StoreID)
+		cands, ssSLS := re.cs.computeCandidatesForRange(ctx, re.scratch.disj[:], re.scratch.storesToExcludeForRange, store.StoreID)
		log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
			rangeID, store.StoreID, ss.adjusted.load)
		if log.V(2) {
@@ -353,15 +350,15 @@ func (rs *rebalanceState) rebalanceReplicas(
		} else {
			rlocalities = rstate.constraints.replicaLocalityTiers
		}
-		localities := rs.dsm.getExistingReplicaLocalities(rlocalities)
+		localities := re.dsm.getExistingReplicaLocalities(rlocalities)
		isLeaseholder := rstate.constraints.leaseholderID == store.StoreID
		// Set the diversity score and lease preference index of the candidates.
		for _, cand := range cands.candidates {
			cand.diversityScore = localities.getScoreChangeForRebalance(
-				ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers)
+				ss.localityTiers, re.cs.stores[cand.StoreID].localityTiers)
			if isLeaseholder {
				cand.leasePreferenceIndex = matchedLeasePreferenceIndex(
-					cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher)
+					cand.StoreID, rstate.constraints.spanConfig.leasePreferences, re.cs.constraintMatcher)
			}
		}
		// Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and
@@ -375,7 +372,7 @@ func (rs *rebalanceState) rebalanceReplicas(
		// simple but effective manner. For now, we capture this using these
		// grace duration thresholds.
		ignoreLevel := ignoreLoadNoChangeAndHigher
-		overloadDur := rs.now.Sub(ss.overloadStartTime)
+		overloadDur := re.now.Sub(ss.overloadStartTime)
		if overloadDur > ignoreHigherThanLoadThresholdGraceDuration {
			ignoreLevel = ignoreHigherThanLoadThreshold
			log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v",
@@ -386,18 +383,18 @@ func (rs *rebalanceState) rebalanceReplicas(
				ignoreLevel, ssSLS.sls, rangeID, overloadDur)
		}
		targetStoreID := sortTargetCandidateSetAndPick(
-			ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rs.rng)
+			ctx, cands, ssSLS.sls, ignoreLevel, loadDim, re.rng)
		if targetStoreID == 0 {
			log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+
				"(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
			continue
		}
-		targetSS := rs.cs.stores[targetStoreID]
+		targetSS := re.cs.stores[targetStoreID]
		addedLoad := rstate.load.Load
		if !isLeaseholder {
			addedLoad[CPURate] = rstate.load.RaftCPU
		}
-		if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
+		if !re.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
			log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
				store.StoreID, targetStoreID, rangeID, addedLoad)
			continue
@@ -418,18 +415,18 @@ func (rs *rebalanceState) rebalanceReplicas(
		replicaChanges := makeRebalanceReplicaChanges(
			rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
		rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-		if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
+		if err = re.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
			panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
				replicaChanges, rangeID))
		}
-		rs.cs.addPendingRangeChange(rangeChange)
-		rs.changes = append(rs.changes, rangeChange)
-		rs.rangeMoveCount++
+		re.cs.addPendingRangeChange(rangeChange)
+		re.changes = append(re.changes, rangeChange)
+		re.rangeMoveCount++
		log.KvDistribution.VInfof(ctx, 2,
			"result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
-			rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
-		if rs.rangeMoveCount >= rs.maxRangeMoveCount {
-			log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount)
+			rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
+		if re.rangeMoveCount >= re.maxRangeMoveCount {
+			log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, re.maxRangeMoveCount)
			return
		}
		doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
@@ -450,11 +447,8 @@ func (rs *rebalanceState) rebalanceReplicas(
	}
 }
 
-func (rs *rebalanceState) rebalanceLeases(
-	ctx context.Context,
-	ss *storeState,
-	store sheddingStore,
-	localStoreID roachpb.StoreID,
+func (re *rebalanceEnv) rebalanceLeases(
+	ctx context.Context, ss *storeState, store sheddingStore, localStoreID roachpb.StoreID,
 ) bool {
 	log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
 		store.StoreID, store.dimSummary[CPURate], overloadSlow)
@@ -468,7 +462,7 @@ func (rs *rebalanceState) rebalanceLeases(
 	doneShedding := false
 	for i := 0; i < n; i++ {
 		rangeID := topKRanges.index(i)
-		rstate := rs.cs.ranges[rangeID]
+		rstate := re.cs.ranges[rangeID]
 		if len(rstate.pendingChanges) > 0 {
 			// If the range has pending changes, don't make more changes.
 			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
@@ -490,11 +484,11 @@ func (rs *rebalanceState) rebalanceLeases(
					" changes but is not leaseholder: %+v", rstate)
			}
		}
-		if rs.now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
+		if re.now.Sub(rstate.lastFailedChange) < re.lastFailedChangeDelayDuration {
			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
			continue
		}
-		if !rs.cs.ensureAnalyzedConstraints(rstate) {
+		if !re.cs.ensureAnalyzedConstraints(rstate) {
			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
			continue
		}
@@ -526,9 +520,9 @@ func (rs *rebalanceState) rebalanceLeases(
		if len(candsPL) <= 1 {
			continue // leaseholder is the only candidate
		}
-		clear(rs.scratch.nodes)
-		means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores)
-		sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
+		clear(re.scratch.nodes)
+		means := computeMeansForStoreSet(re.cs, candsPL, re.scratch.nodes, re.scratch.stores)
+		sls := re.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
		log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
		if sls.dimSummary[CPURate] < overloadSlow {
			// This store is not cpu overloaded relative to these candidates for
@@ -538,13 +532,13 @@ func (rs *rebalanceState) rebalanceLeases(
		}
		var candsSet candidateSet
		for _, cand := range cands {
-			if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
+			if disp := re.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
				// Don't transfer lease to a store that is lagging.
				log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
					cand.storeID, disp)
				continue
			}
-			candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
+			candSls := re.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
			candsSet.candidates = append(candsSet.candidates, candidateInfo{
				StoreID: cand.storeID,
				storeLoadSummary: candSls,
@@ -565,15 +559,15 @@ func (rs *rebalanceState) rebalanceLeases(
		// will only add CPU to the target store (so it is ok to ignore other
		// dimensions on the target).
		targetStoreID := sortTargetCandidateSetAndPick(
-			ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng)
+			ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, re.rng)
		if targetStoreID == 0 {
			log.KvDistribution.Infof(
				ctx,
				"result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick",
				ss.NodeID, ss.StoreID, rangeID)
			continue
		}
-		targetSS := rs.cs.stores[targetStoreID]
+		targetSS := re.cs.stores[targetStoreID]
		var addedLoad LoadVector
		// Only adding leaseholder CPU.
		addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
@@ -583,7 +577,7 @@ func (rs *rebalanceState) rebalanceLeases(
			addedLoad[CPURate] = 0
			panic("raft cpu higher than total cpu")
		}
-		if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
+		if !re.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
			log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
				store.StoreID, targetStoreID, rangeID, addedLoad)
			continue
@@ -599,25 +593,25 @@ func (rs *rebalanceState) rebalanceLeases(
		replicaChanges := MakeLeaseTransferChanges(
			rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
		leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-		if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
+		if err := re.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
			panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
		}
-		rs.cs.addPendingRangeChange(leaseChange)
-		rs.changes = append(rs.changes, leaseChange)
-		rs.leaseTransferCount++
+		re.cs.addPendingRangeChange(leaseChange)
+		re.changes = append(re.changes, leaseChange)
+		re.leaseTransferCount++
		localLeaseTransferCount++
-		if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() {
-			panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1]))
+		if re.changes[len(re.changes)-1].IsChangeReplicas() || !re.changes[len(re.changes)-1].IsTransferLease() {
+			panic(fmt.Sprintf("lease transfer is invalid: %v", re.changes[len(re.changes)-1]))
		}
		log.KvDistribution.Infof(ctx,
			"result(success): shedding r%v lease from s%v to s%v [change:%v] with "+
				"resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))",
-			rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1],
+			rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1],
			ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
			ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
			targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
-		if rs.leaseTransferCount >= rs.maxLeaseTransferCount {
-			log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount)
+		if re.leaseTransferCount >= re.maxLeaseTransferCount {
+			log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
			break
		}
		doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
