Skip to content

Commit e5f4b62

Browse files
committed
mmaprototype: add max frac pending threshold to rebalanceEnv
This allows adjusting it in tests. The const is now only available in the newRebalanceEnv constructor, preventing accidental use elsewhere.
1 parent 6d74a68 commit e5f4b62

File tree

2 files changed

+25
-22
lines changed

2 files changed

+25
-22
lines changed

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,6 @@ type allocatorState struct {
212212

213213
var _ Allocator = &allocatorState{}
214214

215-
// TODO(sumeer): temporary constants.
216-
// TODO(tbg): avoid direct access to this constant so that it can be configured
217-
// in tests. As is, it can be overridden in some places but not others.
218-
const (
219-
maxFractionPendingThreshold = 0.1
220-
)
221-
222215
func NewAllocatorState(ts timeutil.TimeSource, rand *rand.Rand) *allocatorState {
223216
interner := newStringInterner()
224217
cs := newClusterState(ts, interner)
@@ -500,6 +493,7 @@ func sortTargetCandidateSetAndPick(
500493
ignoreLevel ignoreLevel,
501494
overloadedDim LoadDimension,
502495
rng *rand.Rand,
496+
maxFractionPendingThreshold float64,
503497
) roachpb.StoreID {
504498
var b strings.Builder
505499
for i := range cands.candidates {

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ type rebalanceEnv struct {
5050
// rebalanceStores was called, this limit applies to this particular one
5151
// store.
5252
maxLeaseTransferCount int
53+
// If a store's maxFractionPendingDecrease is at least this threshold, no
54+
// further changes should be made at this time. This is because the inflight
55+
// operation's changes to the load are estimated, and we don't want to pile up
56+
// too many possibly inaccurate estimates.
57+
fractionPendingIncreaseOrDecreaseThreshold float64
5358
// lastFailedChangeDelayDuration is the delay after a failed change before retrying.
5459
lastFailedChangeDelayDuration time.Duration
5560
// now is the timestamp representing the start time of the current
@@ -84,18 +89,22 @@ func newRebalanceEnv(
8489
// trips through rebalanceStores which could cause some logging noise.
8590
maxRangeMoveCount = 1
8691
maxLeaseTransferCount = 8
92+
// TODO(sumeer): revisit this value.
93+
fractionPendingIncreaseOrDecreaseThreshold = 0.1
94+
8795
// See the long comment where rangeState.lastFailedChange is declared.
8896
lastFailedChangeDelayDuration time.Duration = 60 * time.Second
8997
)
9098

9199
re := &rebalanceEnv{
92-
clusterState: cs,
93-
rng: rng,
94-
dsm: dsm,
95-
now: now,
96-
maxRangeMoveCount: maxRangeMoveCount,
97-
maxLeaseTransferCount: maxLeaseTransferCount,
98-
lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
100+
clusterState: cs,
101+
rng: rng,
102+
dsm: dsm,
103+
now: now,
104+
maxRangeMoveCount: maxRangeMoveCount,
105+
maxLeaseTransferCount: maxLeaseTransferCount,
106+
fractionPendingIncreaseOrDecreaseThreshold: fractionPendingIncreaseOrDecreaseThreshold,
107+
lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
99108
}
100109
re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
101110
re.scratch.stores = map[roachpb.StoreID]struct{}{}
@@ -175,15 +184,15 @@ func (re *rebalanceEnv) rebalanceStores(
175184
ss.overloadEndTime = time.Time{}
176185
}
177186
// The pending decrease must be small enough to continue shedding
178-
if ss.maxFractionPendingDecrease < maxFractionPendingThreshold &&
187+
if ss.maxFractionPendingDecrease < re.fractionPendingIncreaseOrDecreaseThreshold &&
179188
// There should be no pending increase, since that can be an overestimate.
180189
ss.maxFractionPendingIncrease < epsilon {
181190
log.KvDistribution.VEventf(ctx, 2, "store s%v was added to shedding store list", storeID)
182191
sheddingStores = append(sheddingStores, sheddingStore{StoreID: storeID, storeLoadSummary: sls})
183192
} else {
184193
log.KvDistribution.VEventf(ctx, 2,
185194
"skipping overloaded store s%d (worst dim: %s): pending decrease %.2f >= threshold %.2f or pending increase %.2f >= epsilon",
186-
storeID, sls.worstDim, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, ss.maxFractionPendingIncrease)
195+
storeID, sls.worstDim, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, ss.maxFractionPendingIncrease)
187196
}
188197
} else if sls.sls < loadNoChange && ss.overloadEndTime == (time.Time{}) {
189198
// NB: we don't stop the overloaded interval if the store is at
@@ -413,7 +422,7 @@ func (re *rebalanceEnv) rebalanceReplicas(
413422
ignoreLevel, ssSLS.sls, rangeID, overloadDur)
414423
}
415424
targetStoreID := sortTargetCandidateSetAndPick(
416-
ctx, cands, ssSLS.sls, ignoreLevel, loadDim, re.rng)
425+
ctx, cands, ssSLS.sls, ignoreLevel, loadDim, re.rng, re.fractionPendingIncreaseOrDecreaseThreshold)
417426
if targetStoreID == 0 {
418427
log.KvDistribution.VEventf(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+
419428
"(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
@@ -459,10 +468,10 @@ func (re *rebalanceEnv) rebalanceReplicas(
459468
log.KvDistribution.VEventf(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, re.maxRangeMoveCount)
460469
return
461470
}
462-
doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
471+
doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
463472
if doneShedding {
464473
log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
465-
store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
474+
store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
466475
break
467476
}
468477
}
@@ -586,7 +595,7 @@ func (re *rebalanceEnv) rebalanceLeases(
586595
// will only add CPU to the target store (so it is ok to ignore other
587596
// dimensions on the target).
588597
targetStoreID := sortTargetCandidateSetAndPick(
589-
ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, re.rng)
598+
ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, re.rng, re.fractionPendingIncreaseOrDecreaseThreshold)
590599
if targetStoreID == 0 {
591600
log.KvDistribution.Infof(
592601
ctx,
@@ -641,10 +650,10 @@ func (re *rebalanceEnv) rebalanceLeases(
641650
log.KvDistribution.VEventf(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
642651
break
643652
}
644-
doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
653+
doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
645654
if doneShedding {
646655
log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
647-
store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
656+
store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
648657
break
649658
}
650659
}

0 commit comments

Comments
 (0)