Skip to content

Commit 7b39ad9

Browse files
craig[bot]wenyihu6
andcommitted
Merge #149892
149892: asim: port over more asim changes from prototype r=tbg a=wenyihu6 **asim: correct data driven input command for example_fullfisk** Previously, we started tracking logical bytes from follower replicas, which increased the used store capacity. Since gen_cluster defaults to a 256 GiB store_byte_capacity, this was no longer sufficient and resulted in a fraction used over 100%. This commit updates the data driven input to match the current prototype input. Epic: none Release note: none --- **kvserver: track write bytes per sec properly** Previously, we introduced write bytes per second as part of the store descriptor's capacity. This commit updates the corresponding capacity tracking logic in the production code properly, along with the gossip logic that handles capacity change events. Note that the actual capacity info is not yet used by any production logic in a meaningful way. Epic: none Release note: none --- **asim: fix inconsistent range state** Previously, capacity change events could be published while a range was still in an inconsistent state (such as, after the leaseholder was removed but before a new one was added). Without this fix, future commits that populate the node capacity provider would trigger a panic: node capacity calls RangeUsageInfo to aggregate CPU rate across replicas, which asserts on the range having a valid leaseholder. In this inconsistent state, the leaseholder can't be found, leading to a panic. Epic: none Release note: none --- **asim: introduce NodeCapacityProvider** Previously, we added state.NodeCapacity but didn’t implement the logic to retrieve actual node capacity. This commit introduces a NodeCapacityProvider interface, implemented by the asim’s state and Stores in production code. Node capacity provider is saved as a field in StoreGossip. It will later be used to populate node capacity of store descriptor before gossiping. Note that node capacity provider remains nil in production to avoid changing existing behavior. Epic: none Release note: none --- **asim: move store pool to be at the node level** Previously, asim created a separate store pool per store, unlike production code, which shares a single store pool among stores on the same node. This commit updates asim to align with the behaviour. Epic: none Release note: none --- **asim: introduce MMAllocator at node level** This commit introduces construction of MMAllocator at the node level. Note that the old allocator is still kept around at the store level since only MMA store rebalancer would use MMAAllocator. Epic: none Release note: none --- **asim: remove unused storepool field from store** Previously, we moved the store pool from the store level to the node level and removed the usage of store pool from the store. This commit cleans up the now unused storePool field. Epic: none Release note: none --- **asim: use cluster setting for some non-simulation settings** Previously, we began populating cluster settings in the simulator to allow direct use of non-simulation settings. This commit updates LoadThreshold, kvserver.LBRebalancingObjective, and allocatorimpl.LoadRebalanceRequiredMinDiff to use values from the populated cluster settings directly. Epic: none Release note: none --- **asim: panic if ticket not found in store rebalancer** This commit adds a panic assertion to ensure pending ticket exists in store rebalancer asim since asim should not delete a ticket from controller.tickets. Epic: none Relese note: none Co-authored-by: wenyihu6 <[email protected]>
2 parents 080d8ad + fae892b commit 7b39ad9

22 files changed

+266
-179
lines changed

