Skip to content

Commit 399a4f6

Browse files
committed
mmaprototype: extract newRebalanceEnv
This commit - creates a proper constructor for rebalanceEnv - moves rebalanceStores from the clusterState receiver onto rebalanceEnv - moves some consts and improves a number of comments. This all prepares for more testing of rebalanceStores via TestClusterState, which will require fiddling with the limits such as maxRangeMoveCount. Having these consts only inform the default configuration of a rebalanceEnv will allow the test to update these values on the create rebalanceEnv before calling into rebalanceStores. Epic: CRDB-55052
1 parent 5fa041d commit 399a4f6

File tree

4 files changed

+70
-41
lines changed

4 files changed

+70
-41
lines changed

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ type allocatorState struct {
213213
var _ Allocator = &allocatorState{}
214214

215215
// TODO(sumeer): temporary constants.
216+
// TODO(tbg): avoid direct access to this constant so that it can be configured
217+
// in tests. As is, it can be overridden in some places but not others.
216218
const (
217219
maxFractionPendingThreshold = 0.1
218220
)
@@ -333,7 +335,8 @@ func (a *allocatorState) ComputeChanges(
333335
panic(fmt.Sprintf("ComputeChanges: expected StoreID %d, got %d", opts.LocalStoreID, msg.StoreID))
334336
}
335337
a.cs.processStoreLeaseholderMsg(ctx, msg, a.mmaMetrics)
336-
return a.cs.rebalanceStores(ctx, opts.LocalStoreID, a.rand, a.diversityScoringMemo)
338+
re := newRebalanceEnv(a.cs, a.rand, a.diversityScoringMemo, a.cs.ts.Now())
339+
return re.rebalanceStores(ctx, opts.LocalStoreID)
337340
}
338341

