
Commit da91095

mmaprototype: clarify maxRangeMoveCount and maxLeaseTransferCount
Epic: CRDB-55052
1 parent daec0c8 commit da91095

11 files changed: +500, -150 lines

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 3 additions & 0 deletions
@@ -2473,6 +2473,9 @@ func (cs *clusterState) canShedAndAddLoad(
 		// that case.
 	}
 	canAddLoad = overloadedDimPermitsChange && !otherDimensionsBecameWorseInTarget &&
+		// NB: the target here is quite loaded, so we are stricter than in other
+		// places and require that there are *no* pending changes (rather than
+		// a threshold fraction).
 		targetSLS.maxFractionPendingIncrease < epsilon &&
 		targetSLS.maxFractionPendingDecrease < epsilon &&
 		// NB: targetSLS.nls <= targetSLS.sls is not a typo, in that we are
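The added comment distinguishes two kinds of pending-change checks: the strict one above, which treats any measurable pending fraction (at or above epsilon) as disqualifying, and the looser threshold-based check used at other call sites. Below is a minimal, self-contained Go sketch of that distinction; the constant values are illustrative assumptions, not the ones used by canShedAndAddLoad.

package main

import "fmt"

const (
	// epsilon rejects any measurable pending load on the target (assumed value).
	epsilon = 1e-9
	// pendingThreshold is the looser cutoff used at other call sites (assumed value).
	pendingThreshold = 0.1
)

func main() {
	for _, pendingFraction := range []float64{0.0, 0.05, 0.5} {
		strict := pendingFraction < epsilon           // "no pending changes" at all
		lenient := pendingFraction < pendingThreshold // pending changes below a threshold fraction
		fmt.Printf("pending=%.2f strictOK=%t lenientOK=%t\n", pendingFraction, strict, lenient)
	}
}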

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 85 additions & 66 deletions
@@ -38,8 +38,6 @@ type rebalanceEnv struct {
 	changes []PendingRangeChange
 	// rangeMoveCount tracks the number of range moves made.
 	rangeMoveCount int
-	// leaseTransferCount tracks the number of lease transfers made.
-	leaseTransferCount int
 	// maxRangeMoveCount is the maximum number of range moves allowed in the
 	// context of the current rebalanceStores invocation (across multiple
 	// shedding stores).
@@ -48,7 +46,8 @@ type rebalanceEnv struct {
 	// the context of the current rebalanceStores invocation. Note that because
 	// leases can only be shed from the particular local store on whose behalf
 	// rebalanceStores was called, this limit applies to this particular one
-	// store.
+	// store, and thus needs to be checked against only in that single
+	// rebalanceLeasesFromLocalStoreID invocation.
 	maxLeaseTransferCount int
 	// If a store's maxFractionPendingDecrease is at least this threshold, no
 	// further changes should be made at this time. This is because the inflight
@@ -229,7 +228,13 @@ func (re *rebalanceEnv) rebalanceStores(
 	}
 
 	for _, store := range sheddingStores {
-		if re.rangeMoveCount >= re.maxRangeMoveCount || re.leaseTransferCount >= re.maxLeaseTransferCount {
+		// NB: we don't have to check the maxLeaseTransferCount here since only one
+		// store can transfer leases - the local store. So the limit is only checked
+		// inside of the corresponding rebalanceLeasesFromLocalStoreID call, but
+		// not in this outer loop.
+		if re.rangeMoveCount >= re.maxRangeMoveCount {
+			log.KvDistribution.VEventf(ctx, 2, "reached max range move count %d, stopping further rebalancing",
+				re.maxRangeMoveCount)
 			break
 		}
 		re.rebalanceStore(ctx, store, localStoreID)
@@ -257,7 +262,7 @@ func (re *rebalanceEnv) rebalanceStore(
 		if !ss.adjusted.replicas[rangeID].IsLeaseholder {
 			load[CPURate] = rstate.load.RaftCPU
 		}
-		fmt.Fprintf(&b, " r%d:%v", rangeID, load)
+		_, _ = fmt.Fprintf(&b, " r%d:%v", rangeID, load)
 	}
 	log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s",
 		topKRanges.dim, store.StoreID, localStoreID, b.String())
@@ -267,19 +272,40 @@ func (re *rebalanceEnv) rebalanceStore(
 		}
 	}
 
-	// TODO(tbg): it's somewhat akward that we only enter this branch for
-	// ss.StoreID == localStoreID and not for *any* calling local store.
-	// More generally, does it make sense that rebalanceStores is called on
-	// behalf of a particular store (vs. being called on behalf of the set
-	// of local store IDs)?
-	if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow {
-		shouldSkipReplicaMoves := re.rebalanceLeases(ctx, ss, store, localStoreID)
-		if shouldSkipReplicaMoves {
-			return
+	if ss.StoreID == localStoreID {
+		if store.dimSummary[CPURate] >= overloadSlow {
+			// The store which called into rebalanceStore is overloaded. Try to shed
+			// load from it via lease transfers first. Note that if we have multiple
+			// stores, this rebalanceStore invocation is on behalf of exactly one of
+			// them, and that's the one we're going to shed from here - other stores
+			// will do it when they call into rebalanceStore.
+			if numTransferred := re.rebalanceLeasesFromLocalStoreID(ctx, ss, store, localStoreID); numTransferred > 0 {
+				// If any leases were transferred, wait for these changes to be done
+				// before shedding replicas from this store (which is more costly).
+				// Otherwise, we may needlessly start moving replicas when we could
+				// instead have moved more leases in the next invocation. Note that the
+				// store rebalancer will call the rebalance method again after the lease
+				// transfer is done, and we may still be considering those transfers as
+				// pending from a load perspective, so we *may* not be able to do more
+				// lease transfers -- so be it.
+				//
+				// TODO(tbg): rather than skipping replica transfers when there were ANY
+				// lease transfers, we could instead skip them only if we hit a limit in
+				// transferring leases. If we didn't hit a limit, this indicates that we
+				// did consider all of the possible replicas to transfer a lease for,
+				// and came up short - it then makes sense to consider replica transfers.
+				// The current heuristic instead bails early, and an immediate call to
+				// rebalanceStores would likely be made, so that the results could
+				// ultimately be the same (mod potentially some logging noise as we
+				// iterate through rebalanceStores more frequently).
+				log.KvDistribution.VEventf(ctx, 2, "skipping replica transfers for s%d to try more leases next time",
+					ss.StoreID)
+				return
+			}
+		} else {
+			log.KvDistribution.VEventf(ctx, 2, "skipping lease shedding for calling store s%s: not cpu overloaded: %v",
+				localStoreID, store.dimSummary[CPURate])
 		}
-	} else {
-		log.KvDistribution.VEventf(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v",
-			ss.StoreID, localStoreID, store.dimSummary[CPURate])
 	}
 
 	log.KvDistribution.VEventf(ctx, 2, "attempting to shed replicas next")
@@ -289,7 +315,6 @@ func (re *rebalanceEnv) rebalanceStore(
 func (re *rebalanceEnv) rebalanceReplicas(
 	ctx context.Context, store sheddingStore, ss *storeState, localStoreID roachpb.StoreID,
 ) {
-	doneShedding := false
 	if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
 		re.now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
 		log.KvDistribution.VEventf(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
@@ -316,6 +341,23 @@ func (re *rebalanceEnv) rebalanceReplicas(
 	n := topKRanges.len()
 	loadDim := topKRanges.dim
 	for i := 0; i < n; i++ {
+		if re.rangeMoveCount >= re.maxRangeMoveCount {
+			log.KvDistribution.VEventf(ctx, 2,
+				"reached max range move count %d; done shedding", re.maxRangeMoveCount)
+			return
+		}
+		if ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold {
+			log.KvDistribution.VEventf(ctx, 2,
+				"s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing; done shedding",
+				store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold)
+			// TODO(sumeer): For regular rebalancing, we will wait until those top-K
+			// move and then continue with the rest. There is a risk that the top-K
+			// have some constraint that prevents rebalancing, while the rest can be
+			// moved. Running with underprovisioned clusters and expecting load-based
+			// rebalancing to work well is not in scope.
+			return
+		}
+
 		rangeID := topKRanges.index(i)
 		// TODO(sumeer): the following code belongs in a closure, since we will
 		// repeat it for some random selection of non topKRanges.
@@ -464,42 +506,41 @@ func (re *rebalanceEnv) rebalanceReplicas(
 		log.KvDistribution.VEventf(ctx, 2,
 			"result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
 			rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
-		if re.rangeMoveCount >= re.maxRangeMoveCount {
-			log.KvDistribution.VEventf(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, re.maxRangeMoveCount)
-			return
-		}
-		doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
-		if doneShedding {
-			log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
-				store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
-			break
-		}
-	}
-	// TODO(sumeer): For regular rebalancing, we will wait until those top-K
-	// move and then continue with the rest. There is a risk that the top-K
-	// have some constraint that prevents rebalancing, while the rest can be
-	// moved. Running with underprovisioned clusters and expecting load-based
-	// rebalancing to work well is not in scope.
-	if doneShedding {
-		log.KvDistribution.VEventf(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID)
-		return
 	}
 }
 
-func (re *rebalanceEnv) rebalanceLeases(
+// rebalanceLeasesFromLocalStoreID attempts to move leases away from the
+// provided store, which must be the local store which called into
+// rebalanceStores, and must be overloaded on CPU.
+//
+// Transfers are attempted until we run out of leases to try, hit the max lease
+// transfer count limit, or the maximum fractional pending decrease threshold.
+//
+// Returns the number of lease transfers made.
+func (re *rebalanceEnv) rebalanceLeasesFromLocalStoreID(
 	ctx context.Context, ss *storeState, store sheddingStore, localStoreID roachpb.StoreID,
-) bool {
+) int /* leaseTransferCount */ {
 	log.KvDistribution.VEventf(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
 		store.StoreID, store.dimSummary[CPURate], overloadSlow)
 	// This store is local, and cpu overloaded. Shed leases first.
 	//
 	// NB: any ranges at this store that don't have pending changes must
 	// have this local store as the leaseholder.
-	localLeaseTransferCount := 0
 	topKRanges := ss.adjusted.topKRanges[localStoreID]
+	var leaseTransferCount int
 	n := topKRanges.len()
-	doneShedding := false
 	for i := 0; i < n; i++ {
+		if leaseTransferCount >= re.maxLeaseTransferCount {
+			log.KvDistribution.VEventf(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
+			return leaseTransferCount
+		}
+		if ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold {
+			log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%."+
+				"2f) after %d lease transfers",
+				store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, leaseTransferCount)
+			return leaseTransferCount
+		}
+
 		rangeID := topKRanges.index(i)
 		rstate := re.ranges[rangeID]
 		if len(rstate.pendingChanges) > 0 {
@@ -634,8 +675,6 @@ func (re *rebalanceEnv) rebalanceLeases(
 		}
 		re.addPendingRangeChange(leaseChange)
 		re.changes = append(re.changes, leaseChange)
-		re.leaseTransferCount++
-		localLeaseTransferCount++
 		if re.changes[len(re.changes)-1].IsChangeReplicas() || !re.changes[len(re.changes)-1].IsTransferLease() {
 			panic(fmt.Sprintf("lease transfer is invalid: %v", re.changes[len(re.changes)-1]))
 		}
@@ -646,28 +685,8 @@ func (re *rebalanceEnv) rebalanceLeases(
 			ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
 			ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
 			targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
-		if re.leaseTransferCount >= re.maxLeaseTransferCount {
-			log.KvDistribution.VEventf(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
-			break
-		}
-		doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
-		if doneShedding {
-			log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
-				store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
-			break
-		}
-	}
-	if doneShedding || localLeaseTransferCount > 0 {
-		// If managed to transfer a lease, wait for it to be done, before
-		// shedding replicas from this store (which is more costly). Otherwise
-		// we may needlessly start moving replicas. Note that the store
-		// rebalancer will call the rebalance method again after the lease
-		// transfer is done and we may still be considering those transfers as
-		// pending from a load perspective, so we *may* not be able to do more
-		// lease transfers -- so be it.
-		log.KvDistribution.VEventf(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
-			store.StoreID, doneShedding, localLeaseTransferCount)
-		return true
+		leaseTransferCount++
 	}
-	return false
+	// We iterated through all top-K ranges without running into any limits.
+	return leaseTransferCount
 }
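Taken together, the hunks in this file converge on the following control flow. What follows is a simplified, hypothetical Go sketch rather than the real code: the integer store IDs and the helpers cpuOverloaded and topKRanges are stand-ins, and only the placement of the two limits is meant to mirror the diff (maxRangeMoveCount is checked once in the outer rebalanceStores loop and again at the top of the replica loop; maxLeaseTransferCount is counted and enforced only inside the lease-shedding helper, which returns how many transfers it made).

package main

import "fmt"

// rebalanceEnv is a stand-in for the real struct; only the two limits matter here.
type rebalanceEnv struct {
	rangeMoveCount        int // counted across all shedding stores in one invocation
	maxRangeMoveCount     int
	maxLeaseTransferCount int // enforced only inside rebalanceLeasesFromLocalStore
}

func (re *rebalanceEnv) rebalanceStores(localStore int, sheddingStores []int) {
	for _, store := range sheddingStores {
		// Only the range-move limit is checked in the outer loop; the lease
		// transfer limit is purely local to rebalanceLeasesFromLocalStore.
		if re.rangeMoveCount >= re.maxRangeMoveCount {
			fmt.Println("reached max range move count, stopping further rebalancing")
			break
		}
		re.rebalanceStore(store, localStore)
	}
}

func (re *rebalanceEnv) rebalanceStore(store, localStore int) {
	if store == localStore && cpuOverloaded(store) {
		if n := re.rebalanceLeasesFromLocalStore(store); n > 0 {
			// Wait for the cheaper lease transfers before doing costlier replica moves.
			fmt.Println("skipping replica transfers to try more leases next time")
			return
		}
	}
	re.rebalanceReplicas(store)
}

// rebalanceLeasesFromLocalStore returns the number of lease transfers made.
func (re *rebalanceEnv) rebalanceLeasesFromLocalStore(store int) int {
	transferred := 0
	for range topKRanges(store) {
		if transferred >= re.maxLeaseTransferCount {
			break // limit checked at the top of the loop, not after a transfer
		}
		transferred++ // the actual lease transfer is elided
	}
	return transferred
}

func (re *rebalanceEnv) rebalanceReplicas(store int) {
	for range topKRanges(store) {
		// Limits are checked before attempting a move rather than after one.
		if re.rangeMoveCount >= re.maxRangeMoveCount {
			return
		}
		re.rangeMoveCount++ // the actual replica move is elided
	}
}

func cpuOverloaded(store int) bool { return store == 1 }
func topKRanges(store int) []int   { return []int{1, 2, 3} }

func main() {
	re := &rebalanceEnv{maxRangeMoveCount: 2, maxLeaseTransferCount: 1}
	re.rebalanceStores(1 /* localStore */, []int{1, 2})
	fmt.Println("range moves made:", re.rangeMoveCount)
}

Returning the transfer count from the helper (instead of the old boolean plus the shared leaseTransferCount field) keeps the lease-transfer budget local to the one store that can actually shed leases.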
Lines changed: 0 additions & 84 deletions
@@ -1,84 +0,0 @@
-# Basic test that rebalanceStores issues a lease transfer when seeing a single replica
-# contributing to overload on the first out of three single-store nodes.
-
-set-store
-store-id=1 node-id=1 attrs=purple locality-tiers=region=us-west-1,zone=us-west-1a
-store-id=2 node-id=2 attrs=yellow locality-tiers=region=us-east-1,zone=us-east-1a
-store-id=3 node-id=3 attrs=green locality-tiers=region=us-central-1,zone=us-central-1a
-----
-node-id=1 locality-tiers=region=us-west-1,zone=us-west-1a,node=1
-store-id=1 attrs=purple locality-code=1:2:3:
-node-id=2 locality-tiers=region=us-east-1,zone=us-east-1a,node=2
-store-id=2 attrs=yellow locality-code=4:5:6:
-node-id=3 locality-tiers=region=us-central-1,zone=us-central-1a,node=3
-store-id=3 attrs=green locality-code=7:8:9:
-
-store-load-msg
-store-id=1 node-id=1 load=[80,0,0] capacity=[100,100,100] secondary-load=0 load-time=0s
-----
-
-store-load-msg
-store-id=2 node-id=2 load=[10,0,0] capacity=[100,100,100] secondary-load=0 load-time=0s
-----
-
-store-load-msg
-store-id=3 node-id=3 load=[10,0,0] capacity=[100,100,100] secondary-load=0 load-time=0s
-----
-
-get-load-info
-----
-store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:0, byte-size:0] adjusted=[cpu:80, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=80 seq=1
-store-id=2 node-id=2 status=ok accepting all reported=[cpu:10, write-bandwidth:0, byte-size:0] adjusted=[cpu:10, write-bandwidth:0, byte-size:0] node-reported-cpu=10 node-adjusted-cpu=10 seq=1
-store-id=3 node-id=3 status=ok accepting all reported=[cpu:10, write-bandwidth:0, byte-size:0] adjusted=[cpu:10, write-bandwidth:0, byte-size:0] node-reported-cpu=10 node-adjusted-cpu=10 seq=1
-
-store-leaseholder-msg
-store-id=1
-range-id=1 load=[60,0,0] raft-cpu=20 config=(num_replicas=3 constraints={} voter_constraints={})
-store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
-store-id=2 replica-id=2 type=VOTER_FULL
-store-id=3 replica-id=3 type=VOTER_FULL
-----
-
-ranges
-----
-range-id=1 local-store=1 load=[cpu:60, write-bandwidth:0, byte-size:0] raft-cpu=20
-store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
-store-id=2 replica-id=2 type=VOTER_FULL
-store-id=3 replica-id=3 type=VOTER_FULL
-
-get-load-info
-----
-store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:0, byte-size:0] adjusted=[cpu:80, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=80 seq=1
-top-k-ranges (local-store-id=1) dim=CPURate: r1
-store-id=2 node-id=2 status=ok accepting all reported=[cpu:10, write-bandwidth:0, byte-size:0] adjusted=[cpu:10, write-bandwidth:0, byte-size:0] node-reported-cpu=10 node-adjusted-cpu=10 seq=1
-top-k-ranges (local-store-id=1) dim=WriteBandwidth: r1
-store-id=3 node-id=3 status=ok accepting all reported=[cpu:10, write-bandwidth:0, byte-size:0] adjusted=[cpu:10, write-bandwidth:0, byte-size:0] node-reported-cpu=10 node-adjusted-cpu=10 seq=1
-top-k-ranges (local-store-id=1) dim=WriteBandwidth: r1
-
-# s1 is overloaded and local, so rebalanceStores should try and succeed to shed
-# a lease from it. The gc duration for lease shedding is 1min.
-rebalance-stores store-id=1
-----
-[mmaid=1] rebalanceStores begins
-[mmaid=1] cluster means: (stores-load [cpu:33, write-bandwidth:0, byte-size:0]) (stores-capacity [cpu:100, write-bandwidth:100, byte-size:100]) (nodes-cpu-load 33) (nodes-cpu-capacity 100)
-[mmaid=1] evaluating s1: node load overloadUrgent, store load overloadUrgent, worst dim CPURate
-[mmaid=1] overload-continued s1 ((store=overloadUrgent worst=CPURate cpu=overloadUrgent writes=loadNormal bytes=loadNormal node=overloadUrgent high_disk=false frac_pending=0.00,0.00(true))) - within grace period
-[mmaid=1] store s1 was added to shedding store list
-[mmaid=1] evaluating s2: node load loadLow, store load loadNormal, worst dim WriteBandwidth
-[mmaid=1] evaluating s3: node load loadLow, store load loadNormal, worst dim WriteBandwidth
-[mmaid=1] start processing shedding store s1: cpu node load overloadUrgent, store load overloadUrgent, worst dim CPURate
-[mmaid=1] top-K[CPURate] ranges for s1 with lease on local s1: r1:[cpu:60, write-bandwidth:0, byte-size:0]
-[mmaid=1] local store s1 is CPU overloaded (overloadUrgent >= overloadSlow), attempting lease transfers first
-[mmaid=1] considering lease-transfer r1 from s1: candidates are [1 2 3]
-[mmaid=1] sortTargetCandidateSetAndPick: candidates: s2(loadNormal) s3(loadNormal), picked s2
-[mmaid=1] can add load to n2s2: true targetSLS[(store=overloadSlow worst=CPURate cpu=overloadSlow writes=loadNormal bytes=loadNormal node=overloadSlow high_disk=false frac_pending=0.00,0.00(true))] srcSLS[(store=overloadSlow worst=CPURate cpu=overloadSlow writes=loadNormal bytes=loadNormal node=overloadSlow high_disk=false frac_pending=0.00,0.00(true))]
-[mmaid=1] result(success): shedding r1 lease from s1 to s2 [change:r1=[transfer_to=2 cids=1,2]] with resulting loads source:[cpu:40, write-bandwidth:0, byte-size:0] target:[cpu:54, write-bandwidth:0, byte-size:0] (means: [cpu:33, write-bandwidth:0, byte-size:0]) (frac_pending: (src:0.00,target:0.50) (src:4.40,target:0.00))
-[mmaid=1] s1 has reached pending decrease threshold(0.50>=0.10) after lease transfers: done shedding with 0 left in topK
-[mmaid=1] skipping replica transfers for s1: done shedding=true, lease_transfers=1
-pending(2)
-change-id=1 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-40, write-bandwidth:0, byte-size:0] start=0s gc=1m0s
-prev=(replica-id=1 type=VOTER_FULL leaseholder=true)
-next=(replica-id=1 type=VOTER_FULL)
-change-id=2 store-id=2 node-id=2 range-id=1 load-delta=[cpu:44, write-bandwidth:0, byte-size:0] start=0s gc=1m0s
-prev=(replica-id=2 type=VOTER_FULL)
-next=(replica-id=2 type=VOTER_FULL leaseholder=true)
