Commit c2c6ad7

mmaprototype: extract lease rebalancing logic to method
Extract the lease rebalancing logic from rebalanceStore into a separate method, `(*rebalanceState).rebalanceLeases`. This separates the lease-shedding concern from the rest of the store rebalancing logic. The new method takes the parameters it needs and returns whether the caller should skip replica moves. The extraction was done mechanically using IDE tooling.
1 parent 51e53ae commit c2c6ad7
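
For orientation, a minimal, self-contained sketch of the shape of this refactor: an immediately-invoked function literal becomes a named method on the receiver, and the call site collapses to a single method call whose boolean result tells the caller whether to skip replica moves. The miniState type and its fields below are hypothetical stand-ins for illustration, not the actual rebalanceState from this package.

// Illustrative sketch only (hypothetical types; not the mmaprototype code).
package main

import "fmt"

// miniState is a stand-in for the real rebalanceState receiver.
type miniState struct {
	leaseTransferCount    int
	maxLeaseTransferCount int
}

// rebalanceLeases mirrors the extracted method's contract: it may record
// lease transfers on the receiver and reports whether the caller should
// skip the more expensive replica moves.
func (s *miniState) rebalanceLeases() bool {
	if s.leaseTransferCount >= s.maxLeaseTransferCount {
		return false
	}
	s.leaseTransferCount++
	return true
}

func main() {
	s := &miniState{maxLeaseTransferCount: 1}
	// Before the refactor, the call site was an immediately-invoked closure:
	//	shouldSkip := func(s *miniState) bool { /* ... body ... */ }(s)
	// After the refactor, it is a plain method call:
	shouldSkip := s.rebalanceLeases()
	fmt.Println("skip replica moves:", shouldSkip)
}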

File tree

1 file changed: +194 -186 lines changed


pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 194 additions & 186 deletions
@@ -227,192 +227,7 @@ func (rs *rebalanceState) rebalanceStore(
 	// behalf of a particular store (vs. being called on behalf of the set
 	// of local store IDs)?
 	if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow {
-		shouldSkipReplicaMoves := func(rs *rebalanceState, ss *storeState, store sheddingStore, ctx context.Context, localStoreID roachpb.StoreID, now time.Time) bool {
-			log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
-				store.StoreID, store.dimSummary[CPURate], overloadSlow)
-			// This store is local, and cpu overloaded. Shed leases first.
-			//
-			// NB: any ranges at this store that don't have pending changes must
-			// have this local store as the leaseholder.
-			localLeaseTransferCount := 0
-			topKRanges := ss.adjusted.topKRanges[localStoreID]
-			n := topKRanges.len()
-			doneShedding := false
-			for i := 0; i < n; i++ {
-				rangeID := topKRanges.index(i)
-				rstate := rs.cs.ranges[rangeID]
-				if len(rstate.pendingChanges) > 0 {
-					// If the range has pending changes, don't make more changes.
-					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
-					continue
-				}
-				for _, repl := range rstate.replicas {
-					if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID
-						continue
-					}
-					if !repl.IsLeaseholder {
-						// TODO(tbg): is this true? Can't there be ranges with replicas on
-						// multiple local stores, and wouldn't this assertion fire in that
-						// case once rebalanceStores is invoked on whichever of the two
-						// stores doesn't hold the lease?
-						//
-						// TODO(tbg): see also the other assertion below (leaseholderID !=
-						// store.StoreID) which seems similar to this one.
-						log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+
-							" changes but is not leaseholder: %+v", rstate)
-					}
-				}
-				if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
-					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
-					continue
-				}
-				if !rs.cs.ensureAnalyzedConstraints(rstate) {
-					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
-					continue
-				}
-				if rstate.constraints.leaseholderID != store.StoreID {
-					// We should not panic here since the leaseQueue may have shed the
-					// lease and informed MMA, since the last time MMA computed the
-					// top-k ranges. This is useful for debugging in the prototype, due
-					// to the lack of unit tests.
-					//
-					// TODO(tbg): can the above scenario currently happen? ComputeChanges
-					// first processes the leaseholder message and then, still under the
-					// lock, immediately calls into rebalanceStores (i.e. this store).
-					// Doesn't this mean that the leaseholder view is up to date?
-					panic(fmt.Sprintf("internal state inconsistency: "+
-						"store=%v range_id=%v should be leaseholder but isn't",
-						store.StoreID, rangeID))
-				}
-				cands, _ := rstate.constraints.candidatesToMoveLease()
-				var candsPL storeSet
-				for _, cand := range cands {
-					candsPL.insert(cand.storeID)
-				}
-				// Always consider the local store (which already holds the lease) as a
-				// candidate, so that we don't move the lease away if keeping it would be
-				// the better option overall.
-				// TODO(tbg): is this really needed? We intentionally exclude the leaseholder
-				// in candidatesToMoveLease, so why reinsert it now?
-				candsPL.insert(store.StoreID)
-				if len(candsPL) <= 1 {
-					continue // leaseholder is the only candidate
-				}
-				clear(rs.scratch.nodes)
-				means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores)
-				sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
-				log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
-				if sls.dimSummary[CPURate] < overloadSlow {
-					// This store is not cpu overloaded relative to these candidates for
-					// this range.
-					log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID)
-					continue
-				}
-				var candsSet candidateSet
-				for _, cand := range cands {
-					if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
-						// Don't transfer lease to a store that is lagging.
-						log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
-							cand.storeID, disp)
-						continue
-					}
-					candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
-					candsSet.candidates = append(candsSet.candidates, candidateInfo{
-						StoreID:              cand.storeID,
-						storeLoadSummary:     candSls,
-						diversityScore:       0,
-						leasePreferenceIndex: cand.leasePreferenceIndex,
-					})
-				}
-				if len(candsSet.candidates) == 0 {
-					log.KvDistribution.Infof(
-						ctx,
-						"result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]",
-						ss.NodeID, ss.StoreID, rangeID, candsPL)
-					continue
-				}
-				// Have candidates. We set ignoreLevel to
-				// ignoreHigherThanLoadThreshold since this is the only allocator that
-				// can shed leases for this store, and lease shedding is cheap, and it
-				// will only add CPU to the target store (so it is ok to ignore other
-				// dimensions on the target).
-				targetStoreID := sortTargetCandidateSetAndPick(
-					ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng)
-				if targetStoreID == 0 {
-					log.KvDistribution.Infof(
-						ctx,
-						"result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick",
-						ss.NodeID, ss.StoreID, rangeID)
-					continue
-				}
-				targetSS := rs.cs.stores[targetStoreID]
-				var addedLoad LoadVector
-				// Only adding leaseholder CPU.
-				addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
-				if addedLoad[CPURate] < 0 {
-					// TODO(sumeer): remove this panic once we are not in an
-					// experimental phase.
-					addedLoad[CPURate] = 0
-					panic("raft cpu higher than total cpu")
-				}
-				if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
-					log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
-						store.StoreID, targetStoreID, rangeID, addedLoad)
-					continue
-				}
-				addTarget := roachpb.ReplicationTarget{
-					NodeID:  targetSS.NodeID,
-					StoreID: targetSS.StoreID,
-				}
-				removeTarget := roachpb.ReplicationTarget{
-					NodeID:  ss.NodeID,
-					StoreID: ss.StoreID,
-				}
-				replicaChanges := MakeLeaseTransferChanges(
-					rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-				leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-				if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
-					panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
-				}
-				rs.cs.addPendingRangeChange(leaseChange)
-				rs.changes = append(rs.changes, leaseChange)
-				rs.leaseTransferCount++
-				localLeaseTransferCount++
-				if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() {
-					panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1]))
-				}
-				log.KvDistribution.Infof(ctx,
-					"result(success): shedding r%v lease from s%v to s%v [change:%v] with "+
-						"resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))",
-					rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1],
-					ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
-					ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
-					targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
-				if rs.leaseTransferCount >= rs.maxLeaseTransferCount {
-					log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount)
-					break
-				}
-				doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
-				if doneShedding {
-					log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
-						store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
-					break
-				}
-			}
-			if doneShedding || localLeaseTransferCount > 0 {
-				// If managed to transfer a lease, wait for it to be done, before
-				// shedding replicas from this store (which is more costly). Otherwise
-				// we may needlessly start moving replicas. Note that the store
-				// rebalancer will call the rebalance method again after the lease
-				// transfer is done and we may still be considering those transfers as
-				// pending from a load perspective, so we *may* not be able to do more
-				// lease transfers -- so be it.
-				log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
-					store.StoreID, doneShedding, localLeaseTransferCount)
-				return true
-			}
-			return false
-		}(rs, ss, store, ctx, localStoreID, now)
+		shouldSkipReplicaMoves := rs.rebalanceLeases(ss, store, ctx, localStoreID, now)
 		if shouldSkipReplicaMoves {
 			return
 		}
@@ -623,3 +438,196 @@ func (rs *rebalanceState) rebalanceStore(
 		return
 	}
 }
+
+func (rs *rebalanceState) rebalanceLeases(
+	ss *storeState,
+	store sheddingStore,
+	ctx context.Context,
+	localStoreID roachpb.StoreID,
+	now time.Time,
+) bool {
+	log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
+		store.StoreID, store.dimSummary[CPURate], overloadSlow)
+	// This store is local, and cpu overloaded. Shed leases first.
+	//
+	// NB: any ranges at this store that don't have pending changes must
+	// have this local store as the leaseholder.
+	localLeaseTransferCount := 0
+	topKRanges := ss.adjusted.topKRanges[localStoreID]
+	n := topKRanges.len()
+	doneShedding := false
+	for i := 0; i < n; i++ {
+		rangeID := topKRanges.index(i)
+		rstate := rs.cs.ranges[rangeID]
+		if len(rstate.pendingChanges) > 0 {
+			// If the range has pending changes, don't make more changes.
+			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
+			continue
+		}
+		for _, repl := range rstate.replicas {
+			if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID
+				continue
+			}
+			if !repl.IsLeaseholder {
+				// TODO(tbg): is this true? Can't there be ranges with replicas on
+				// multiple local stores, and wouldn't this assertion fire in that
+				// case once rebalanceStores is invoked on whichever of the two
+				// stores doesn't hold the lease?
+				//
+				// TODO(tbg): see also the other assertion below (leaseholderID !=
+				// store.StoreID) which seems similar to this one.
+				log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+
+					" changes but is not leaseholder: %+v", rstate)
+			}
+		}
+		if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
+			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
+			continue
+		}
+		if !rs.cs.ensureAnalyzedConstraints(rstate) {
+			log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
+			continue
+		}
+		if rstate.constraints.leaseholderID != store.StoreID {
+			// We should not panic here since the leaseQueue may have shed the
+			// lease and informed MMA, since the last time MMA computed the
+			// top-k ranges. This is useful for debugging in the prototype, due
+			// to the lack of unit tests.
+			//
+			// TODO(tbg): can the above scenario currently happen? ComputeChanges
+			// first processes the leaseholder message and then, still under the
+			// lock, immediately calls into rebalanceStores (i.e. this store).
+			// Doesn't this mean that the leaseholder view is up to date?
+			panic(fmt.Sprintf("internal state inconsistency: "+
+				"store=%v range_id=%v should be leaseholder but isn't",
+				store.StoreID, rangeID))
+		}
+		cands, _ := rstate.constraints.candidatesToMoveLease()
+		var candsPL storeSet
+		for _, cand := range cands {
+			candsPL.insert(cand.storeID)
+		}
+		// Always consider the local store (which already holds the lease) as a
+		// candidate, so that we don't move the lease away if keeping it would be
+		// the better option overall.
+		// TODO(tbg): is this really needed? We intentionally exclude the leaseholder
+		// in candidatesToMoveLease, so why reinsert it now?
+		candsPL.insert(store.StoreID)
+		if len(candsPL) <= 1 {
+			continue // leaseholder is the only candidate
+		}
+		clear(rs.scratch.nodes)
+		means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores)
+		sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
+		log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
+		if sls.dimSummary[CPURate] < overloadSlow {
+			// This store is not cpu overloaded relative to these candidates for
+			// this range.
+			log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID)
+			continue
+		}
+		var candsSet candidateSet
+		for _, cand := range cands {
+			if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
+				// Don't transfer lease to a store that is lagging.
+				log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
+					cand.storeID, disp)
+				continue
+			}
+			candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
+			candsSet.candidates = append(candsSet.candidates, candidateInfo{
+				StoreID:              cand.storeID,
+				storeLoadSummary:     candSls,
+				diversityScore:       0,
+				leasePreferenceIndex: cand.leasePreferenceIndex,
+			})
+		}
+		if len(candsSet.candidates) == 0 {
+			log.KvDistribution.Infof(
+				ctx,
+				"result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]",
+				ss.NodeID, ss.StoreID, rangeID, candsPL)
+			continue
+		}
+		// Have candidates. We set ignoreLevel to
+		// ignoreHigherThanLoadThreshold since this is the only allocator that
+		// can shed leases for this store, and lease shedding is cheap, and it
+		// will only add CPU to the target store (so it is ok to ignore other
+		// dimensions on the target).
+		targetStoreID := sortTargetCandidateSetAndPick(
+			ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng)
+		if targetStoreID == 0 {
+			log.KvDistribution.Infof(
+				ctx,
+				"result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick",
+				ss.NodeID, ss.StoreID, rangeID)
+			continue
+		}
+		targetSS := rs.cs.stores[targetStoreID]
+		var addedLoad LoadVector
+		// Only adding leaseholder CPU.
+		addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
+		if addedLoad[CPURate] < 0 {
+			// TODO(sumeer): remove this panic once we are not in an
+			// experimental phase.
+			addedLoad[CPURate] = 0
+			panic("raft cpu higher than total cpu")
+		}
+		if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
+			log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
+				store.StoreID, targetStoreID, rangeID, addedLoad)
+			continue
+		}
+		addTarget := roachpb.ReplicationTarget{
+			NodeID:  targetSS.NodeID,
+			StoreID: targetSS.StoreID,
+		}
+		removeTarget := roachpb.ReplicationTarget{
+			NodeID:  ss.NodeID,
+			StoreID: ss.StoreID,
+		}
+		replicaChanges := MakeLeaseTransferChanges(
+			rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
+		leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
+		if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
+			panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
+		}
+		rs.cs.addPendingRangeChange(leaseChange)
+		rs.changes = append(rs.changes, leaseChange)
+		rs.leaseTransferCount++
+		localLeaseTransferCount++
+		if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() {
+			panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1]))
+		}
+		log.KvDistribution.Infof(ctx,
+			"result(success): shedding r%v lease from s%v to s%v [change:%v] with "+
+				"resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))",
+			rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1],
+			ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
+			ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
+			targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
+		if rs.leaseTransferCount >= rs.maxLeaseTransferCount {
+			log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount)
+			break
+		}
+		doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
+		if doneShedding {
+			log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
+				store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
+			break
+		}
+	}
+	if doneShedding || localLeaseTransferCount > 0 {
+		// If managed to transfer a lease, wait for it to be done, before
+		// shedding replicas from this store (which is more costly). Otherwise
+		// we may needlessly start moving replicas. Note that the store
+		// rebalancer will call the rebalance method again after the lease
+		// transfer is done and we may still be considering those transfers as
+		// pending from a load perspective, so we *may* not be able to do more
+		// lease transfers -- so be it.
+		log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
+			store.StoreID, doneShedding, localLeaseTransferCount)
+		return true
+	}
+	return false
+}