pkg/kv/kvserver/allocator/storepool/store_pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance(
547547
detail.Desc.Capacity.RangeCount++
548548
detail.Desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes
549549
detail.Desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond
550+
detail.Desc.Capacity.WriteBytesPerSecond += rangeUsageInfo.WriteBytesPerSecond
550551
if detail.Desc.Capacity.CPUPerSecond >= 0 {
551552
detail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond
552553
}
@@ -562,6 +563,11 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance(
562563
} else {
563564
detail.Desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond
564565
}
566+
if detail.Desc.Capacity.WriteBytesPerSecond <= rangeUsageInfo.WriteBytesPerSecond {
567+
detail.Desc.Capacity.WriteBytesPerSecond = 0
568+
} else {
569+
detail.Desc.Capacity.WriteBytesPerSecond -= rangeUsageInfo.WriteBytesPerSecond
570+
}
565571
// When CPU attribution is unsupported, the store will set the
566572
// CPUPerSecond of its store capacity to be -1.
567573
if detail.Desc.Capacity.CPUPerSecond >= 0 {

pkg/kv/kvserver/asim/asim.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (s *Simulator) StoreAddNotify(storeID state.StoreID, _ state.State) {
131131
}
132132

133133
func (s *Simulator) addStore(storeID state.StoreID, tick time.Time) {
134-
allocator := s.state.MakeAllocator(storeID)
134+
allocator := s.state.Allocator(storeID)
135135
storePool := s.state.StorePool(storeID)
136136
s.rqs[storeID] = queue.NewReplicateQueue(
137137
storeID,

pkg/kv/kvserver/asim/config/settings.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ const (
3030
defaultLBRebalancingMode = 2 // Leases and replicas.
3131
defaultLBRebalancingInterval = time.Minute
3232
defaultLBRebalanceQPSThreshold = 0.1
33-
defaultLBMinRequiredQPSDiff = 200
3433
defaultLBRebalancingObjective = 0 // QPS
3534
)
3635

@@ -105,13 +104,6 @@ type SimulationSettings struct {
105104
// LBRebalancingInterval controls how often the store rebalancer will
106105
// consider opportunities for rebalancing.
107106
LBRebalancingInterval time.Duration
108-
// LBRebalanceQPSThreshold is the fraction above or below the mean store QPS,
109-
// that a store is considered overfull or underfull.
110-
LBRebalanceQPSThreshold float64
111-
// LBMinQPSDifferenceForTransfers is the minimum QPS difference that the store
112-
// rebalancer would care to reconcile (via lease or replica rebalancing) between
113-
// any two stores.
114-
LBMinRequiredQPSDiff float64
115107
// ReplicateQueueEnabled controls whether the replicate queue is enabled.
116108
ReplicateQueueEnabled bool
117109
// LeaseQueueEnabled controls whether the lease queue is enabled.
@@ -147,8 +139,6 @@ func DefaultSimulationSettings() *SimulationSettings {
147139
LBRebalancingMode: defaultLBRebalancingMode,
148140
LBRebalancingObjective: defaultLBRebalancingObjective,
149141
LBRebalancingInterval: defaultLBRebalancingInterval,
150-
LBRebalanceQPSThreshold: defaultLBRebalanceQPSThreshold,
151-
LBMinRequiredQPSDiff: defaultLBMinRequiredQPSDiff,
152142
ReplicateQueueEnabled: true,
153143
LeaseQueueEnabled: true,
154144
SplitQueueEnabled: true,

pkg/kv/kvserver/asim/gossip/gossip.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type storeGossiper struct {
5151

5252
func newStoreGossiper(
5353
descriptorGetter func(cached bool) roachpb.StoreDescriptor,
54+
nodeCapacityProvider kvserver.NodeCapacityProvider,
5455
clock timeutil.TimeSource,
5556
st *cluster.Settings,
5657
) *storeGossiper {
@@ -61,7 +62,7 @@ func newStoreGossiper(
6162

6263
desc := sg.descriptorGetter(false /* cached */)
6364
knobs := kvserver.StoreGossipTestingKnobs{AsyncDisabled: true}
64-
sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &st.SV, clock)
65+
sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &st.SV, clock, nodeCapacityProvider)
6566
sg.local.Ident = roachpb.StoreIdent{StoreID: desc.StoreID, NodeID: desc.Node.NodeID}
6667

6768
return sg
@@ -110,21 +111,35 @@ func NewGossip(s state.State, settings *config.SimulationSettings) *gossip {
110111
},
111112
}
112113
for _, store := range s.Stores() {
113-
g.addStoreToGossip(s, store.StoreID())
114+
g.addStoreToGossip(s, store.StoreID(), store.NodeID())
114115
}
115116
s.RegisterCapacityChangeListener(g)
116117
s.RegisterCapacityListener(g)
117118
s.RegisterConfigChangeListener(g)
118119
return g
119120
}
120121

121-
func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID) {
122+
var _ kvserver.NodeCapacityProvider = &simNodeCapacityProvider{}
123+
124+
type simNodeCapacityProvider struct {
125+
localNodeID state.NodeID
126+
state state.State
127+
}
128+
129+
func (s simNodeCapacityProvider) GetNodeCapacity(_ bool) roachpb.NodeCapacity {
130+
return s.state.NodeCapacity(s.localNodeID)
131+
}
132+
133+
func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID, nodeID state.NodeID) {
122134
// Add the store gossip in an "adding" state initially, this is to avoid
123135
// recursive calls to get the store descriptor.
124136
g.storeGossip[storeID] = &storeGossiper{addingStore: true}
125-
g.storeGossip[storeID] = newStoreGossiper(func(cached bool) roachpb.StoreDescriptor {
126-
return s.StoreDescriptors(cached, storeID)[0]
127-
}, s.Clock(), g.settings.ST)
137+
g.storeGossip[storeID] = newStoreGossiper(
138+
func(cached bool) roachpb.StoreDescriptor {
139+
return s.StoreDescriptors(cached, storeID)[0]
140+
},
141+
simNodeCapacityProvider{localNodeID: nodeID, state: s},
142+
s.Clock(), g.settings.ST)
128143
}
129144

130145
// Tick checks for completed gossip updates and triggers new gossip
@@ -137,7 +152,7 @@ func (g *gossip) Tick(ctx context.Context, tick time.Time, s state.State) {
137152
// If the store gossip for this store doesn't yet exist, create it and
138153
// add it to the map of store gossips.
139154
if sg, ok = g.storeGossip[store.StoreID()]; !ok {
140-
g.addStoreToGossip(s, store.StoreID())
155+
g.addStoreToGossip(s, store.StoreID(), store.NodeID())
141156
}
142157

143158
// If the interval between the last time this store was gossiped for
@@ -183,6 +198,9 @@ func (g *gossip) NewCapacityNotify(capacity roachpb.StoreCapacity, storeID state
183198
if sg, ok := g.storeGossip[storeID]; ok {
184199
if !sg.addingStore {
185200
sg.local.UpdateCachedCapacity(capacity)
201+
sg.local.RecordNewPerSecondStats(
202+
capacity.QueriesPerSecond, capacity.WritesPerSecond,
203+
capacity.WriteBytesPerSecond, capacity.CPUPerSecond)
186204
}
187205
} else {
188206
panic(
@@ -194,7 +212,8 @@ func (g *gossip) NewCapacityNotify(capacity roachpb.StoreCapacity, storeID state
194212

195213
// StoreAddNotify notifies that a new store has been added with ID storeID.
196214
func (g *gossip) StoreAddNotify(storeID state.StoreID, s state.State) {
197-
g.addStoreToGossip(s, storeID)
215+
store, _ := s.Store(storeID)
216+
g.addStoreToGossip(s, storeID, store.NodeID())
198217
}
199218

200219
func (g *gossip) maybeUpdateState(tick time.Time, s state.State) {
@@ -210,7 +229,7 @@ func (g *gossip) maybeUpdateState(tick time.Time, s state.State) {
210229
for _, update := range updates {
211230
updateMap[update.Desc.StoreID] = update
212231
}
213-
for _, store := range s.Stores() {
214-
s.UpdateStorePool(store.StoreID(), updateMap)
232+
for _, node := range s.Nodes() {
233+
s.UpdateStorePool(node.NodeID(), updateMap)
215234
}
216235
}

pkg/kv/kvserver/asim/op/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func TestRelocateRangeOp(t *testing.T) {
281281
settings,
282282
)
283283
changer := state.NewReplicaChanger()
284-
allocator := s.MakeAllocator(state.StoreID(1))
284+
allocator := s.Allocator(state.StoreID(1))
285285
storePool := s.StorePool(state.StoreID(1))
286286
controller := NewController(changer, allocator, storePool, settings, 1 /* storeID */)
287287

pkg/kv/kvserver/asim/queue/lease_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestLeaseQueue(t *testing.T) {
142142
store.StoreID(),
143143
changer,
144144
testSettings,
145-
s.MakeAllocator(store.StoreID()),
145+
s.Allocator(store.StoreID()),
146146
s.StorePool(store.StoreID()),
147147
start,
148148
)

pkg/kv/kvserver/asim/queue/replicate_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func TestReplicateQueue(t *testing.T) {
291291
store.StoreID(),
292292
changer,
293293
testSettings,
294-
s.MakeAllocator(store.StoreID()),
294+
s.Allocator(store.StoreID()),
295295
s.StorePool(store.StoreID()),
296296
start,
297297
)

pkg/kv/kvserver/asim/state/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ go_library(
2525
"//pkg/kv/kvserver",
2626
"//pkg/kv/kvserver/allocator",
2727
"//pkg/kv/kvserver/allocator/allocatorimpl",
28+
"//pkg/kv/kvserver/allocator/mmaprototype",
2829
"//pkg/kv/kvserver/allocator/storepool",
2930
"//pkg/kv/kvserver/asim/config",
3031
"//pkg/kv/kvserver/asim/workload",

pkg/kv/kvserver/asim/state/helpers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func NewStorePool(
6363
nodeLivenessFn storepool.NodeLivenessFunc,
6464
hlc *hlc.Clock,
6565
st *cluster.Settings,
66-
) (*storepool.StorePool, *cluster.Settings) {
66+
) *storepool.StorePool {
6767
stopper := stop.NewStopper()
6868
defer stopper.Stop(context.Background())
6969

@@ -80,7 +80,7 @@ func NewStorePool(
8080
nodeLivenessFn,
8181
/* deterministic */ true,
8282
)
83-
return sp, st
83+
return sp
8484
}
8585

8686
// OffsetTick offsets start time by adding tick number of seconds to it.

0 commit comments

Comments
 (0)