Skip to content

Commit d236fa5

Browse files
committed
mmaprototype: embed clusterState in rebalanceEnv
It's referenced a lot, so this declutters the code.
1 parent 8b6f928 commit d236fa5

File tree

1 file changed

+26
-26
lines changed

1 file changed

+26
-26
lines changed

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ var mmaid = atomic.Int64{}
2525

2626
// rebalanceEnv tracks the state and outcomes of a rebalanceStores invocation.
2727
type rebalanceEnv struct {
28-
cs *clusterState
28+
*clusterState
2929
// rng is the random number generator for non-deterministic decisions.
3030
rng *rand.Rand
3131
// dsm is the diversity scoring memo for computing diversity scores.
@@ -171,7 +171,7 @@ func (cs *clusterState) rebalanceStores(
171171
// See the long comment where rangeState.lastFailedChange is declared.
172172
const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
173173
re := &rebalanceEnv{
174-
cs: cs,
174+
clusterState: cs,
175175
rng: rng,
176176
dsm: dsm,
177177
changes: []PendingRangeChange{},
@@ -198,7 +198,7 @@ func (re *rebalanceEnv) rebalanceStore(
198198
) {
199199
log.KvDistribution.Infof(ctx, "start processing shedding store s%d: cpu node load %s, store load %s, worst dim %s",
200200
store.StoreID, store.nls, store.sls, store.worstDim)
201-
ss := re.cs.stores[store.StoreID]
201+
ss := re.stores[store.StoreID]
202202

203203
if true {
204204
// Debug logging.
@@ -208,7 +208,7 @@ func (re *rebalanceEnv) rebalanceStore(
208208
var b strings.Builder
209209
for i := 0; i < n; i++ {
210210
rangeID := topKRanges.index(i)
211-
rstate := re.cs.ranges[rangeID]
211+
rstate := re.ranges[rangeID]
212212
load := rstate.load.Load
213213
if !ss.adjusted.replicas[rangeID].IsLeaseholder {
214214
load[CPURate] = rstate.load.RaftCPU
@@ -258,7 +258,7 @@ func (re *rebalanceEnv) rebalanceReplicas(
258258
re.scratch.storesToExclude = re.scratch.storesToExclude[:0]
259259
if excludeStoresOnNode {
260260
nodeID := ss.NodeID
261-
for _, storeID := range re.cs.nodes[nodeID].stores {
261+
for _, storeID := range re.nodes[nodeID].stores {
262262
re.scratch.storesToExclude.insert(storeID)
263263
}
264264
log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID)
@@ -275,7 +275,7 @@ func (re *rebalanceEnv) rebalanceReplicas(
275275
rangeID := topKRanges.index(i)
276276
// TODO(sumeer): the following code belongs in a closure, since we will
277277
// repeat it for some random selection of non topKRanges.
278-
rstate := re.cs.ranges[rangeID]
278+
rstate := re.ranges[rangeID]
279279
if len(rstate.pendingChanges) > 0 {
280280
// If the range has pending changes, don't make more changes.
281281
log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
@@ -285,7 +285,7 @@ func (re *rebalanceEnv) rebalanceReplicas(
285285
log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
286286
continue
287287
}
288-
if !re.cs.ensureAnalyzedConstraints(rstate) {
288+
if !re.ensureAnalyzedConstraints(rstate) {
289289
log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
290290
continue
291291
}
@@ -324,13 +324,13 @@ func (re *rebalanceEnv) rebalanceReplicas(
324324
// we have already excluded those stores above.
325325
continue
326326
}
327-
nodeID := re.cs.stores[storeID].NodeID
328-
for _, storeID := range re.cs.nodes[nodeID].stores {
327+
nodeID := re.stores[storeID].NodeID
328+
for _, storeID := range re.nodes[nodeID].stores {
329329
re.scratch.storesToExcludeForRange.insert(storeID)
330330
}
331331
}
332332
// TODO(sumeer): eliminate cands allocations by passing a scratch slice.
333-
cands, ssSLS := re.cs.computeCandidatesForRange(ctx, re.scratch.disj[:], re.scratch.storesToExcludeForRange, store.StoreID)
333+
cands, ssSLS := re.computeCandidatesForRange(ctx, re.scratch.disj[:], re.scratch.storesToExcludeForRange, store.StoreID)
334334
log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
335335
rangeID, store.StoreID, ss.adjusted.load)
336336
if log.V(2) {
@@ -355,10 +355,10 @@ func (re *rebalanceEnv) rebalanceReplicas(
355355
// Set the diversity score and lease preference index of the candidates.
356356
for _, cand := range cands.candidates {
357357
cand.diversityScore = localities.getScoreChangeForRebalance(
358-
ss.localityTiers, re.cs.stores[cand.StoreID].localityTiers)
358+
ss.localityTiers, re.stores[cand.StoreID].localityTiers)
359359
if isLeaseholder {
360360
cand.leasePreferenceIndex = matchedLeasePreferenceIndex(
361-
cand.StoreID, rstate.constraints.spanConfig.leasePreferences, re.cs.constraintMatcher)
361+
cand.StoreID, rstate.constraints.spanConfig.leasePreferences, re.constraintMatcher)
362362
}
363363
}
364364
// Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and
@@ -389,12 +389,12 @@ func (re *rebalanceEnv) rebalanceReplicas(
389389
"(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
390390
continue
391391
}
392-
targetSS := re.cs.stores[targetStoreID]
392+
targetSS := re.stores[targetStoreID]
393393
addedLoad := rstate.load.Load
394394
if !isLeaseholder {
395395
addedLoad[CPURate] = rstate.load.RaftCPU
396396
}
397-
if !re.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
397+
if !re.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
398398
log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
399399
store.StoreID, targetStoreID, rangeID, addedLoad)
400400
continue
@@ -415,11 +415,11 @@ func (re *rebalanceEnv) rebalanceReplicas(
415415
replicaChanges := makeRebalanceReplicaChanges(
416416
rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
417417
rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
418-
if err = re.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
418+
if err = re.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
419419
panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
420420
replicaChanges, rangeID))
421421
}
422-
re.cs.addPendingRangeChange(rangeChange)
422+
re.addPendingRangeChange(rangeChange)
423423
re.changes = append(re.changes, rangeChange)
424424
re.rangeMoveCount++
425425
log.KvDistribution.VInfof(ctx, 2,
@@ -462,7 +462,7 @@ func (re *rebalanceEnv) rebalanceLeases(
462462
doneShedding := false
463463
for i := 0; i < n; i++ {
464464
rangeID := topKRanges.index(i)
465-
rstate := re.cs.ranges[rangeID]
465+
rstate := re.ranges[rangeID]
466466
if len(rstate.pendingChanges) > 0 {
467467
// If the range has pending changes, don't make more changes.
468468
log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
@@ -488,7 +488,7 @@ func (re *rebalanceEnv) rebalanceLeases(
488488
log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
489489
continue
490490
}
491-
if !re.cs.ensureAnalyzedConstraints(rstate) {
491+
if !re.ensureAnalyzedConstraints(rstate) {
492492
log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
493493
continue
494494
}
@@ -521,8 +521,8 @@ func (re *rebalanceEnv) rebalanceLeases(
521521
continue // leaseholder is the only candidate
522522
}
523523
clear(re.scratch.nodes)
524-
means := computeMeansForStoreSet(re.cs, candsPL, re.scratch.nodes, re.scratch.stores)
525-
sls := re.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
524+
means := computeMeansForStoreSet(re, candsPL, re.scratch.nodes, re.scratch.stores)
525+
sls := re.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
526526
log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
527527
if sls.dimSummary[CPURate] < overloadSlow {
528528
// This store is not cpu overloaded relative to these candidates for
@@ -532,13 +532,13 @@ func (re *rebalanceEnv) rebalanceLeases(
532532
}
533533
var candsSet candidateSet
534534
for _, cand := range cands {
535-
if disp := re.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
535+
if disp := re.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
536536
// Don't transfer lease to a store that is lagging.
537537
log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
538538
cand.storeID, disp)
539539
continue
540540
}
541-
candSls := re.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
541+
candSls := re.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
542542
candsSet.candidates = append(candsSet.candidates, candidateInfo{
543543
StoreID: cand.storeID,
544544
storeLoadSummary: candSls,
@@ -567,7 +567,7 @@ func (re *rebalanceEnv) rebalanceLeases(
567567
ss.NodeID, ss.StoreID, rangeID)
568568
continue
569569
}
570-
targetSS := re.cs.stores[targetStoreID]
570+
targetSS := re.stores[targetStoreID]
571571
var addedLoad LoadVector
572572
// Only adding leaseholder CPU.
573573
addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
@@ -577,7 +577,7 @@ func (re *rebalanceEnv) rebalanceLeases(
577577
addedLoad[CPURate] = 0
578578
panic("raft cpu higher than total cpu")
579579
}
580-
if !re.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
580+
if !re.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
581581
log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
582582
store.StoreID, targetStoreID, rangeID, addedLoad)
583583
continue
@@ -593,10 +593,10 @@ func (re *rebalanceEnv) rebalanceLeases(
593593
replicaChanges := MakeLeaseTransferChanges(
594594
rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
595595
leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
596-
if err := re.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
596+
if err := re.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
597597
panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
598598
}
599-
re.cs.addPendingRangeChange(leaseChange)
599+
re.addPendingRangeChange(leaseChange)
600600
re.changes = append(re.changes, leaseChange)
601601
re.leaseTransferCount++
602602
localLeaseTransferCount++

0 commit comments

Comments
 (0)