Commit 9b16bb1

craig[bot] and tbg committed
Merge #157820
157820: mmaprototype: clarify maxRangeMoveCount and maxLeaseTransferCount r=tbg a=tbg

Informs #157757.

Epic: CRDB-55052

Co-authored-by: Tobias Grieger <[email protected]>
2 parents ab5107e + da91095 commit 9b16bb1

14 files changed: +539, -161 lines

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 4 additions & 3 deletions
@@ -585,14 +585,15 @@ func sortTargetCandidateSetAndPick(
       (cand.nls == loadThreshold && ignoreLevel < ignoreHigherThanLoadThreshold)
     candDiscardedByOverloadDim := overloadedDim != NumLoadDimensions &&
       cand.dimSummary[overloadedDim] >= loadNoChange
-    if candDiscardedByNLS || candDiscardedByOverloadDim ||
-      cand.maxFractionPendingIncrease >= maxFractionPendingThreshold {
+    candDiscardedByPendingThreshold := cand.maxFractionPendingIncrease >= maxFractionPendingThreshold
+    if candDiscardedByNLS || candDiscardedByOverloadDim || candDiscardedByPendingThreshold {
       // Discard this candidate.
       if cand.maxFractionPendingIncrease > epsilon && discardedCandsHadNoPendingChanges {
         discardedCandsHadNoPendingChanges = false
       }
       log.KvDistribution.VEventf(ctx, 2,
-        "candiate store %v was discarded: sls=%v", cand.StoreID, cand.storeLoadSummary)
+        "candiate store %v was discarded due to (nls=%t overloadDim=%t pending_thresh=%t): sls=%v", cand.StoreID,
+        candDiscardedByNLS, candDiscardedByOverloadDim, candDiscardedByPendingThreshold, cand.storeLoadSummary)
       continue
     }
     cands.candidates[j] = cand
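
The hunk above is a readability refactor: the pending-increase check is pulled into a named boolean so the discard log line can state which of the three conditions rejected the candidate. A minimal standalone sketch of that pattern, with a hypothetical candidate type and an assumed threshold standing in for the real ones:

package main

import "fmt"

// candidate is a hypothetical stand-in for the allocator's candidate struct.
type candidate struct {
  storeID                    int
  nodeOverloaded             bool    // stand-in for the nls check
  dimOverloaded              bool    // stand-in for the overloaded-dimension check
  maxFractionPendingIncrease float64 // fraction of load already in flight
}

const maxFractionPendingThreshold = 0.1 // assumed value for illustration

// shouldDiscard names each predicate so the log line can report exactly
// which condition(s) rejected the candidate.
func shouldDiscard(c candidate) bool {
  discardedByNLS := c.nodeOverloaded
  discardedByOverloadDim := c.dimOverloaded
  discardedByPending := c.maxFractionPendingIncrease >= maxFractionPendingThreshold
  if discardedByNLS || discardedByOverloadDim || discardedByPending {
    fmt.Printf("candidate store %d discarded (nls=%t overloadDim=%t pending_thresh=%t)\n",
      c.storeID, discardedByNLS, discardedByOverloadDim, discardedByPending)
    return true
  }
  return false
}

func main() {
  _ = shouldDiscard(candidate{storeID: 3, maxFractionPendingIncrease: 0.4})
}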

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 3 additions & 0 deletions
@@ -2473,6 +2473,9 @@ func (cs *clusterState) canShedAndAddLoad(
     // that case.
   }
   canAddLoad = overloadedDimPermitsChange && !otherDimensionsBecameWorseInTarget &&
+    // NB: the target here is quite loaded, so we are stricter than in other
+    // places and require that there are *no* pending changes (rather than
+    // a threshold fraction).
     targetSLS.maxFractionPendingIncrease < epsilon &&
     targetSLS.maxFractionPendingDecrease < epsilon &&
     // NB: targetSLS.nls <= targetSLS.sls is not a typo, in that we are
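
The added comment distinguishes two strictness levels for pending load: ordinary candidates only need to stay under a fractional pending budget, whereas here the (already loaded) target must have effectively no pending changes at all. A small sketch of that distinction, using assumed constant values rather than the real ones:

package main

import "fmt"

const (
  epsilon                     = 1e-6 // "effectively zero pending changes"
  maxFractionPendingThreshold = 0.1  // looser budget used elsewhere (assumed value)
)

// canAddLoadToLoadedTarget is strict: the target is already quite loaded, so
// any pending increase or decrease disqualifies it.
func canAddLoadToLoadedTarget(pendingInc, pendingDec float64) bool {
  return pendingInc < epsilon && pendingDec < epsilon
}

// withinPendingBudget is the looser check applied to ordinary candidates.
func withinPendingBudget(pendingInc float64) bool {
  return pendingInc < maxFractionPendingThreshold
}

func main() {
  fmt.Println(canAddLoadToLoadedTarget(0, 0))    // true
  fmt.Println(canAddLoadToLoadedTarget(0.05, 0)) // false: a pending change is present
  fmt.Println(withinPendingBudget(0.05))         // true: under the assumed 10% budget
}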

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 85 additions & 66 deletions
@@ -38,8 +38,6 @@ type rebalanceEnv struct {
   changes []PendingRangeChange
   // rangeMoveCount tracks the number of range moves made.
   rangeMoveCount int
-  // leaseTransferCount tracks the number of lease transfers made.
-  leaseTransferCount int
   // maxRangeMoveCount is the maximum number of range moves allowed in the
   // context of the current rebalanceStores invocation (across multiple
   // shedding stores).
@@ -48,7 +46,8 @@ type rebalanceEnv struct {
   // the context of the current rebalanceStores invocation. Note that because
   // leases can only be shed from the particular local store on whose behalf
   // rebalanceStores was called, this limit applies to this particular one
-  // store.
+  // store, and thus needs to be checked against only in that single
+  // rebalanceLeasesFromLocalStoreID invocation.
   maxLeaseTransferCount int
   // If a store's maxFractionPendingDecrease is at least this threshold, no
   // further changes should be made at this time. This is because the inflight
@@ -229,7 +228,13 @@ func (re *rebalanceEnv) rebalanceStores(
   }
 
   for _, store := range sheddingStores {
-    if re.rangeMoveCount >= re.maxRangeMoveCount || re.leaseTransferCount >= re.maxLeaseTransferCount {
+    // NB: we don't have to check the maxLeaseTransferCount here since only one
+    // store can transfer leases - the local store. So the limit is only checked
+    // inside of the corresponding rebalanceLeasesFromLocalStoreID call, but
+    // not in this outer loop.
+    if re.rangeMoveCount >= re.maxRangeMoveCount {
+      log.KvDistribution.VEventf(ctx, 2, "reached max range move count %d, stopping further rebalancing",
+        re.maxRangeMoveCount)
       break
     }
     re.rebalanceStore(ctx, store, localStoreID)
@@ -257,7 +262,7 @@ func (re *rebalanceEnv) rebalanceStore(
       if !ss.adjusted.replicas[rangeID].IsLeaseholder {
         load[CPURate] = rstate.load.RaftCPU
       }
-      fmt.Fprintf(&b, " r%d:%v", rangeID, load)
+      _, _ = fmt.Fprintf(&b, " r%d:%v", rangeID, load)
     }
     log.KvDistribution.Infof(ctx, "top-K[%s] ranges for s%d with lease on local s%d:%s",
       topKRanges.dim, store.StoreID, localStoreID, b.String())
@@ -267,19 +272,40 @@ func (re *rebalanceEnv) rebalanceStore(
     }
   }
 
-  // TODO(tbg): it's somewhat akward that we only enter this branch for
-  // ss.StoreID == localStoreID and not for *any* calling local store.
-  // More generally, does it make sense that rebalanceStores is called on
-  // behalf of a particular store (vs. being called on behalf of the set
-  // of local store IDs)?
-  if ss.StoreID == localStoreID && store.dimSummary[CPURate] >= overloadSlow {
-    shouldSkipReplicaMoves := re.rebalanceLeases(ctx, ss, store, localStoreID)
-    if shouldSkipReplicaMoves {
-      return
+  if ss.StoreID == localStoreID {
+    if store.dimSummary[CPURate] >= overloadSlow {
+      // The store which called into rebalanceStore is overloaded. Try to shed
+      // load from it via lease transfers first. Note that if we have multiple
+      // stores, this rebalanceStore invocation is on behalf of exactly one of
+      // them, and that's the one we're going to shed from here - other stores
+      // will do it when they call into rebalanceStore.
+      if numTransferred := re.rebalanceLeasesFromLocalStoreID(ctx, ss, store, localStoreID); numTransferred > 0 {
+        // If any leases were transferred, wait for these changes to be done
+        // before shedding replicas from this store (which is more costly).
+        // Otherwise, we may needlessly start moving replicas when we could
+        // instead have moved more leases in the next invocation. Note that the
+        // store rebalancer will call the rebalance method again after the lease
+        // transfer is done, and we may still be considering those transfers as
+        // pending from a load perspective, so we *may* not be able to do more
+        // lease transfers -- so be it.
+        //
+        // TODO(tbg): rather than skipping replica transfers when there were ANY
+        // lease transfers, we could instead skip them only if we hit a limit in
+        // transferring leases. If we didn't hit a limit, this indicates that we
+        // did consider all of the possible replicas to transfer a lease for,
+        // and came up short - it then makes sense to consider replica transfers.
+        // The current heuristic instead bails early, and an immediate call to
+        // rebalanceStores would likely be made, so that the results could
+        // ultimately be the same (mod potentially some logging noise as we
+        // iterate through rebalanceStores more frequently).
+        log.KvDistribution.VEventf(ctx, 2, "skipping replica transfers for s%d to try more leases next time",
+          ss.StoreID)
+        return
+      }
+    } else {
+      log.KvDistribution.VEventf(ctx, 2, "skipping lease shedding for calling store s%s: not cpu overloaded: %v",
+        localStoreID, store.dimSummary[CPURate])
     }
-  } else {
-    log.KvDistribution.VEventf(ctx, 2, "skipping lease shedding: s%v != local store s%s or cpu is not overloaded: %v",
-      ss.StoreID, localStoreID, store.dimSummary[CPURate])
   }
 
   log.KvDistribution.VEventf(ctx, 2, "attempting to shed replicas next")
@@ -289,7 +315,6 @@ func (re *rebalanceEnv) rebalanceStore(
 func (re *rebalanceEnv) rebalanceReplicas(
   ctx context.Context, store sheddingStore, ss *storeState, localStoreID roachpb.StoreID,
 ) {
-  doneShedding := false
   if store.StoreID != localStoreID && store.dimSummary[CPURate] >= overloadSlow &&
     re.now.Sub(ss.overloadStartTime) < remoteStoreLeaseSheddingGraceDuration {
     log.KvDistribution.VEventf(ctx, 2, "skipping remote store s%d: in lease shedding grace period", store.StoreID)
@@ -316,6 +341,23 @@ func (re *rebalanceEnv) rebalanceReplicas(
   n := topKRanges.len()
   loadDim := topKRanges.dim
   for i := 0; i < n; i++ {
+    if re.rangeMoveCount >= re.maxRangeMoveCount {
+      log.KvDistribution.VEventf(ctx, 2,
+        "reached max range move count %d; done shedding", re.maxRangeMoveCount)
+      return
+    }
+    if ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold {
+      log.KvDistribution.VEventf(ctx, 2,
+        "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing; done shedding",
+        store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold)
+      // TODO(sumeer): For regular rebalancing, we will wait until those top-K
+      // move and then continue with the rest. There is a risk that the top-K
+      // have some constraint that prevents rebalancing, while the rest can be
+      // moved. Running with underprovisioned clusters and expecting load-based
+      // rebalancing to work well is not in scope.
+      return
+    }
+
     rangeID := topKRanges.index(i)
     // TODO(sumeer): the following code belongs in a closure, since we will
     // repeat it for some random selection of non topKRanges.
@@ -464,42 +506,41 @@ func (re *rebalanceEnv) rebalanceReplicas(
       log.KvDistribution.VEventf(ctx, 2,
         "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
         rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load)
-      if re.rangeMoveCount >= re.maxRangeMoveCount {
-        log.KvDistribution.VEventf(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, re.maxRangeMoveCount)
-        return
-      }
-      doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
-      if doneShedding {
-        log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
-          store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
-        break
-      }
-    }
-    // TODO(sumeer): For regular rebalancing, we will wait until those top-K
-    // move and then continue with the rest. There is a risk that the top-K
-    // have some constraint that prevents rebalancing, while the rest can be
-    // moved. Running with underprovisioned clusters and expecting load-based
-    // rebalancing to work well is not in scope.
-    if doneShedding {
-      log.KvDistribution.VEventf(ctx, 2, "store s%d is done shedding, moving to next store", store.StoreID)
-      return
     }
   }
 
-func (re *rebalanceEnv) rebalanceLeases(
+// rebalanceLeasesFromLocalStoreID attempts to move leases away from the
+// provided store, which must be the local store which called into
+// rebalanceStores, and must be overloaded on CPU.
+//
+// Transfers are attempted until we run out of leases to try, hit the max lease
+// transfer count limit, or the maximum fractional pending decrease threshold.
+//
+// Returns the number of lease transfers made.
+func (re *rebalanceEnv) rebalanceLeasesFromLocalStoreID(
   ctx context.Context, ss *storeState, store sheddingStore, localStoreID roachpb.StoreID,
-) bool {
+) int /* leaseTransferCount */ {
   log.KvDistribution.VEventf(ctx, 2, "local store s%d is CPU overloaded (%v >= %v), attempting lease transfers first",
     store.StoreID, store.dimSummary[CPURate], overloadSlow)
   // This store is local, and cpu overloaded. Shed leases first.
   //
   // NB: any ranges at this store that don't have pending changes must
   // have this local store as the leaseholder.
-  localLeaseTransferCount := 0
   topKRanges := ss.adjusted.topKRanges[localStoreID]
+  var leaseTransferCount int
   n := topKRanges.len()
-  doneShedding := false
   for i := 0; i < n; i++ {
+    if leaseTransferCount >= re.maxLeaseTransferCount {
+      log.KvDistribution.VEventf(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
+      return leaseTransferCount
+    }
+    if ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold {
+      log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%."+
+        "2f) after %d lease transfers",
+        store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, leaseTransferCount)
+      return leaseTransferCount
+    }
+
     rangeID := topKRanges.index(i)
     rstate := re.ranges[rangeID]
     if len(rstate.pendingChanges) > 0 {
@@ -634,8 +675,6 @@ func (re *rebalanceEnv) rebalanceLeases(
     }
     re.addPendingRangeChange(leaseChange)
     re.changes = append(re.changes, leaseChange)
-    re.leaseTransferCount++
-    localLeaseTransferCount++
     if re.changes[len(re.changes)-1].IsChangeReplicas() || !re.changes[len(re.changes)-1].IsTransferLease() {
       panic(fmt.Sprintf("lease transfer is invalid: %v", re.changes[len(re.changes)-1]))
     }
@@ -646,28 +685,8 @@ func (re *rebalanceEnv) rebalanceLeases(
       ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load,
       ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease,
       targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease)
-    if re.leaseTransferCount >= re.maxLeaseTransferCount {
-      log.KvDistribution.VEventf(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
-      break
-    }
-    doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
-    if doneShedding {
-      log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
-        store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
-      break
-    }
-  }
-  if doneShedding || localLeaseTransferCount > 0 {
-    // If managed to transfer a lease, wait for it to be done, before
-    // shedding replicas from this store (which is more costly). Otherwise
-    // we may needlessly start moving replicas. Note that the store
-    // rebalancer will call the rebalance method again after the lease
-    // transfer is done and we may still be considering those transfers as
-    // pending from a load perspective, so we *may* not be able to do more
-    // lease transfers -- so be it.
-    log.KvDistribution.VEventf(ctx, 2, "skipping replica transfers for s%d: done shedding=%v, lease_transfers=%d",
-      store.StoreID, doneShedding, localLeaseTransferCount)
-    return true
+    leaseTransferCount++
   }
-  return false
+  // We iterated through all top-K ranges without running into any limits.
+  return leaseTransferCount
 }
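
Taken together, the changes to this file move the two limits to the top of their respective shedding loops: maxRangeMoveCount is checked in the outer loop over shedding stores and in the replica-shedding loop, while maxLeaseTransferCount is checked only inside rebalanceLeasesFromLocalStoreID, which now returns a transfer count instead of a bool. A condensed sketch of that control flow, with a hypothetical env type and simplified signatures rather than the real rebalanceEnv:

package main

import "fmt"

// env is a hypothetical stand-in for rebalanceEnv, keeping only the fields
// needed to show where the limits are enforced.
type env struct {
  maxLeaseTransferCount    int
  pendingDecrease          float64
  pendingDecreaseThreshold float64
  topKRanges               []int // stand-in for the top-K ranges of the local store
}

// shedLeases mirrors rebalanceLeasesFromLocalStoreID: limits are checked at the
// top of every iteration, and the number of transfers is returned so the caller
// can decide whether to skip the more expensive replica moves.
func (e *env) shedLeases() int {
  transfers := 0
  for _, rangeID := range e.topKRanges {
    if transfers >= e.maxLeaseTransferCount {
      fmt.Println("reached max lease transfer count", e.maxLeaseTransferCount)
      return transfers
    }
    if e.pendingDecrease >= e.pendingDecreaseThreshold {
      fmt.Println("pending decrease threshold reached after", transfers, "transfers")
      return transfers
    }
    // ... choose a target and enqueue the lease transfer (elided) ...
    fmt.Println("transferring lease for range", rangeID)
    transfers++
    e.pendingDecrease += 0.2 // stand-in for the pending-load bookkeeping
  }
  // Iterated through all candidates without hitting a limit.
  return transfers
}

func main() {
  e := &env{maxLeaseTransferCount: 2, pendingDecreaseThreshold: 0.5,
    topKRanges: []int{1, 2, 3, 4}}
  if n := e.shedLeases(); n > 0 {
    // As in rebalanceStore: if any leases moved, wait for them to finish
    // before shedding replicas from this store.
    fmt.Println("skipping replica moves; transferred", n, "leases")
  }
}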

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 34 additions & 7 deletions
@@ -9,6 +9,7 @@ import (
   "context"
   "fmt"
   "math/rand"
+  "path/filepath"
   "slices"
   "sort"
   "strconv"
@@ -329,8 +330,9 @@ func testingGetPendingChanges(t *testing.T, cs *clusterState) []*pendingReplicaC
 }
 
 func TestClusterState(t *testing.T) {
+  tdPath := datapathutils.TestDataPath(t, "cluster_state")
   datadriven.Walk(t,
-    datapathutils.TestDataPath(t, "cluster_state"),
+    tdPath,
     func(t *testing.T, path string) {
       ts := timeutil.NewManualTime(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC))
       cs := newClusterState(ts, newStringInterner())
@@ -355,8 +357,15 @@ func TestClusterState(t *testing.T) {
         return buf.String()
       }
 
-      datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+      // Recursively invoked in `include` directive.
+      var invokeFn func(t *testing.T, d *datadriven.TestData) string
+      invokeFn = func(t *testing.T, d *datadriven.TestData) string {
         switch d.Cmd {
+        case "include":
+          loc := dd.ScanArg[string](t, d, "path")
+          datadriven.RunTest(t, filepath.Join(tdPath, loc), invokeFn)
+          return "ok"
+
         case "ranges":
           var rangeIDs []int
           for rangeID := range cs.ranges {
@@ -436,13 +445,19 @@ func TestClusterState(t *testing.T) {
           // be in the past, indicating gossip delay. However, having it be
           // some arbitrary value can be confusing for the test reader.
           // Consider making it relative to ts.
-          msg := parseStoreLoadMsg(t, d.Input)
-          cs.processStoreLoadMsg(context.Background(), &msg)
+          for line := range strings.Lines(d.Input) {
+            msg := parseStoreLoadMsg(t, line)
+            cs.processStoreLoadMsg(context.Background(), &msg)
+          }
           return ""
 
         case "store-leaseholder-msg":
           msg := parseStoreLeaseholderMsg(t, d.Input)
-          cs.processStoreLeaseholderMsgInternal(context.Background(), &msg, 2, nil)
+          n := numTopKReplicas
+          if o, ok := dd.ScanArgOpt[int](t, d, "num-top-k-replicas"); ok {
+            n = o
+          }
+          cs.processStoreLeaseholderMsgInternal(context.Background(), &msg, n, nil)
           return ""
 
         case "make-pending-changes":
@@ -524,6 +539,17 @@ func TestClusterState(t *testing.T) {
           defer tr.Close()
           ctx, finishAndGet := tracing.ContextWithRecordingSpan(context.Background(), tr, "rebalance-stores")
           re := newRebalanceEnv(cs, rng, dsm, cs.ts.Now())
+
+          if n, ok := dd.ScanArgOpt[int](t, d, "max-lease-transfer-count"); ok {
+            re.maxLeaseTransferCount = n
+          }
+          if n, ok := dd.ScanArgOpt[int](t, d, "max-range-move-count"); ok {
+            re.maxRangeMoveCount = n
+          }
+          if f, ok := dd.ScanArgOpt[float64](t, d, "fraction-pending-decrease-threshold"); ok {
+            re.fractionPendingIncreaseOrDecreaseThreshold = f
+          }
+
           re.rebalanceStores(ctx, storeID)
           rec := finishAndGet()
           var sb redact.StringBuilder
@@ -538,7 +564,8 @@ func TestClusterState(t *testing.T) {
         default:
           panic(fmt.Sprintf("unknown command: %v", d.Cmd))
         }
-      },
-    )
+      }
+
+      datadriven.RunTest(t, path, invokeFn)
     })
 }
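
The test change binds the datadriven handler to a named closure so that a new `include` directive can recursively run another test file through the same handler, and it reads optional knobs (num-top-k-replicas, max-lease-transfer-count, and so on) from directive arguments. A standalone sketch of the recursion pattern, using the plain datadriven argument API (d.ScanArgs) in place of the dd.ScanArg/ScanArgOpt helpers used by the real test; the package name, test-data root, and file names here are assumptions:

package mmaprototype_test

import (
  "path/filepath"
  "testing"

  "github.com/cockroachdb/datadriven"
)

func TestIncludeSketch(t *testing.T) {
  tdPath := "testdata/cluster_state" // assumed test-data root

  // Bind the handler to a variable so the "include" case can refer to it.
  var invokeFn func(t *testing.T, d *datadriven.TestData) string
  invokeFn = func(t *testing.T, d *datadriven.TestData) string {
    switch d.Cmd {
    case "include":
      // Recursively run the included file through the same handler.
      var loc string
      d.ScanArgs(t, "path", &loc)
      datadriven.RunTest(t, filepath.Join(tdPath, loc), invokeFn)
      return "ok"
    case "echo":
      // Placeholder for the real commands (ranges, store-load-msg, ...).
      return d.Input
    default:
      t.Fatalf("unknown command: %s", d.Cmd)
      return ""
    }
  }

  datadriven.RunTest(t, filepath.Join(tdPath, "example.txt"), invokeFn) // assumed entry file
}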

pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/multiple_ranges.txt

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ store-load-msg
 ----
 
 # Range 4 has the highest cpu on the leaseholder, and range 3 the highest cpu on a follower.
-store-leaseholder-msg
+store-leaseholder-msg num-top-k-replicas=2
 store-id=1
 range-id=1 load=[10,10,20] raft-cpu=5 config=(num_replicas=3 constraints={'+region=us-west-1:1'} voter_constraints={'+region=us-west-1:1'})
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
