Commit ac2e740

mmaprototype: add code block wrapper for replica rebalancing logic
Add a new code block (braces) around the replica rebalancing logic in rebalanceStore to prepare for extraction into a separate method. This is a mechanical whitespace change that will be followed by further refactoring steps.
1 parent c2c6ad7 commit ac2e740
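
The pattern applied here is the first half of a standard extract-method refactoring: wrap the code in a bare block so the only textual change is indentation, then, in a later commit, move the block body verbatim into a new method. A minimal sketch of the idea on a toy example (process, sumItems, and the item type are hypothetical names invented for illustration, not from this commit):

    package main

    import "fmt"

    // Step 1 (this commit's pattern): wrap the logic behind a bare block.
    // A free-standing { ... } statement is legal Go; it introduces a new
    // lexical scope but changes no behavior, so the diff is almost pure
    // indentation and can be reviewed mechanically.
    func process(items []int) {
      total := 0
      {
        // Variables declared inside these braces are no longer visible
        // outside, which makes the future method's inputs and outputs
        // explicit (total is assigned across the boundary, so it would
        // become a return value after extraction).
        for _, it := range items {
          total += it
        }
      }
      fmt.Println(total)
    }

    func main() {
      process([]int{1, 2, 3}) // prints 6
    }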

File tree: 1 file changed, +188 −187 lines


pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 188 additions & 187 deletions
@@ -237,206 +237,207 @@ func (rs *rebalanceState) rebalanceStore(
   }
 
   log.KvDistribution.VInfof(ctx, 2, "attempting to shed replicas next")
-
-  if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
-    now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
-    log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
-    return
-  }
-  // If the node is cpu overloaded, or the store/node is not fdOK, exclude
-  // the other stores on this node from receiving replicas shed by this
-  // store.
-  excludeStoresOnNode := store.nls > overloadSlow
-  rs.scratch.storesToExclude = rs.scratch.storesToExclude[:0]
-  if excludeStoresOnNode {
-    nodeID := ss.NodeID
-    for _, storeID := range rs.cs.nodes[nodeID].stores {
-      rs.scratch.storesToExclude.insert(storeID)
-    }
-    log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID)
-  } else {
-    // This store is excluded of course.
-    rs.scratch.storesToExclude.insert(store.StoreID)
-  }
-
-  // Iterate over top-K ranges first and try to move them.
-  topKRanges := ss.adjusted.topKRanges[localStoreID]
-  n := topKRanges.len()
-  loadDim := topKRanges.dim
-  for i := 0; i < n; i++ {
-    rangeID := topKRanges.index(i)
-    // TODO(sumeer): the following code belongs in a closure, since we will
-    // repeat it for some random selection of non topKRanges.
-    rstate := rs.cs.ranges[rangeID]
-    if len(rstate.pendingChanges) > 0 {
-      // If the range has pending changes, don't make more changes.
-      log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
-      continue
-    }
-    if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
-      log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
-      continue
-    }
-    if !rs.cs.ensureAnalyzedConstraints(rstate) {
-      log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
-      continue
-    }
-    isVoter, isNonVoter := rstate.constraints.replicaRole(store.StoreID)
-    if !isVoter && !isNonVoter {
-      // We should not panic here since the replicateQueue may have shed the
-      // lease and informed MMA, since the last time MMA computed the top-k
-      // ranges. This is useful for debugging in the prototype, due to the
-      // lack of unit tests.
-      panic(fmt.Sprintf("internal state inconsistency: "+
-        "store=%v range_id=%v pending-changes=%v "+
-        "rstate_replicas=%v rstate_constraints=%v",
-        store.StoreID, rangeID, rstate.pendingChanges, rstate.replicas, rstate.constraints))
+  {
+    if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
+      now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
+      log.KvDistribution.VInfof(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
+      return
     }
-    var conj constraintsConj
-    var err error
-    if isVoter {
-      conj, err = rstate.constraints.candidatesToReplaceVoterForRebalance(store.StoreID)
+    // If the node is cpu overloaded, or the store/node is not fdOK, exclude
+    // the other stores on this node from receiving replicas shed by this
+    // store.
+    excludeStoresOnNode := store.nls > overloadSlow
+    rs.scratch.storesToExclude = rs.scratch.storesToExclude[:0]
+    if excludeStoresOnNode {
+      nodeID := ss.NodeID
+      for _, storeID := range rs.cs.nodes[nodeID].stores {
+        rs.scratch.storesToExclude.insert(storeID)
+      }
+      log.KvDistribution.VInfof(ctx, 2, "excluding all stores on n%d due to overload/fd status", nodeID)
     } else {
-      conj, err = rstate.constraints.candidatesToReplaceNonVoterForRebalance(store.StoreID)
+      // This store is excluded of course.
+      rs.scratch.storesToExclude.insert(store.StoreID)
     }
-    if err != nil {
-      // This range has some constraints that are violated. Let those be
-      // fixed first.
-      log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err)
-      continue
-    }
-    rs.scratch.disj[0] = conj
-    rs.scratch.storesToExcludeForRange = append(rs.scratch.storesToExcludeForRange[:0], rs.scratch.storesToExclude...)
-    // Also exclude all stores on nodes that have existing replicas.
-    for _, replica := range rstate.replicas {
-      storeID := replica.StoreID
-      if storeID == store.StoreID {
-        // We don't exclude other stores on this node, since we are allowed to
-        // transfer the range to them. If the node is overloaded or not fdOK,
-        // we have already excluded those stores above.
+
+    // Iterate over top-K ranges first and try to move them.
+    topKRanges := ss.adjusted.topKRanges[localStoreID]
+    n := topKRanges.len()
+    loadDim := topKRanges.dim
+    for i := 0; i < n; i++ {
+      rangeID := topKRanges.index(i)
+      // TODO(sumeer): the following code belongs in a closure, since we will
+      // repeat it for some random selection of non topKRanges.
+      rstate := rs.cs.ranges[rangeID]
+      if len(rstate.pendingChanges) > 0 {
+        // If the range has pending changes, don't make more changes.
+        log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
         continue
       }
-      nodeID := rs.cs.stores[storeID].NodeID
-      for _, storeID := range rs.cs.nodes[nodeID].stores {
-        rs.scratch.storesToExcludeForRange.insert(storeID)
+      if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
+        log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
+        continue
       }
-    }
-    // TODO(sumeer): eliminate cands allocations by passing a scratch slice.
-    cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, rs.scratch.disj[:], rs.scratch.storesToExcludeForRange, store.StoreID)
-    log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
-      rangeID, store.StoreID, ss.adjusted.load)
-    if log.V(2) {
-      log.KvDistribution.Infof(ctx, "candidates are:")
-      for _, c := range cands.candidates {
-        log.KvDistribution.Infof(ctx, " s%d: %s", c.StoreID, c.storeLoadSummary)
+      if !rs.cs.ensureAnalyzedConstraints(rstate) {
+        log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
+        continue
+      }
+      isVoter, isNonVoter := rstate.constraints.replicaRole(store.StoreID)
+      if !isVoter && !isNonVoter {
+        // We should not panic here since the replicateQueue may have shed the
+        // lease and informed MMA, since the last time MMA computed the top-k
+        // ranges. This is useful for debugging in the prototype, due to the
+        // lack of unit tests.
+        panic(fmt.Sprintf("internal state inconsistency: "+
+          "store=%v range_id=%v pending-changes=%v "+
+          "rstate_replicas=%v rstate_constraints=%v",
+          store.StoreID, rangeID, rstate.pendingChanges, rstate.replicas, rstate.constraints))
+      }
+      var conj constraintsConj
+      var err error
+      if isVoter {
+        conj, err = rstate.constraints.candidatesToReplaceVoterForRebalance(store.StoreID)
+      } else {
+        conj, err = rstate.constraints.candidatesToReplaceNonVoterForRebalance(store.StoreID)
+      }
+      if err != nil {
+        // This range has some constraints that are violated. Let those be
+        // fixed first.
+        log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraint violation needs fixing first: %v", rangeID, err)
+        continue
+      }
+      rs.scratch.disj[0] = conj
+      rs.scratch.storesToExcludeForRange = append(rs.scratch.storesToExcludeForRange[:0], rs.scratch.storesToExclude...)
+      // Also exclude all stores on nodes that have existing replicas.
+      for _, replica := range rstate.replicas {
+        storeID := replica.StoreID
+        if storeID == store.StoreID {
+          // We don't exclude other stores on this node, since we are allowed to
+          // transfer the range to them. If the node is overloaded or not fdOK,
+          // we have already excluded those stores above.
+          continue
+        }
+        nodeID := rs.cs.stores[storeID].NodeID
+        for _, storeID := range rs.cs.nodes[nodeID].stores {
+          rs.scratch.storesToExcludeForRange.insert(storeID)
+        }
+      }
+      // TODO(sumeer): eliminate cands allocations by passing a scratch slice.
+      cands, ssSLS := rs.cs.computeCandidatesForRange(ctx, rs.scratch.disj[:], rs.scratch.storesToExcludeForRange, store.StoreID)
+      log.KvDistribution.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
+        rangeID, store.StoreID, ss.adjusted.load)
+      if log.V(2) {
+        log.KvDistribution.Infof(ctx, "candidates are:")
+        for _, c := range cands.candidates {
+          log.KvDistribution.Infof(ctx, " s%d: %s", c.StoreID, c.storeLoadSummary)
+        }
       }
     }
-  }
 
-    if len(cands.candidates) == 0 {
-      log.KvDistribution.VInfof(ctx, 2, "result(failed): no candidates found for r%d after exclusions", rangeID)
-      continue
-    }
-    var rlocalities replicasLocalityTiers
-    if isVoter {
-      rlocalities = rstate.constraints.voterLocalityTiers
-    } else {
-      rlocalities = rstate.constraints.replicaLocalityTiers
-    }
-    localities := rs.dsm.getExistingReplicaLocalities(rlocalities)
-    isLeaseholder := rstate.constraints.leaseholderID == store.StoreID
-    // Set the diversity score and lease preference index of the candidates.
-    for _, cand := range cands.candidates {
-      cand.diversityScore = localities.getScoreChangeForRebalance(
-        ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers)
-      if isLeaseholder {
-        cand.leasePreferenceIndex = matchedLeasePreferenceIndex(
-          cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher)
+      if len(cands.candidates) == 0 {
+        log.KvDistribution.VInfof(ctx, 2, "result(failed): no candidates found for r%d after exclusions", rangeID)
+        continue
+      }
+      var rlocalities replicasLocalityTiers
+      if isVoter {
+        rlocalities = rstate.constraints.voterLocalityTiers
+      } else {
+        rlocalities = rstate.constraints.replicaLocalityTiers
+      }
+      localities := rs.dsm.getExistingReplicaLocalities(rlocalities)
+      isLeaseholder := rstate.constraints.leaseholderID == store.StoreID
+      // Set the diversity score and lease preference index of the candidates.
+      for _, cand := range cands.candidates {
+        cand.diversityScore = localities.getScoreChangeForRebalance(
+          ss.localityTiers, rs.cs.stores[cand.StoreID].localityTiers)
+        if isLeaseholder {
+          cand.leasePreferenceIndex = matchedLeasePreferenceIndex(
+            cand.StoreID, rstate.constraints.spanConfig.leasePreferences, rs.cs.constraintMatcher)
+        }
+      }
+      // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and
+      // s3, s4 are loadNormal. Now s4 is considering rebalancing load away
+      // from s1, but the candidate top-k range has replicas {s1, s3, s4}. So
+      // the only way to shed load from s1 is a s1 => s2 move. But there may
+      // be other ranges at other leaseholder stores which can be moved from
+      // s1 => {s3, s4}. So we should not be doing this sub-optimal transfer
+      // of load from s1 => s2 unless s1 is not seeing any load shedding for
+      // some interval of time. We need a way to capture this information in a
+      // simple but effective manner. For now, we capture this using these
+      // grace duration thresholds.
+      ignoreLevel := ignoreLoadNoChangeAndHigher
+      overloadDur := now.Sub(ss.overloadStartTime)
+      if overloadDur > ignoreHigherThanLoadThresholdGraceDuration {
+        ignoreLevel = ignoreHigherThanLoadThreshold
+        log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v",
+          ignoreLevel, ssSLS.sls, rangeID, overloadDur)
+      } else if overloadDur > ignoreLoadThresholdAndHigherGraceDuration {
+        ignoreLevel = ignoreLoadThresholdAndHigher
+        log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v",
+          ignoreLevel, ssSLS.sls, rangeID, overloadDur)
+      }
+      targetStoreID := sortTargetCandidateSetAndPick(
+        ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rs.rng)
+      if targetStoreID == 0 {
+        log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+
+          "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
+        continue
+      }
+      targetSS := rs.cs.stores[targetStoreID]
+      addedLoad := rstate.load.Load
+      if !isLeaseholder {
+        addedLoad[CPURate] = rstate.load.RaftCPU
+      }
+      if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
+        log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
+          store.StoreID, targetStoreID, rangeID, addedLoad)
+        continue
+      }
+      addTarget := roachpb.ReplicationTarget{
+        NodeID:  targetSS.NodeID,
+        StoreID: targetSS.StoreID,
+      }
+      removeTarget := roachpb.ReplicationTarget{
+        NodeID:  ss.NodeID,
+        StoreID: ss.StoreID,
+      }
+      if addTarget.StoreID == removeTarget.StoreID {
+        panic(fmt.Sprintf("internal state inconsistency: "+
+          "add=%v==remove_target=%v range_id=%v candidates=%v",
+          addTarget, removeTarget, rangeID, cands.candidates))
+      }
+      replicaChanges := makeRebalanceReplicaChanges(
+        rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
+      rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
+      if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
+        panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
+          replicaChanges, rangeID))
+      }
+      rs.cs.addPendingRangeChange(rangeChange)
+      rs.changes = append(rs.changes, rangeChange)
+      rs.rangeMoveCount++
+      log.KvDistribution.VInfof(ctx, 2,
+        "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
+        rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
+      if rs.rangeMoveCount >= rs.maxRangeMoveCount {
+        log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount)
+        return
+      }
+      doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
+      if doneShedding {
+        log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
+          store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
+        break
       }
     }
-    // Consider a cluster where s1 is overloadSlow, s2 is loadNoChange, and
-    // s3, s4 are loadNormal. Now s4 is considering rebalancing load away
-    // from s1, but the candidate top-k range has replicas {s1, s3, s4}. So
-    // the only way to shed load from s1 is a s1 => s2 move. But there may
-    // be other ranges at other leaseholder stores which can be moved from
-    // s1 => {s3, s4}. So we should not be doing this sub-optimal transfer
-    // of load from s1 => s2 unless s1 is not seeing any load shedding for
-    // some interval of time. We need a way to capture this information in a
-    // simple but effective manner. For now, we capture this using these
-    // grace duration thresholds.
-    ignoreLevel := ignoreLoadNoChangeAndHigher
-    overloadDur := now.Sub(ss.overloadStartTime)
-    if overloadDur > ignoreHigherThanLoadThresholdGraceDuration {
-      ignoreLevel = ignoreHigherThanLoadThreshold
-      log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v",
-        ignoreLevel, ssSLS.sls, rangeID, overloadDur)
-    } else if overloadDur > ignoreLoadThresholdAndHigherGraceDuration {
-      ignoreLevel = ignoreLoadThresholdAndHigher
-      log.KvDistribution.VInfof(ctx, 3, "using level %v (threshold:%v) for r%d based on overload duration %v",
-        ignoreLevel, ssSLS.sls, rangeID, overloadDur)
-    }
-    targetStoreID := sortTargetCandidateSetAndPick(
-      ctx, cands, ssSLS.sls, ignoreLevel, loadDim, rs.rng)
-    if targetStoreID == 0 {
-      log.KvDistribution.VInfof(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+
-        "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
-      continue
-    }
-    targetSS := rs.cs.stores[targetStoreID]
-    addedLoad := rstate.load.Load
-    if !isLeaseholder {
-      addedLoad[CPURate] = rstate.load.RaftCPU
-    }
-    if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, cands.means, false, loadDim) {
-      log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
-        store.StoreID, targetStoreID, rangeID, addedLoad)
-      continue
-    }
-    addTarget := roachpb.ReplicationTarget{
-      NodeID:  targetSS.NodeID,
-      StoreID: targetSS.StoreID,
-    }
-    removeTarget := roachpb.ReplicationTarget{
-      NodeID:  ss.NodeID,
-      StoreID: ss.StoreID,
-    }
-    if addTarget.StoreID == removeTarget.StoreID {
-      panic(fmt.Sprintf("internal state inconsistency: "+
-        "add=%v==remove_target=%v range_id=%v candidates=%v",
-        addTarget, removeTarget, rangeID, cands.candidates))
-    }
-    replicaChanges := makeRebalanceReplicaChanges(
-      rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-    rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-    if err = rs.cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
-      panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
-        replicaChanges, rangeID))
-    }
-    rs.cs.addPendingRangeChange(rangeChange)
-    rs.changes = append(rs.changes, rangeChange)
-    rs.rangeMoveCount++
-    log.KvDistribution.VInfof(ctx, 2,
-      "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
-      rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
-    if rs.rangeMoveCount >= rs.maxRangeMoveCount {
-      log.KvDistribution.VInfof(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, rs.maxRangeMoveCount)
-      return
-    }
-    doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
+    // TODO(sumeer): For regular rebalancing, we will wait until those top-K
+    // move and then continue with the rest. There is a risk that the top-K
+    // have some constraint that prevents rebalancing, while the rest can be
+    // moved. Running with underprovisioned clusters and expecting load-based
+    // rebalancing to work well is not in scope.
     if doneShedding {
-      log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
-        store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
-      break
+      log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID)
+      return
     }
   }
-  // TODO(sumeer): For regular rebalancing, we will wait until those top-K
-  // move and then continue with the rest. There is a risk that the top-K
-  // have some constraint that prevents rebalancing, while the rest can be
-  // moved. Running with underprovisioned clusters and expecting load-based
-  // rebalancing to work well is not in scope.
-  if doneShedding {
-    log.KvDistribution.VInfof(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID)
-    return
-  }
 }
 
 func (rs *rebalanceState) rebalanceLeases(
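The commit message promises a follow-up extraction; that change is not part of this commit. As a sketch on the same toy example used above, the second step would move the block body verbatim into its own method and replace the block with a call (sumItems is an invented name; the actual method name and signature in the follow-up may differ):

    package main

    import "fmt"

    // Step 2 (the follow-up, sketched): the block body moves verbatim into
    // a helper, and the variable that was assigned across the block
    // boundary (total) becomes the return value.
    func sumItems(items []int) int {
      total := 0
      for _, it := range items {
        total += it
      }
      return total
    }

    func process(items []int) {
      fmt.Println(sumItems(items)) // prints 6 for items = {1, 2, 3}
    }

    func main() {
      process([]int{1, 2, 3})
    }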
