
Commit 06b478d

craig[bot] and tbg committed
Merge #158182

158182: mmaprototype: prepare rebalanceStores for more testing r=tbg a=tbg

This will further simplify testing rebalancing in `TestClusterState` by allowing that test to edit the `rebalanceEnv` before calling its `rebalanceStores` method, for example in order to tune configuration parameters that are otherwise hard-coded.

Most of this was extracted from #157820.

Epic: CRDB-55052

Co-authored-by: Tobias Grieger <[email protected]>
2 parents 5fa041d + e5f4b62 commit 06b478d
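
To make the commit message concrete: a test can now construct the environment itself and tweak it before rebalancing. The following is a hypothetical sketch, not code from this PR; the `maxRangeMoveCount` override and the `cs`, `rng`, `dsm`, `ctx`, `storeID` bindings are illustrative assumptions:

    // Hypothetical test usage enabled by this commit: build the env, edit a
    // knob that used to be a hard-coded const, then rebalance.
    re := newRebalanceEnv(cs, rng, dsm, cs.ts.Now())
    re.maxRangeMoveCount = 3 // default is 1; loosened here for the test only
    changes := re.rebalanceStores(ctx, storeID)
    _ = changes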

13 files changed, +102 -58 lines changed

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 3 additions & 6 deletions
@@ -212,11 +212,6 @@ type allocatorState struct {
 
 var _ Allocator = &allocatorState{}
 
-// TODO(sumeer): temporary constants.
-const (
-    maxFractionPendingThreshold = 0.1
-)
-
 func NewAllocatorState(ts timeutil.TimeSource, rand *rand.Rand) *allocatorState {
     interner := newStringInterner()
     cs := newClusterState(ts, interner)
@@ -333,7 +328,8 @@ func (a *allocatorState) ComputeChanges(
         panic(fmt.Sprintf("ComputeChanges: expected StoreID %d, got %d", opts.LocalStoreID, msg.StoreID))
     }
     a.cs.processStoreLeaseholderMsg(ctx, msg, a.mmaMetrics)
-    return a.cs.rebalanceStores(ctx, opts.LocalStoreID, a.rand, a.diversityScoringMemo)
+    re := newRebalanceEnv(a.cs, a.rand, a.diversityScoringMemo, a.cs.ts.Now())
+    return re.rebalanceStores(ctx, opts.LocalStoreID)
 }
 
 // AdminRelocateOne implements the Allocator interface.
@@ -497,6 +493,7 @@ func sortTargetCandidateSetAndPick(
     ignoreLevel ignoreLevel,
     overloadedDim LoadDimension,
     rng *rand.Rand,
+    maxFractionPendingThreshold float64,
 ) roachpb.StoreID {
     var b strings.Builder
     for i := range cands.candidates {
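
The new `maxFractionPendingThreshold` parameter replaces the deleted package-level constant: the knob now lives on `rebalanceEnv` (as `fractionPendingIncreaseOrDecreaseThreshold`, see the diffs below) and is threaded to this helper explicitly. A self-contained sketch of the pattern, with a stand-in `pick` rather than the real `sortTargetCandidateSetAndPick`:

    package main

    import "fmt"

    // rebalanceEnv carries what used to be a hard-coded package-level
    // constant, so a test can construct an env with a different value.
    type rebalanceEnv struct {
        fractionPendingIncreaseOrDecreaseThreshold float64
    }

    // pick is a stand-in for sortTargetCandidateSetAndPick: a candidate is
    // usable only while its pending load change stays below the threshold.
    func pick(pendingFraction, threshold float64) bool {
        return pendingFraction < threshold
    }

    func main() {
        re := rebalanceEnv{fractionPendingIncreaseOrDecreaseThreshold: 0.1}
        fmt.Println(pick(0.05, re.fractionPendingIncreaseOrDecreaseThreshold)) // true
        fmt.Println(pick(0.20, re.fractionPendingIncreaseOrDecreaseThreshold)) // false
    }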

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 3 additions & 1 deletion
@@ -880,7 +880,7 @@ type storeState struct {
     localityTiers
 
     // Time when this store started to be observed as overloaded. Set by
-    // clusterState.rebalanceStores.
+    // rebalanceStores.
     overloadStartTime time.Time
     // When overloaded this is equal to time.Time{}.
     overloadEndTime time.Time
@@ -1385,6 +1385,8 @@ type clusterState struct {
     *constraintMatcher
     *localityTierInterner
     meansMemo *meansMemo
+
+    mmaid int // a counter for rebalanceStores calls, for logging
 }
 
 func newClusterState(ts timeutil.TimeSource, interner *stringInterner) *clusterState {
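
The new `mmaid` field replaces a package-global `atomic.Int64` (removed in the next file's diff), trading process-wide unique log tags for per-`clusterState` determinism. A runnable sketch of the difference; the premise that `rebalanceStores` calls on one instance never overlap is an assumption suggested by dropping `sync/atomic`, not something the PR states:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // Old scheme: one process-wide counter shared by every instance, so a
    // test's IDs depend on whatever else ran in the same process.
    var globalID atomic.Int64

    // New scheme: a plain per-instance counter, safe only without
    // concurrent calls on the same instance.
    type clusterState struct{ mmaid int }

    func (cs *clusterState) nextID() int {
        cs.mmaid++
        return cs.mmaid
    }

    func main() {
        a, b := &clusterState{}, &clusterState{}
        fmt.Println(globalID.Add(1), globalID.Add(1)) // 1 2: shared sequence
        fmt.Println(a.nextID(), b.nextID())           // 1 1: deterministic per instance
    }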

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 94 additions & 50 deletions
@@ -12,7 +12,6 @@ import (
     "math/rand"
     "slices"
     "strings"
-    "sync/atomic"
     "time"
 
     "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -21,9 +20,14 @@
     "github.com/cockroachdb/logtags"
 )
 
-var mmaid = atomic.Int64{}
-
 // rebalanceEnv tracks the state and outcomes of a rebalanceStores invocation.
+// Recall that such an invocation is on behalf of a local store, but will
+// iterate over a slice of shedding stores - these are not necessarily local
+// but are instead stores which are overloaded and need to shed load. Typically,
+// the particular calling local store has at least some leases (i.e. is
+// capable of making decisions), though in edge cases this may not be true,
+// in which case the rebalanceStores invocation will simply not be able to
+// do anything.
 type rebalanceEnv struct {
     *clusterState
     // rng is the random number generator for non-deterministic decisions.
@@ -36,13 +40,25 @@ type rebalanceEnv struct {
     rangeMoveCount int
     // leaseTransferCount tracks the number of lease transfers made.
     leaseTransferCount int
-    // maxRangeMoveCount is the maximum number of range moves allowed.
+    // maxRangeMoveCount is the maximum number of range moves allowed in the
+    // context of the current rebalanceStores invocation (across multiple
+    // shedding stores).
     maxRangeMoveCount int
-    // maxLeaseTransferCount is the maximum number of lease transfers allowed.
+    // maxLeaseTransferCount is the maximum number of lease transfers allowed in
+    // the context of the current rebalanceStores invocation. Note that because
+    // leases can only be shed from the particular local store on whose behalf
+    // rebalanceStores was called, this limit applies to this particular one
+    // store.
     maxLeaseTransferCount int
+    // If a store's maxFractionPendingDecrease is at least this threshold, no
+    // further changes should be made at this time. This is because the inflight
+    // operation's changes to the load are estimated, and we don't want to pile up
+    // too many possibly inaccurate estimates.
+    fractionPendingIncreaseOrDecreaseThreshold float64
     // lastFailedChangeDelayDuration is the delay after a failed change before retrying.
     lastFailedChangeDelayDuration time.Duration
-    // now is the timestamp when rebalancing started.
+    // now is the timestamp representing the start time of the current
+    // rebalanceStores invocation.
     now time.Time
     // Scratch variables reused across iterations.
     scratch struct {
@@ -54,6 +70,47 @@
     }
 }
 
+func newRebalanceEnv(
+    cs *clusterState, rng *rand.Rand, dsm *diversityScoringMemo, now time.Time,
+) *rebalanceEnv {
+
+    // NB: these consts are intentionally local to the constructor, proving
+    // that they're not accessed anywhere else.
+    const (
+        // The current usage of the multi-metric allocator has rebalanceStores called
+        // in a loop while it emits work, and on a timer otherwise. This means that
+        // we don't need to emit many changes per invocation. Especially for range
+        // moves, which require moving significant amounts of data, emitting them one
+        // by one is fine and by doing so we operate on more recent information at each
+        // turn. Even if the caller carried out multiple changes concurrently, we'd
+        // want to be careful not to emit too many range moves at once, since they are
+        // expensive.
+        // Lease transfers are cheap and fast, so we emit a few more to avoid frequent
+        // trips through rebalanceStores which could cause some logging noise.
+        maxRangeMoveCount     = 1
+        maxLeaseTransferCount = 8
+        // TODO(sumeer): revisit this value.
+        fractionPendingIncreaseOrDecreaseThreshold = 0.1
+
+        // See the long comment where rangeState.lastFailedChange is declared.
+        lastFailedChangeDelayDuration time.Duration = 60 * time.Second
+    )
+
+    re := &rebalanceEnv{
+        clusterState:          cs,
+        rng:                   rng,
+        dsm:                   dsm,
+        now:                   now,
+        maxRangeMoveCount:     maxRangeMoveCount,
+        maxLeaseTransferCount: maxLeaseTransferCount,
+        fractionPendingIncreaseOrDecreaseThreshold: fractionPendingIncreaseOrDecreaseThreshold,
+        lastFailedChangeDelayDuration:              lastFailedChangeDelayDuration,
+    }
+    re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
+    re.scratch.stores = map[roachpb.StoreID]struct{}{}
+    return re
+}
+
 type sheddingStore struct {
     roachpb.StoreID
     storeLoadSummary
@@ -63,11 +120,13 @@
 //
 // We do not want to shed replicas for CPU from a remote store until its had a
 // chance to shed leases.
-func (cs *clusterState) rebalanceStores(
-    ctx context.Context, localStoreID roachpb.StoreID, rng *rand.Rand, dsm *diversityScoringMemo,
+func (re *rebalanceEnv) rebalanceStores(
+    ctx context.Context, localStoreID roachpb.StoreID,
 ) []PendingRangeChange {
-    now := cs.ts.Now()
-    ctx = logtags.AddTag(ctx, "mmaid", mmaid.Add(1))
+    re.mmaid++
+    id := re.mmaid
+    ctx = logtags.AddTag(ctx, "mmaid", id)
+
     log.KvDistribution.VEventf(ctx, 2, "rebalanceStores begins")
     // To select which stores are overloaded, we use a notion of overload that
     // is based on cluster means (and of course individual store/node
@@ -87,7 +146,7 @@ func (cs *clusterState) rebalanceStores(
     // responsible for equalizing load across two nodes that have 30% and 50%
     // cpu utilization while the cluster mean is 70% utilization (as an
     // example).
-    clusterMeans := cs.meansMemo.getMeans(nil)
+    clusterMeans := re.meansMemo.getMeans(nil)
     var sheddingStores []sheddingStore
     log.KvDistribution.Infof(ctx,
         "cluster means: (stores-load %s) (stores-capacity %s) (nodes-cpu-load %d) (nodes-cpu-capacity %d)",
@@ -97,15 +156,26 @@
     // fdDrain or fdDead, nor do we attempt to shed replicas from a store which
     // is storeMembershipRemoving (decommissioning). These are currently handled
     // via replicate_queue.go.
-    for storeID, ss := range cs.stores {
-        sls := cs.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
+
+    // Need deterministic order for datadriven testing.
+    storeStateSlice := make([]*storeState, 0, len(re.stores))
+    for _, ss := range re.stores {
+        storeStateSlice = append(storeStateSlice, ss)
+    }
+    slices.SortFunc(storeStateSlice, func(a, b *storeState) int {
+        return cmp.Compare(a.StoreID, b.StoreID)
+    })
+
+    for _, ss := range storeStateSlice {
+        storeID := ss.StoreID
+        sls := re.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
         log.KvDistribution.VEventf(ctx, 2, "evaluating s%d: node load %s, store load %s, worst dim %s",
             storeID, sls.nls, sls.sls, sls.worstDim)
 
         if sls.sls >= overloadSlow {
             if ss.overloadEndTime != (time.Time{}) {
-                if now.Sub(ss.overloadEndTime) > overloadGracePeriod {
-                    ss.overloadStartTime = now
+                if re.now.Sub(ss.overloadEndTime) > overloadGracePeriod {
+                    ss.overloadStartTime = re.now
                     log.KvDistribution.Infof(ctx, "overload-start s%v (%v) - grace period expired", storeID, sls)
                 } else {
                     // Else, extend the previous overload interval.
@@ -114,21 +184,21 @@
                 ss.overloadEndTime = time.Time{}
             }
             // The pending decrease must be small enough to continue shedding
-            if ss.maxFractionPendingDecrease < maxFractionPendingThreshold &&
+            if ss.maxFractionPendingDecrease < re.fractionPendingIncreaseOrDecreaseThreshold &&
                 // There should be no pending increase, since that can be an overestimate.
                 ss.maxFractionPendingIncrease < epsilon {
                 log.KvDistribution.VEventf(ctx, 2, "store s%v was added to shedding store list", storeID)
                 sheddingStores = append(sheddingStores, sheddingStore{StoreID: storeID, storeLoadSummary: sls})
             } else {
                 log.KvDistribution.VEventf(ctx, 2,
                     "skipping overloaded store s%d (worst dim: %s): pending decrease %.2f >= threshold %.2f or pending increase %.2f >= epsilon",
-                    storeID, sls.worstDim, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, ss.maxFractionPendingIncrease)
+                    storeID, sls.worstDim, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, ss.maxFractionPendingIncrease)
             }
         } else if sls.sls < loadNoChange && ss.overloadEndTime == (time.Time{}) {
             // NB: we don't stop the overloaded interval if the store is at
             // loadNoChange, since a store can hover at the border of the two.
             log.KvDistribution.Infof(ctx, "overload-end s%v (%v) - load dropped below no-change threshold", storeID, sls)
-            ss.overloadEndTime = now
+            ss.overloadEndTime = re.now
         }
     }
 
@@ -158,32 +228,6 @@
         }
     }
 
-    // The caller has a fixed concurrency limit it can move ranges at, when it
-    // is the sender of the snapshot. So we don't want to create too many
-    // changes, since then the allocator gets too far ahead of what has been
-    // enacted, and its decision-making is no longer based on recent
-    // information. We don't have this issue with lease transfers since they are
-    // very fast, so we set a much higher limit.
-    //
-    // TODO: revisit these constants.
-    const maxRangeMoveCount = 1
-    const maxLeaseTransferCount = 8
-    // See the long comment where rangeState.lastFailedChange is declared.
-    const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
-    re := &rebalanceEnv{
-        clusterState:                  cs,
-        rng:                           rng,
-        dsm:                           dsm,
-        changes:                       []PendingRangeChange{},
-        rangeMoveCount:                0,
-        leaseTransferCount:            0,
-        maxRangeMoveCount:             maxRangeMoveCount,
-        maxLeaseTransferCount:         maxLeaseTransferCount,
-        lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
-        now:                           now,
-    }
-    re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
-    re.scratch.stores = map[roachpb.StoreID]struct{}{}
     for _, store := range sheddingStores {
         if re.rangeMoveCount >= re.maxRangeMoveCount || re.leaseTransferCount >= re.maxLeaseTransferCount {
             break
@@ -378,7 +422,7 @@ func (re *rebalanceEnv) rebalanceReplicas(
             ignoreLevel, ssSLS.sls, rangeID, overloadDur)
     }
     targetStoreID := sortTargetCandidateSetAndPick(
-        ctx, cands, ssSLS.sls, ignoreLevel, loadDim, re.rng)
+        ctx, cands, ssSLS.sls, ignoreLevel, loadDim, re.rng, re.fractionPendingIncreaseOrDecreaseThreshold)
     if targetStoreID == 0 {
         log.KvDistribution.VEventf(ctx, 2, "result(failed): no suitable target found among candidates for r%d "+
             "(threshold %s; %s)", rangeID, ssSLS.sls, ignoreLevel)
@@ -424,10 +468,10 @@ func (re *rebalanceEnv) rebalanceReplicas(
         log.KvDistribution.VEventf(ctx, 2, "s%d has reached max range move count %d: mma returning", store.StoreID, re.maxRangeMoveCount)
         return
     }
-    doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
+    doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
     if doneShedding {
         log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after rebalancing: done shedding with %d left in topk",
-            store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
+            store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
         break
     }
 }
@@ -551,7 +595,7 @@ func (re *rebalanceEnv) rebalanceLeases(
     // will only add CPU to the target store (so it is ok to ignore other
     // dimensions on the target).
     targetStoreID := sortTargetCandidateSetAndPick(
-        ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, re.rng)
+        ctx, candsSet, sls.sls, ignoreHigherThanLoadThreshold, CPURate, re.rng, re.fractionPendingIncreaseOrDecreaseThreshold)
     if targetStoreID == 0 {
         log.KvDistribution.Infof(
             ctx,
@@ -606,10 +650,10 @@ func (re *rebalanceEnv) rebalanceLeases(
         log.KvDistribution.VEventf(ctx, 2, "reached max lease transfer count %d, returning", re.maxLeaseTransferCount)
         break
     }
-    doneShedding = ss.maxFractionPendingDecrease >= maxFractionPendingThreshold
+    doneShedding = ss.maxFractionPendingDecrease >= re.fractionPendingIncreaseOrDecreaseThreshold
     if doneShedding {
         log.KvDistribution.VEventf(ctx, 2, "s%d has reached pending decrease threshold(%.2f>=%.2f) after lease transfers: done shedding with %d left in topK",
-            store.StoreID, ss.maxFractionPendingDecrease, maxFractionPendingThreshold, n-(i+1))
+            store.StoreID, ss.maxFractionPendingDecrease, re.fractionPendingIncreaseOrDecreaseThreshold, n-(i+1))
         break
     }
 }
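
The sorted-slice loop added in this file ("Need deterministic order for datadriven testing") is the standard fix for Go's randomized map iteration order, which would otherwise make datadriven test output flap between runs. A self-contained version of the pattern, with a simplified `storeState`:

    package main

    import (
        "cmp"
        "fmt"
        "slices"
    )

    type storeState struct {
        StoreID int
        load    float64
    }

    func main() {
        stores := map[int]*storeState{
            3: {StoreID: 3, load: 0.7},
            1: {StoreID: 1, load: 0.9},
            2: {StoreID: 2, load: 0.4},
        }
        // Copy the map's values into a slice and sort by StoreID; ranging
        // over the map directly would visit stores in random order.
        ss := make([]*storeState, 0, len(stores))
        for _, s := range stores {
            ss = append(ss, s)
        }
        slices.SortFunc(ss, func(a, b *storeState) int {
            return cmp.Compare(a.StoreID, b.StoreID)
        })
        for _, s := range ss {
            fmt.Printf("s%d: load %.1f\n", s.StoreID, s.load)
        }
    }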

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 2 additions & 1 deletion
@@ -523,7 +523,8 @@ func TestClusterState(t *testing.T) {
     tr.SetRedactable(true)
     defer tr.Close()
     ctx, finishAndGet := tracing.ContextWithRecordingSpan(context.Background(), tr, "rebalance-stores")
-    cs.rebalanceStores(ctx, storeID, rng, dsm)
+    re := newRebalanceEnv(cs, rng, dsm, cs.ts.Now())
+    re.rebalanceStores(ctx, storeID)
     rec := finishAndGet()
     var sb redact.StringBuilder
     rec.SafeFormatMinimal(&sb)
