
Commit a2e2f92

kvserver: plumb nodeCapacityProvider
Previously, we added a NodeCapacityProvider field to StoreGossip but left it nil in production code. In addition, a TODO in 1b6a91f asked for a more appropriate home for the node capacity provider, since the runtime load monitor is a node-level concept but was being passed to NewStores. During implementation we found a better home for it: this commit makes NodeCapacityProvider a node-level field on StoreConfig and passes it to each store for use when constructing the store descriptor. Note that runtime load monitoring currently lacks caching and simply recomputes periodically; adding caching is a possible future improvement.

Epic: none
Release note: none
Parent: 72291ca
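
The load package's NodeCapacityProvider itself is not among the diffs shown below; only its call sites and the new Stores.GetAggregatedStoreStats helper are. For orientation, here is a minimal, self-contained Go sketch of the shape such a provider could take. It is not the actual pkg/kv/kvserver/load implementation: the NodeCapacity struct fields mirror those checked by the new cluster test, but the StoresAggregator interface, the single refresh interval, and the placeholder capacity figures are assumptions made purely for illustration.

// Hypothetical sketch of a node-level capacity sampler. The real
// pkg/kv/kvserver/load implementation is not shown in this commit's
// visible diffs; only its call sites are.
package main

import (
    "fmt"
    "sync"
    "time"
)

// NodeCapacity mirrors the fields exercised by the new cluster test below.
type NodeCapacity struct {
    NumStores           int32
    StoresCPURate       int64
    NodeCPURateUsage    int64
    NodeCPURateCapacity int64
}

// StoresAggregator is a stand-in for Stores.GetAggregatedStoreStats.
type StoresAggregator interface {
    GetAggregatedStoreStats(useCached bool) (storesCPURate int64, numStores int32)
}

// NodeCapacityProvider periodically recomputes node-level CPU figures; the
// commit message notes the real monitor "just computes periodically" and has
// no caching yet.
type NodeCapacityProvider struct {
    stores   StoresAggregator
    interval time.Duration

    mu     sync.Mutex
    latest NodeCapacity
}

func NewNodeCapacityProvider(stores StoresAggregator, interval time.Duration) *NodeCapacityProvider {
    return &NodeCapacityProvider{stores: stores, interval: interval}
}

// Run starts the background refresh loop and returns; the loop stops when the
// stop channel is closed.
func (p *NodeCapacityProvider) Run(stop <-chan struct{}) {
    go func() {
        ticker := time.NewTicker(p.interval)
        defer ticker.Stop()
        for {
            select {
            case <-stop:
                return
            case <-ticker.C:
                rate, n := p.stores.GetAggregatedStoreStats(true /* useCached */)
                p.mu.Lock()
                p.latest = NodeCapacity{
                    NumStores:     n,
                    StoresCPURate: rate,
                    // Node-wide usage/capacity would come from a runtime load
                    // monitor; the stores aggregate and a fixed capacity stand
                    // in here.
                    NodeCPURateUsage:    rate,
                    NodeCPURateCapacity: 8_000_000_000,
                }
                p.mu.Unlock()
            }
        }
    }()
}

// GetNodeCapacity returns the last computed snapshot. The useCached argument
// is kept only for parity with the store descriptor call site.
func (p *NodeCapacityProvider) GetNodeCapacity(useCached bool) NodeCapacity {
    p.mu.Lock()
    defer p.mu.Unlock()
    return p.latest
}

type fakeStores struct{}

func (fakeStores) GetAggregatedStoreStats(bool) (int64, int32) { return 1_500_000, 2 }

func main() {
    stop := make(chan struct{})
    p := NewNodeCapacityProvider(fakeStores{}, 10*time.Millisecond)
    p.Run(stop)
    time.Sleep(50 * time.Millisecond)
    fmt.Printf("%+v\n", p.GetNodeCapacity(true /* useCached */))
    close(stop)
}

In the real code the constructor also takes a stopper and testing knobs that control separate CPU usage and capacity refresh intervals, as the server.go and store_test.go hunks below show.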

File tree: 11 files changed (+140 −70 lines)

pkg/base/testing_knobs.go

Lines changed: 1 addition & 0 deletions
@@ -59,4 +59,5 @@ type TestingKnobs struct {
     LicenseTestingKnobs         ModuleTestingKnobs
     VecIndexTestingKnobs        ModuleTestingKnobs
     PolicyRefresherTestingKnobs ModuleTestingKnobs
+    NodeCapacityProviderKnobs   ModuleTestingKnobs
 }

pkg/kv/kvserver/asim/gossip/gossip.go

Lines changed: 5 additions & 16 deletions
@@ -51,7 +51,6 @@ type storeGossiper struct {
 
 func newStoreGossiper(
     descriptorGetter func(cached bool) roachpb.StoreDescriptor,
-    nodeCapacityProvider kvserver.NodeCapacityProvider,
     clock timeutil.TimeSource,
     st *cluster.Settings,
 ) *storeGossiper {
@@ -62,9 +61,8 @@ func newStoreGossiper(
 
     desc := sg.descriptorGetter(false /* cached */)
     knobs := kvserver.StoreGossipTestingKnobs{AsyncDisabled: true}
-    sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &st.SV, clock, nodeCapacityProvider)
+    sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &st.SV, clock)
     sg.local.Ident = roachpb.StoreIdent{StoreID: desc.StoreID, NodeID: desc.Node.NodeID}
-
     return sg
 }
@@ -119,26 +117,17 @@ func NewGossip(s state.State, settings *config.SimulationSettings) *gossip {
     return g
 }
 
-var _ kvserver.NodeCapacityProvider = &simNodeCapacityProvider{}
-
-type simNodeCapacityProvider struct {
-    localNodeID state.NodeID
-    state       state.State
-}
-
-func (s simNodeCapacityProvider) GetNodeCapacity(_ bool) roachpb.NodeCapacity {
-    return s.state.NodeCapacity(s.localNodeID)
-}
-
 func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID, nodeID state.NodeID) {
     // Add the store gossip in an "adding" state initially, this is to avoid
     // recursive calls to get the store descriptor.
     g.storeGossip[storeID] = &storeGossiper{addingStore: true}
     g.storeGossip[storeID] = newStoreGossiper(
         func(cached bool) roachpb.StoreDescriptor {
-            return s.StoreDescriptors(cached, storeID)[0]
+            nc := s.NodeCapacity(nodeID)
+            desc := s.StoreDescriptors(cached, storeID)[0]
+            desc.NodeCapacity = nc
+            return desc
         },
-        simNodeCapacityProvider{localNodeID: nodeID, state: s},
         s.Clock(), g.settings.ST)
 }

pkg/kv/kvserver/store.go

Lines changed: 41 additions & 29 deletions
@@ -47,6 +47,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/snaprecv"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
+    loadmonitor "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
@@ -897,27 +898,28 @@ type Store struct {
     raftLogQueue *raftLogQueue // Raft log truncation queue
     // Carries out truncations proposed by the raft log queue, and "replicated"
     // via raft, when they are safe. Created in Store.Start.
-    raftTruncator       *raftLogTruncator
-    raftSnapshotQueue   *raftSnapshotQueue          // Raft repair queue
-    tsMaintenanceQueue  *timeSeriesMaintenanceQueue // Time series maintenance queue
-    scanner             *replicaScanner             // Replica scanner
-    consistencyQueue    *consistencyQueue           // Replica consistency check queue
-    consistencyLimiter  *quotapool.RateLimiter      // Rate limits consistency checks
-    metrics             *StoreMetrics
-    intentResolver      *intentresolver.IntentResolver
-    recoveryMgr         txnrecovery.Manager
-    storeLiveness       storeliveness.Fabric
-    syncWaiters         []*logstore.SyncWaiterLoop
-    raftEntryCache      *raftentry.Cache
-    limiters            batcheval.Limiters
-    txnWaitMetrics      *txnwait.Metrics
-    raftMetrics         *raft.Metrics
-    sstSnapshotStorage  snaprecv.SSTSnapshotStorage
-    protectedtsReader   spanconfig.ProtectedTSReader
-    ctSender            *sidetransport.Sender
-    policyRefresher     *policyrefresher.PolicyRefresher
-    storeGossip         *StoreGossip
-    rebalanceObjManager *RebalanceObjectiveManager
+    raftTruncator        *raftLogTruncator
+    raftSnapshotQueue    *raftSnapshotQueue          // Raft repair queue
+    tsMaintenanceQueue   *timeSeriesMaintenanceQueue // Time series maintenance queue
+    scanner              *replicaScanner             // Replica scanner
+    consistencyQueue     *consistencyQueue           // Replica consistency check queue
+    consistencyLimiter   *quotapool.RateLimiter      // Rate limits consistency checks
+    metrics              *StoreMetrics
+    intentResolver       *intentresolver.IntentResolver
+    recoveryMgr          txnrecovery.Manager
+    storeLiveness        storeliveness.Fabric
+    syncWaiters          []*logstore.SyncWaiterLoop
+    raftEntryCache       *raftentry.Cache
+    limiters             batcheval.Limiters
+    txnWaitMetrics       *txnwait.Metrics
+    raftMetrics          *raft.Metrics
+    sstSnapshotStorage   snaprecv.SSTSnapshotStorage
+    protectedtsReader    spanconfig.ProtectedTSReader
+    ctSender             *sidetransport.Sender
+    policyRefresher      *policyrefresher.PolicyRefresher
+    nodeCapacityProvider *loadmonitor.NodeCapacityProvider
+    storeGossip          *StoreGossip
+    rebalanceObjManager  *RebalanceObjectiveManager
 
     // kvflowRangeControllerFactory is used for replication AC (flow control) V2
     // to create new range controllers which mediate the flow of requests to
@@ -1189,6 +1191,10 @@ type StoreConfig struct {
     // leaseholder replicas. One per node.
     PolicyRefresher *policyrefresher.PolicyRefresher
 
+    // NodeCapacityProvider is used to provide node capacity information for
+    // store.Descriptor.
+    NodeCapacityProvider *loadmonitor.NodeCapacityProvider
+
     // TimeSeriesDataStore is an interface used by the store's time series
     // maintenance queue to dispatch individual maintenance tasks.
     TimeSeriesDataStore TimeSeriesDataStore
@@ -1485,6 +1491,7 @@ func NewStore(
         metrics:              newStoreMetrics(cfg.HistogramWindowInterval),
         ctSender:             cfg.ClosedTimestampSender,
         policyRefresher:      cfg.PolicyRefresher,
+        nodeCapacityProvider: cfg.NodeCapacityProvider,
         ioThresholds:         &iot,
         rangeFeedSlowClosedTimestampNudge: singleflight.NewGroup("rangfeed-ct-nudge", "range"),
     }
@@ -1726,9 +1733,13 @@ func NewStore(
         updateSystemConfigUpdateQueueLimits)
 
     if s.cfg.Gossip != nil {
-        // TODO(wenyihu6): pass nodeCapacityProvider properly in production code.
-        s.storeGossip = NewStoreGossip(cfg.Gossip,
-            s, cfg.TestingKnobs.GossipTestingKnobs, &cfg.Settings.SV, timeutil.DefaultTimeSource{}, nil)
+        s.storeGossip = NewStoreGossip(
+            cfg.Gossip,
+            s,
+            cfg.TestingKnobs.GossipTestingKnobs,
+            &cfg.Settings.SV,
+            timeutil.DefaultTimeSource{},
+        )
 
         // Add range scanner and configure with queues.
         s.scanner = newReplicaScanner(
@@ -3276,11 +3287,12 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD
 
     // Initialize the store descriptor.
     return &roachpb.StoreDescriptor{
-        StoreID:    s.Ident.StoreID,
-        Attrs:      s.Attrs(),
-        Node:       *s.nodeDesc,
-        Capacity:   capacity,
-        Properties: s.Properties(),
+        StoreID:      s.Ident.StoreID,
+        Attrs:        s.Attrs(),
+        Node:         *s.nodeDesc,
+        Capacity:     capacity,
+        Properties:   s.Properties(),
+        NodeCapacity: s.nodeCapacityProvider.GetNodeCapacity(useCached),
     }, nil
 }

pkg/kv/kvserver/store_gossip.go

Lines changed: 0 additions & 13 deletions
@@ -249,7 +249,6 @@ type StoreGossip struct {
     descriptorGetter StoreDescriptorProvider
     sv               *settings.Values
     clock            timeutil.TimeSource
-    ncProvider       NodeCapacityProvider
 }
 
 // StoreGossipTestingKnobs defines the testing knobs specific to StoreGossip.
@@ -281,7 +280,6 @@ func NewStoreGossip(
     testingKnobs StoreGossipTestingKnobs,
     sv *settings.Values,
     clock timeutil.TimeSource,
-    nodeCapacityProvider NodeCapacityProvider,
 ) *StoreGossip {
     return &StoreGossip{
         cachedCapacity: &cachedCapacity{},
@@ -290,7 +288,6 @@ func NewStoreGossip(
         knobs:          testingKnobs,
         sv:             sv,
         clock:          clock,
-        ncProvider:     nodeCapacityProvider,
     }
 }
 
@@ -382,12 +379,6 @@ func (s *StoreGossip) GossipStore(ctx context.Context, useCached bool) error {
         return errors.Wrapf(err, "problem getting store descriptor for store %+v", s.Ident)
     }
 
-    // TODO(wenyihu6): ncProvider is nil during production code. Only populated
-    // for asim.
-    if s.ncProvider != nil {
-        storeDesc.NodeCapacity = s.ncProvider.GetNodeCapacity(useCached)
-    }
-
     // Set countdown target for re-gossiping capacity to be large enough that
     // it would only occur when there has been significant changes. We
     // currently gossip every 10 seconds, meaning that unless significant
@@ -582,7 +573,3 @@ func deltaExceedsThreshold(
     exceeds = deltaAbsolute >= requiredMinDelta && deltaFraction >= requiredDeltaFraction
     return exceeds, delta
 }
-
-type NodeCapacityProvider interface {
-    GetNodeCapacity(useCached bool) roachpb.NodeCapacity
-}

pkg/kv/kvserver/store_gossip_test.go

Lines changed: 0 additions & 1 deletion
@@ -108,7 +108,6 @@ func TestStoreGossipDeltaTrigger(t *testing.T) {
         cfg.TestingKnobs.GossipTestingKnobs,
         &cluster.MakeTestingClusterSettings().SV,
         timeutil.DefaultTimeSource{},
-        nil,
     )
     sg.cachedCapacity.cached = tc.cached
     sg.cachedCapacity.lastGossiped = tc.lastGossiped

pkg/kv/kvserver/store_test.go

Lines changed: 53 additions & 0 deletions
@@ -38,6 +38,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
@@ -259,6 +260,9 @@ func createTestStoreWithoutStart(
 
     stores := NewStores(cfg.AmbientCtx, cfg.Clock)
     nodeDesc := &roachpb.NodeDescriptor{NodeID: 1}
+    if cfg.NodeCapacityProvider == nil {
+        cfg.NodeCapacityProvider = load.NewNodeCapacityProvider(stopper, stores, nil)
+    }
 
     rangeProv := &dummyFirstRangeProvider{}
     var storeSender struct{ kv.Sender }
@@ -4215,3 +4219,52 @@ func BenchmarkStoreGetReplica(b *testing.B) {
         }
     })
 }
+
+// TestNewNodeCapacityProviderCluster tests the basic functionality of the
+// NodeCapacityProvider with a real cluster.
+func TestNewNodeCapacityProviderCluster(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    numNodes := 3
+    numStoresPerNode := 2
+    var storeSpecs []base.StoreSpec
+    for i := 0; i < numStoresPerNode; i++ {
+        storeSpecs = append(storeSpecs, base.StoreSpec{InMemory: true})
+    }
+    serverArgs := base.TestServerArgs{
+        Knobs: base.TestingKnobs{
+            NodeCapacityProviderKnobs: &load.NodeCapacityProviderTestingKnobs{
+                CpuUsageRefreshInterval:    1 * time.Millisecond,
+                CpuCapacityRefreshInterval: 1 * time.Millisecond,
+            },
+        }, StoreSpecs: storeSpecs}
+    tcArgs := base.TestClusterArgs{
+        ParallelStart:   true,
+        ReplicationMode: base.ReplicationManual, // saves time
+        ServerArgsPerNode: map[int]base.TestServerArgs{
+            0: serverArgs,
+            1: serverArgs,
+        },
+    }
+
+    ctx := context.Background()
+    tc := serverutils.StartCluster(t, numNodes, tcArgs)
+    defer tc.Stopper().Stop(ctx)
+    store, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID())
+    require.NoError(t, err)
+    testutils.SucceedsSoon(t, func() error {
+        storeDesc, err := store.Descriptor(ctx, false /*useCached*/)
+        require.NoError(t, err)
+        nc := storeDesc.NodeCapacity
+        require.Equal(t, int32(numStoresPerNode), nc.NumStores)
+        if nc.NodeCPURateUsage == 0 || nc.NodeCPURateCapacity == 0 || nc.StoresCPURate == 0 {
+            return errors.Newf(
+                "CPU usage or capacity is 0: node cpu rate usage %v, node cpu rate capacity %v, stores cpu rate %v",
+                nc.NodeCPURateUsage, nc.NodeCPURateCapacity, nc.StoresCPURate)
+        }
+        // TODO(wenyihu6): NodeCPURateCapacity <= NodeCPURateUsage fails on CI and
+        // requires more investigation.
+        return nil
+    })
+}
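
The fields asserted above are CPU rates, not fractions, so a consumer has to derive utilization itself. The small sketch below works through that arithmetic; the figures and the nanoseconds-of-CPU-per-second unit are illustrative assumptions, and this commit itself only populates the descriptor rather than adding such a consumer.

// Hypothetical downstream use of the NodeCapacity fields checked above.
package main

import "fmt"

func main() {
    // Example figures, assuming rates measured in CPU-nanoseconds per second.
    var (
        nodeCPURateUsage    int64 = 2_400_000_000 // ~2.4 vCPUs busy
        nodeCPURateCapacity int64 = 8_000_000_000 // ~8 vCPUs available
        storesCPURate       int64 = 1_800_000_000 // portion attributed to KV stores
    )
    nodeUtil := float64(nodeCPURateUsage) / float64(nodeCPURateCapacity)
    storeShare := float64(storesCPURate) / float64(nodeCPURateUsage)
    fmt.Printf("node CPU utilization: %.0f%%\n", nodeUtil*100)   // 30%
    fmt.Printf("share used by stores: %.0f%%\n", storeShare*100) // 75%
}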

pkg/kv/kvserver/stores.go

Lines changed: 13 additions & 4 deletions
@@ -336,8 +336,17 @@ func (ls *Stores) GetStoreMetricRegistry(storeID roachpb.StoreID) *metric.Regist
     return nil
 }
 
-// TODO(wenyihu6): Implment properly for production code. This now just returns
-// an empty struct and unused everywhere.
-func (ls *Stores) GetNodeCapacity(useCached bool) roachpb.NodeCapacity {
-    panic("unimplemented")
+// GetAggregatedStoreStats returns the aggregated cpu usage across all stores and
+// the count of stores.
+func (ls *Stores) GetAggregatedStoreStats(useCached bool) (storesCPURate int64, numStores int32) {
+    _ = ls.VisitStores(func(s *Store) error {
+        c, err := s.Capacity(context.Background(), useCached)
+        if err != nil {
+            panic(err)
+        }
+        storesCPURate += int64(c.CPUPerSecond)
+        numStores++
+        return nil
+    })
+    return storesCPURate, numStores
 }

pkg/server/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -142,6 +142,7 @@ go_library(
        "//pkg/kv/kvserver/kvstorage",
        "//pkg/kv/kvserver/liveness",
        "//pkg/kv/kvserver/liveness/livenesspb",
+       "//pkg/kv/kvserver/load",
        "//pkg/kv/kvserver/logstore",
        "//pkg/kv/kvserver/loqrecovery",
        "//pkg/kv/kvserver/loqrecovery/loqrecoverypb",

pkg/server/server.go

Lines changed: 23 additions & 7 deletions
@@ -48,6 +48,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
@@ -161,13 +162,14 @@ type topLevelServer struct {
     appRegistry *metric.Registry
     sysRegistry *metric.Registry
 
-    recorder         *status.MetricsRecorder
-    runtime          *status.RuntimeStatSampler
-    ruleRegistry     *metric.RuleRegistry
-    promRuleExporter *metric.PrometheusRuleExporter
-    updates          *diagnostics.UpdateChecker
-    ctSender         *sidetransport.Sender
-    policyRefresher  *policyrefresher.PolicyRefresher
+    recorder             *status.MetricsRecorder
+    runtime              *status.RuntimeStatSampler
+    ruleRegistry         *metric.RuleRegistry
+    promRuleExporter     *metric.PrometheusRuleExporter
+    updates              *diagnostics.UpdateChecker
+    ctSender             *sidetransport.Sender
+    policyRefresher      *policyrefresher.PolicyRefresher
+    nodeCapacityProvider *load.NodeCapacityProvider
 
     http            *httpServer
     adminAuthzCheck privchecker.CheckerForRPCHandlers
@@ -688,6 +690,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
         policyRefresher = policyrefresher.NewPolicyRefresher(stopper, st, ctSender.GetLeaseholders,
             rpcContext.RemoteClocks.AllLatencies, knobs)
     }
+    var nodeCapacityProvider *load.NodeCapacityProvider
+    {
+        var knobs *load.NodeCapacityProviderTestingKnobs
+        if nodeCapacityProviderKnobs := cfg.TestingKnobs.NodeCapacityProviderKnobs; nodeCapacityProviderKnobs != nil {
+            knobs = nodeCapacityProviderKnobs.(*load.NodeCapacityProviderTestingKnobs)
+        }
+        nodeCapacityProvider = load.NewNodeCapacityProvider(stopper, stores, knobs)
+    }
 
     // The Executor will be further initialized later, as we create more
     // of the server's components. There's a circular dependency - many things
@@ -907,6 +917,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
         ClosedTimestampSender:        ctSender,
         ClosedTimestampReceiver:      ctReceiver,
         PolicyRefresher:              policyRefresher,
+        NodeCapacityProvider:         nodeCapacityProvider,
         ProtectedTimestampReader:     protectedTSReader,
         EagerLeaseAcquisitionLimiter: eagerLeaseAcquisitionLimiter,
         KVMemoryMonitor:              kvMemoryMonitor,
@@ -1340,6 +1351,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
         updates:              updates,
         ctSender:             ctSender,
         policyRefresher:      policyRefresher,
+        nodeCapacityProvider: nodeCapacityProvider,
         runtime:              runtimeSampler,
         http:                 sHTTP,
         adminAuthzCheck:      adminAuthzCheck,
@@ -2177,6 +2189,10 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
     // closed timestamp policies for ranges periodically.
     s.policyRefresher.Run(workersCtx)
 
+    // Start node capacity provider in the background. It refreshes node cpu usage
+    // and capacity for store descriptor.
+    s.nodeCapacityProvider.Run(workersCtx)
+
     // Start dispatching extant flow tokens.
     if err := s.raftTransport.Start(workersCtx); err != nil {
         return err