339342
// AdminRelocateOne implements the Allocator interface.

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -880,7 +880,7 @@ type storeState struct {
880880
localityTiers
881881

882882
// Time when this store started to be observed as overloaded. Set by
883-
// clusterState.rebalanceStores.
883+
// rebalanceStores.
884884
overloadStartTime time.Time
885885
// When overloaded this is equal to time.Time{}.
886886
overloadEndTime time.Time

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ import (
2424
var mmaid = atomic.Int64{}
2525

2626
// rebalanceEnv tracks the state and outcomes of a rebalanceStores invocation.
27+
// Recall that such an invocation is on behalf of a local store, but will
28+
// iterate over a slice of shedding stores - these are not necessarily local
29+
// but are instead stores which are overloaded and need to shed load. Typically,
30+
// the particular calling local store has at least some leases (i.e. is
31+
// capable of making decisions), though in edge cases this may not be true,
32+
// in which case the rebalanceStores invocation will simply not be able to
33+
// do anything.
2734
type rebalanceEnv struct {
2835
*clusterState
2936
// rng is the random number generator for non-deterministic decisions.
@@ -36,13 +43,20 @@ type rebalanceEnv struct {
3643
rangeMoveCount int
3744
// leaseTransferCount tracks the number of lease transfers made.
3845
leaseTransferCount int
39-
// maxRangeMoveCount is the maximum number of range moves allowed.
46+
// maxRangeMoveCount is the maximum number of range moves allowed in the
47+
// context of the current rebalanceStores invocation (across multiple
48+
// shedding stores).
4049
maxRangeMoveCount int
41-
// maxLeaseTransferCount is the maximum number of lease transfers allowed.
50+
// maxLeaseTransferCount is the maximum number of lease transfers allowed in
51+
// the context of the current rebalanceStores invocation. Note that because
52+
// leases can only be shed from the particular local store on whose behalf
53+
// rebalanceStores was called, this limit applies to this particular one
54+
// store.
4255
maxLeaseTransferCount int
4356
// lastFailedChangeDelayDuration is the delay after a failed change before retrying.
4457
lastFailedChangeDelayDuration time.Duration
45-
// now is the timestamp when rebalancing started.
58+
// now is the timestamp representing the start time of the current
59+
// rebalanceStores invocation.
4660
now time.Time
4761
// Scratch variables reused across iterations.
4862
scratch struct {
@@ -54,6 +68,43 @@ type rebalanceEnv struct {
5468
}
5569
}
5670

71+
func newRebalanceEnv(
72+
cs *clusterState, rng *rand.Rand, dsm *diversityScoringMemo, now time.Time,
73+
) *rebalanceEnv {
74+
75+
// NB: these consts are intentionally local to the constructor, proving
76+
// that they're not accessed anywhere else.
77+
const (
78+
// The current usage of the multi-metric allocator has rebalanceStores called
79+
// in a loop while it emits work, and on a timer otherwise. This means that
80+
// we don't need to emit many changes per invocation. Especially for range
81+
// moves, which require moving significant amounts of data, emitting them one
82+
// by one is fine and by doing so we operate on more recent information at each
83+
// turn. Even if the caller carried out multiple changes concurrently, we'd
84+
// want to be careful not to emit too many range moves at once, since they are
85+
// expensive.
86+
// Lease transfers are cheap and fast, so we emit a few more to avoid frequent
87+
// trips through rebalanceStores which could cause some logging noise.
88+
maxRangeMoveCount = 1
89+
maxLeaseTransferCount = 8
90+
// See the long comment where rangeState.lastFailedChange is declared.
91+
lastFailedChangeDelayDuration time.Duration = 60 * time.Second
92+
)
93+
94+
re := &rebalanceEnv{
95+
clusterState: cs,
96+
rng: rng,
97+
dsm: dsm,
98+
now: now,
99+
maxRangeMoveCount: maxRangeMoveCount,
100+
maxLeaseTransferCount: maxLeaseTransferCount,
101+
lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
102+
}
103+
re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
104+
re.scratch.stores = map[roachpb.StoreID]struct{}{}
105+
return re
106+
}
107+
57108
type sheddingStore struct {
58109
roachpb.StoreID
59110
storeLoadSummary
@@ -63,11 +114,11 @@ type sheddingStore struct {
63114
//
64115
// We do not want to shed replicas for CPU from a remote store until its had a
65116
// chance to shed leases.
66-
func (cs *clusterState) rebalanceStores(
67-
ctx context.Context, localStoreID roachpb.StoreID, rng *rand.Rand, dsm *diversityScoringMemo,
117+
func (re *rebalanceEnv) rebalanceStores(
118+
ctx context.Context, localStoreID roachpb.StoreID,
68119
) []PendingRangeChange {
69-
now := cs.ts.Now()
70120
ctx = logtags.AddTag(ctx, "mmaid", mmaid.Add(1))
121+
71122
log.KvDistribution.VEventf(ctx, 2, "rebalanceStores begins")
72123
// To select which stores are overloaded, we use a notion of overload that
73124
// is based on cluster means (and of course individual store/node
@@ -87,7 +138,7 @@ func (cs *clusterState) rebalanceStores(
87138
// responsible for equalizing load across two nodes that have 30% and 50%
88139
// cpu utilization while the cluster mean is 70% utilization (as an
89140
// example).
90-
clusterMeans := cs.meansMemo.getMeans(nil)
141+
clusterMeans := re.meansMemo.getMeans(nil)
91142
var sheddingStores []sheddingStore
92143
log.KvDistribution.Infof(ctx,
93144
"cluster means: (stores-load %s) (stores-capacity %s) (nodes-cpu-load %d) (nodes-cpu-capacity %d)",
@@ -97,15 +148,15 @@ func (cs *clusterState) rebalanceStores(
97148
// fdDrain or fdDead, nor do we attempt to shed replicas from a store which
98149
// is storeMembershipRemoving (decommissioning). These are currently handled
99150
// via replicate_queue.go.
100-
for storeID, ss := range cs.stores {
101-
sls := cs.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
151+
for storeID, ss := range re.stores {
152+
sls := re.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
102153
log.KvDistribution.VEventf(ctx, 2, "evaluating s%d: node load %s, store load %s, worst dim %s",
103154
storeID, sls.nls, sls.sls, sls.worstDim)
104155

105156
if sls.sls >= overloadSlow {
106157
if ss.overloadEndTime != (time.Time{}) {
107-
if now.Sub(ss.overloadEndTime) > overloadGracePeriod {
108-
ss.overloadStartTime = now
158+
if re.now.Sub(ss.overloadEndTime) > overloadGracePeriod {
159+
ss.overloadStartTime = re.now
109160
log.KvDistribution.Infof(ctx, "overload-start s%v (%v) - grace period expired", storeID, sls)
110161
} else {
111162
// Else, extend the previous overload interval.
@@ -128,7 +179,7 @@ func (cs *clusterState) rebalanceStores(
128179
// NB: we don't stop the overloaded interval if the store is at
129180
// loadNoChange, since a store can hover at the border of the two.
130181
log.KvDistribution.Infof(ctx, "overload-end s%v (%v) - load dropped below no-change threshold", storeID, sls)
131-
ss.overloadEndTime = now
182+
ss.overloadEndTime = re.now
132183
}
133184
}
134185

@@ -158,32 +209,6 @@ func (cs *clusterState) rebalanceStores(
158209
}
159210
}
160211

161-
// The caller has a fixed concurrency limit it can move ranges at, when it
162-
// is the sender of the snapshot. So we don't want to create too many
163-
// changes, since then the allocator gets too far ahead of what has been
164-
// enacted, and its decision-making is no longer based on recent
165-
// information. We don't have this issue with lease transfers since they are
166-
// very fast, so we set a much higher limit.
167-
//
168-
// TODO: revisit these constants.
169-
const maxRangeMoveCount = 1
170-
const maxLeaseTransferCount = 8
171-
// See the long comment where rangeState.lastFailedChange is declared.
172-
const lastFailedChangeDelayDuration time.Duration = 60 * time.Second
173-
re := &rebalanceEnv{
174-
clusterState: cs,
175-
rng: rng,
176-
dsm: dsm,
177-
changes: []PendingRangeChange{},
178-
rangeMoveCount: 0,
179-
leaseTransferCount: 0,
180-
maxRangeMoveCount: maxRangeMoveCount,
181-
maxLeaseTransferCount: maxLeaseTransferCount,
182-
lastFailedChangeDelayDuration: lastFailedChangeDelayDuration,
183-
now: now,
184-
}
185-
re.scratch.nodes = map[roachpb.NodeID]*NodeLoad{}
186-
re.scratch.stores = map[roachpb.StoreID]struct{}{}
187212
for _, store := range sheddingStores {
188213
if re.rangeMoveCount >= re.maxRangeMoveCount || re.leaseTransferCount >= re.maxLeaseTransferCount {
189214
break

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,8 @@ func TestClusterState(t *testing.T) {
523523
tr.SetRedactable(true)
524524
defer tr.Close()
525525
ctx, finishAndGet := tracing.ContextWithRecordingSpan(context.Background(), tr, "rebalance-stores")
526-
cs.rebalanceStores(ctx, storeID, rng, dsm)
526+
re := newRebalanceEnv(cs, rng, dsm, cs.ts.Now())
527+
re.rebalanceStores(ctx, storeID)
527528
rec := finishAndGet()
528529
var sb redact.StringBuilder
529530
rec.SafeFormatMinimal(&sb)

0 commit comments

Comments
 (0)