
Commit 01d86f3
mmaprototype: indent lease rebalancing block with explicit braces
Wrap the lease rebalancing logic in an explicit block scope to prepare for its extraction into a separate method. This is a pure whitespace change with no logic modifications. It clearly delineates the code that will become `considerRebalanceLeases`, setting up the subsequent refactoring steps.
1 parent d2bb68c commit 01d86f3
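As a minimal sketch of the wrap-then-extract pattern the commit message describes: first the logic is fenced off in an explicit block (this commit), then the block body is lifted into its own method and any early return inside it is surfaced to the caller (the follow-up commits). The names below — `rebalancer`, `runBlockForm`, `considerShedding`, `runExtractedForm` — are hypothetical stand-ins for illustration only, not CockroachDB identifiers, and the sketch assumes nothing about the real `considerRebalanceLeases` signature.

package main

import "fmt"

// rebalancer is a toy stand-in for the real rebalance state; it only
// counts how many "lease transfers" were made.
type rebalancer struct {
	transfers int
	max       int
}

// Step 1 (what this commit does, in miniature): wrap the logic to be
// extracted in an explicit block so its boundaries are unambiguous.
func (r *rebalancer) runBlockForm(loads []int) {
	{
		for _, l := range loads {
			if l < 50 {
				continue // not overloaded; nothing to shed
			}
			r.transfers++
			if r.transfers >= r.max {
				// An early return inside the block returns from the whole
				// method, so the extraction step must preserve that behavior.
				return
			}
		}
	}
	fmt.Println("considering replica moves next")
}

// Step 2 (the follow-up refactor, in miniature): the block body becomes a
// method, and the early return is surfaced as a boolean for the caller.
func (r *rebalancer) considerShedding(loads []int) (keepGoing bool) {
	for _, l := range loads {
		if l < 50 {
			continue
		}
		r.transfers++
		if r.transfers >= r.max {
			return false
		}
	}
	return true
}

func (r *rebalancer) runExtractedForm(loads []int) {
	if !r.considerShedding(loads) {
		return
	}
	fmt.Println("considering replica moves next")
}

func main() {
	loads := []int{80, 20}
	a := &rebalancer{max: 2}
	a.runBlockForm(loads)
	b := &rebalancer{max: 2}
	b.runExtractedForm(loads)
	fmt.Println(a.transfers, b.transfers) // both forms behave identically: 1 1
}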

1 file changed (+172 −170 lines)

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 172 additions & 170 deletions
@@ -227,185 +227,187 @@ func (rs *rebalanceState) rebalanceStore(
 	// behalf of a particular store (vs. being called on behalf of the set
 	// of local store IDs)?
 	if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow {
-		log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
-			store.StoreID, store.dimSummary[CPURate], overloadSlow)
-		// This store is local, and cpu overloaded. Shed leases first.
-		//
-		// NB: any ranges at this store that don't have pending changes must
-		// have this local store as the leaseholder.
-		topKRanges := ss.adjusted.topKRanges[localStoreID]
-		n := topKRanges.len()
-		for i := 0; i < n; i++ {
-			rangeID := topKRanges.index(i)
-			rstate := rs.cs.ranges[rangeID]
-			if len(rstate.pendingChanges) > 0 {
-				// If the range has pending changes, don't make more changes.
-				log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
-				continue
-			}
-			for _, repl := range rstate.replicas {
-				if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID
+		{
+			log.KvDistribution.VInfof(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
+				store.StoreID, store.dimSummary[CPURate], overloadSlow)
+			// This store is local, and cpu overloaded. Shed leases first.
+			//
+			// NB: any ranges at this store that don't have pending changes must
+			// have this local store as the leaseholder.
+			topKRanges := ss.adjusted.topKRanges[localStoreID]
+			n := topKRanges.len()
+			for i := 0; i < n; i++ {
+				rangeID := topKRanges.index(i)
+				rstate := rs.cs.ranges[rangeID]
+				if len(rstate.pendingChanges) > 0 {
+					// If the range has pending changes, don't make more changes.
+					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: has pending changes", rangeID)
 					continue
 				}
-				if !repl.IsLeaseholder {
-					// TODO(tbg): is this true? Can't there be ranges with replicas on
-					// multiple local stores, and wouldn't this assertion fire in that
-					// case once rebalanceStores is invoked on whichever of the two
-					// stores doesn't hold the lease?
+				for _, repl := range rstate.replicas {
+					if repl.StoreID != localStoreID { // NB: localStoreID == ss.StoreID == store.StoreID
+						continue
+					}
+					if !repl.IsLeaseholder {
+						// TODO(tbg): is this true? Can't there be ranges with replicas on
+						// multiple local stores, and wouldn't this assertion fire in that
+						// case once rebalanceStores is invoked on whichever of the two
+						// stores doesn't hold the lease?
+						//
+						// TODO(tbg): see also the other assertion below (leaseholderID !=
+						// store.StoreID) which seems similar to this one.
+						log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+
+							" changes but is not leaseholder: %+v", rstate)
+					}
+				}
+				if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
+					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
+					continue
+				}
+				if !rs.cs.ensureAnalyzedConstraints(rstate) {
+					log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
+					continue
+				}
+				if rstate.constraints.leaseholderID != store.StoreID {
+					// We should not panic here since the leaseQueue may have shed the
+					// lease and informed MMA, since the last time MMA computed the
+					// top-k ranges. This is useful for debugging in the prototype, due
+					// to the lack of unit tests.
 					//
-					// TODO(tbg): see also the other assertion below (leaseholderID !=
-					// store.StoreID) which seems similar to this one.
-					log.KvDistribution.Fatalf(ctx, "internal state inconsistency: replica considered for lease shedding has no pending"+
-						" changes but is not leaseholder: %+v", rstate)
+					// TODO(tbg): can the above scenario currently happen? ComputeChanges
+					// first processes the leaseholder message and then, still under the
+					// lock, immediately calls into rebalanceStores (i.e. this store).
+					// Doesn't this mean that the leaseholder view is up to date?
+					panic(fmt.Sprintf("internal state inconsistency: "+
+						"store=%v range_id=%v should be leaseholder but isn't",
+						store.StoreID, rangeID))
 				}
-			}
-			if now.Sub(rstate.lastFailedChange) < rs.lastFailedChangeDelayDuration {
-				log.KvDistribution.VInfof(ctx, 2, "skipping r%d: too soon after failed change", rangeID)
-				continue
-			}
-			if !rs.cs.ensureAnalyzedConstraints(rstate) {
-				log.KvDistribution.VInfof(ctx, 2, "skipping r%d: constraints analysis failed", rangeID)
-				continue
-			}
-			if rstate.constraints.leaseholderID != store.StoreID {
-				// We should not panic here since the leaseQueue may have shed the
-				// lease and informed MMA, since the last time MMA computed the
-				// top-k ranges. This is useful for debugging in the prototype, due
-				// to the lack of unit tests.
-				//
-				// TODO(tbg): can the above scenario currently happen? ComputeChanges
-				// first processes the leaseholder message and then, still under the
-				// lock, immediately calls into rebalanceStores (i.e. this store).
-				// Doesn't this mean that the leaseholder view is up to date?
-				panic(fmt.Sprintf("internal state inconsistency: "+
-					"store=%v range_id=%v should be leaseholder but isn't",
-					store.StoreID, rangeID))
-			}
-			cands, _ := rstate.constraints.candidatesToMoveLease()
-			var candsPL storeSet
-			for _, cand := range cands {
-				candsPL.insert(cand.storeID)
-			}
-			// Always consider the local store (which already holds the lease) as a
-			// candidate, so that we don't move the lease away if keeping it would be
-			// the better option overall.
-			// TODO(tbg): is this really needed? We intentionally exclude the leaseholder
-			// in candidatesToMoveLease, so why reinsert it now?
-			candsPL.insert(store.StoreID)
-			if len(candsPL) <= 1 {
-				continue // leaseholder is the only candidate
-			}
-			clear(rs.scratch.nodes)
-			means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores)
-			sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
-			log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
-			if sls.dimSummary[CPURate] < overloadSlow {
-				// This store is not cpu overloaded relative to these candidates for
-				// this range.
-				log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID)
-				continue
-			}
-			var candsSet candidateSet
-			for _, cand := range cands {
-				if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
-					// Don't transfer lease to a store that is lagging.
-					log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
-						cand.storeID, disp)
+				cands, _ := rstate.constraints.candidatesToMoveLease()
+				var candsPL storeSet
+				for _, cand := range cands {
+					candsPL.insert(cand.storeID)
+				}
+				// Always consider the local store (which already holds the lease) as a
+				// candidate, so that we don't move the lease away if keeping it would be
+				// the better option overall.
+				// TODO(tbg): is this really needed? We intentionally exclude the leaseholder
+				// in candidatesToMoveLease, so why reinsert it now?
+				candsPL.insert(store.StoreID)
+				if len(candsPL) <= 1 {
+					continue // leaseholder is the only candidate
+				}
+				clear(rs.scratch.nodes)
+				means := computeMeansForStoreSet(rs.cs, candsPL, rs.scratch.nodes, rs.scratch.stores)
+				sls := rs.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
+				log.KvDistribution.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
+				if sls.dimSummary[CPURate] < overloadSlow {
+					// This store is not cpu overloaded relative to these candidates for
+					// this range.
+					log.KvDistribution.VInfof(ctx, 2, "result(failed): skipping r%d since store not overloaded relative to candidates", rangeID)
 					continue
 				}
-				candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
-				candsSet.candidates = append(candsSet.candidates, candidateInfo{
-					StoreID: cand.storeID,
-					storeLoadSummary: candSls,
-					diversityScore: 0,
-					leasePreferenceIndex: cand.leasePreferenceIndex,
-				})
-			}
-			if len(candsSet.candidates) == 0 {
-				log.KvDistribution.Infof(
-					ctx,
-					"result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]",
-					ss.NodeID, ss.StoreID, rangeID, candsPL)
-				continue
-			}
-			// Have candidates. We set ignoreLevel to
-			// ignoreHigherThanLoadThreshold since this is the only allocator that
-			// can shed leases for this store, and lease shedding is cheap, and it
-			// will only add CPU to the target store (so it is ok to ignore other
-			// dimensions on the target).
-			targetStoreID := sortTargetCandidateSetAndPick(
-				ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng)
-			if targetStoreID == 0 {
-				log.KvDistribution.Infof(
-					ctx,
-					"result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick",
-					ss.NodeID, ss.StoreID, rangeID)
-				continue
-			}
-			targetSS := rs.cs.stores[targetStoreID]
-			var addedLoad LoadVector
-			// Only adding leaseholder CPU.
-			addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
-			if addedLoad[CPURate] < 0 {
-				// TODO(sumeer): remove this panic once we are not in an
-				// experimental phase.
-				addedLoad[CPURate] = 0
-				panic("raft cpu higher than total cpu")
-			}
-			if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
-				log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
-					store.StoreID, targetStoreID, rangeID, addedLoad)
-				continue
-			}
-			addTarget := roachpb.ReplicationTarget{
-				NodeID: targetSS.NodeID,
-				StoreID: targetSS.StoreID,
-			}
-			removeTarget := roachpb.ReplicationTarget{
-				NodeID: ss.NodeID,
-				StoreID: ss.StoreID,
-			}
-			replicaChanges := MakeLeaseTransferChanges(
-				rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-			leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
-			if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
-				panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
-			}
-			rs.cs.addPendingRangeChange(leaseChange)
-			rs.changes = append(rs.changes, leaseChange)
-			rs.leaseTransferCount++
-			if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() {
-				panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1]))
+				var candsSet candidateSet
+				for _, cand := range cands {
+					if disp := rs.cs.stores[cand.storeID].adjusted.replicas[rangeID].LeaseDisposition; disp != LeaseDispositionOK {
+						// Don't transfer lease to a store that is lagging.
+						log.KvDistribution.Infof(ctx, "skipping store s%d for lease transfer: lease disposition %v",
+							cand.storeID, disp)
+						continue
+					}
+					candSls := rs.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
+					candsSet.candidates = append(candsSet.candidates, candidateInfo{
+						StoreID: cand.storeID,
+						storeLoadSummary: candSls,
+						diversityScore: 0,
+						leasePreferenceIndex: cand.leasePreferenceIndex,
+					})
+				}
+				if len(candsSet.candidates) == 0 {
+					log.KvDistribution.Infof(
+						ctx,
+						"result(failed): no candidates to move lease from n%vs%v for r%v before sortTargetCandidateSetAndPick [pre_filter_candidates=%v]",
+						ss.NodeID, ss.StoreID, rangeID, candsPL)
+					continue
+				}
+				// Have candidates. We set ignoreLevel to
+				// ignoreHigherThanLoadThreshold since this is the only allocator that
+				// can shed leases for this store, and lease shedding is cheap, and it
+				// will only add CPU to the target store (so it is ok to ignore other
+				// dimensions on the target).
+				targetStoreID := sortTargetCandidateSetAndPick(
+					ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, rs.rng)
+				if targetStoreID == 0 {
+					log.KvDistribution.Infof(
+						ctx,
+						"result(failed): no candidates to move lease from n%vs%v for r%v after sortTargetCandidateSetAndPick",
+						ss.NodeID, ss.StoreID, rangeID)
+					continue
+				}
+				targetSS := rs.cs.stores[targetStoreID]
+				var addedLoad LoadVector
+				// Only adding leaseholder CPU.
+				addedLoad[CPURate] = rstate.load.Load[CPURate] - rstate.load.RaftCPU
+				if addedLoad[CPURate] < 0 {
+					// TODO(sumeer): remove this panic once we are not in an
+					// experimental phase.
+					addedLoad[CPURate] = 0
+					panic("raft cpu higher than total cpu")
+				}
+				if !rs.cs.canShedAndAddLoad(ctx, ss, targetSS, addedLoad, &means, true, CPURate) {
+					log.KvDistribution.VInfof(ctx, 2, "result(failed): cannot shed from s%d to s%d for r%d: delta load %v",
+						store.StoreID, targetStoreID, rangeID, addedLoad)
+					continue
+				}
+				addTarget := roachpb.ReplicationTarget{
+					NodeID: targetSS.NodeID,
+					StoreID: targetSS.StoreID,
+				}
+				removeTarget := roachpb.ReplicationTarget{
+					NodeID: ss.NodeID,
+					StoreID: ss.StoreID,
+				}
+				replicaChanges := MakeLeaseTransferChanges(
+					rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
+				leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
+				if err := rs.cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
+					panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
+				}
+				rs.cs.addPendingRangeChange(leaseChange)
+				rs.changes = append(rs.changes, leaseChange)
+				rs.leaseTransferCount++
+				if rs.changes[len(rs.changes)-1].IsChangeReplicas() || !rs.changes[len(rs.changes)-1].IsTransferLease() {
+					panic(fmt.Sprintf("lease transfer is invalid: %v", rs.changes[len(rs.changes)-1]))
+				}
+				log.KvDistribution.Infof(ctx,
+					"result(success): shedding r%v lease from s%v to s%v [change:%v] with "+
+					"resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))",
+					rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1],
+					ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
+					ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
+					targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
+				if rs.leaseTransferCount >= rs.maxLeaseTransferCount {
+					log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount)
+					return
+				}
+				doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
+				if doneShedding {
+					log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
+						store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
+					break
+				}
 			}
-			log.KvDistribution.Infof(ctx,
-				"result(success): shedding r%v lease from s%v to s%v [change:%v] with "+
-				"resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))",
-				rangeID, removeTarget.StoreID, addTarget.StoreID, rs.changes[len(rs.changes)-1],
-				ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
-				ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
-				targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
-			if rs.leaseTransferCount >= rs.maxLeaseTransferCount {
-				log.KvDistribution.VInfof(ctx, 2, "reached max lease transfer count %d, returning", rs.maxLeaseTransferCount)
+			if doneShedding || rs.leaseTransferCount > 0 {
+				// If managed to transfer a lease, wait for it to be done, before
+				// shedding replicas from this store (which is more costly). Otherwise
+				// we may needlessly start moving replicas. Note that the store
+				// rebalancer will call the rebalance method again after the lease
+				// transfer is done and we may still be considering those transfers as
+				// pending from a load perspective, so we *may* not be able to do more
+				// lease transfers -- so be it.
+				log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
+					store.StoreID, doneShedding, rs.leaseTransferCount)
 				return
 			}
-			doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
-			if doneShedding {
-				log.KvDistribution.VInfof(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
-					store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
-				break
-			}
-		}
-		if doneShedding || rs.leaseTransferCount > 0 {
-			// If managed to transfer a lease, wait for it to be done, before
-			// shedding replicas from this store (which is more costly). Otherwise
-			// we may needlessly start moving replicas. Note that the store
-			// rebalancer will call the rebalance method again after the lease
-			// transfer is done and we may still be considering those transfers as
-			// pending from a load perspective, so we *may* not be able to do more
-			// lease transfers -- so be it.
-			log.KvDistribution.VInfof(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
-				store.StoreID, doneShedding, rs.leaseTransferCount)
-			return
 		}
 	} else {
 		log.KvDistribution.VInfof(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v",
