
Commit 9272e28

craig[bot] and wenyihu6 committed
Merge #150275
150275: kvserver/load: add node capacity provider r=tbg a=wenyihu6

**kvserver/load: add node capacity provider**

This commit introduces NodeCapacityProvider, which monitors runtime CPU usage and capacity and also aggregates CPU usage across all stores. It is not yet integrated with other components. Future commits will hook it into each store to populate the node capacity field in the store descriptor.

Epic: none
Release note: none

---

**kvserver: plumb nodeCapacityProvider**

Previously, we added a NodeCapacityProvider field to StoreGossip but left it nil in production code. In addition, we had a TODO in 1b6a91f to find a more appropriate home for the node capacity provider, since the runtime load monitor is a node-level concept and was being passed to NewStores. During implementation, we identified a better home for it. This commit makes NodeCapacityProvider a node-level concept as part of StoreConfig and passes it to each store for use when constructing the store descriptor.

Note that runtime load monitoring currently lacks caching and just recomputes periodically. We may consider caching as a future improvement.

Epic: none
Release note: none

Co-authored-by: wenyihu6 <[email protected]>
2 parents d0a7b61 + a2e2f92 commit 9272e28

18 files changed: +459 −77 lines
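Before the per-file diffs, here is a minimal usage sketch (not part of the commit) of the API this change adds, based on the signatures in pkg/kv/kvserver/load below; `fakeStores` is a hypothetical stand-in for the real kvserver.Stores aggregator:

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// fakeStores is a hypothetical StoresStatsAggregator used only for this
// sketch; in production the aggregator is kvserver.Stores.
type fakeStores struct{}

func (fakeStores) GetAggregatedStoreStats(useCached bool) (int64, int32) {
	return 500, 2 // aggregated store CPU usage, number of stores
}

func main() {
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	// nil knobs: use the default refresh intervals (1s usage, 10s capacity).
	provider := load.NewNodeCapacityProvider(stopper, fakeStores{}, nil)
	provider.Run(ctx) // start background CPU monitoring

	nc := provider.GetNodeCapacity(false /* useCached */)
	_ = nc.NodeCPURateUsage    // node-level CPU usage, nanoseconds per second
	_ = nc.NodeCPURateCapacity // node-level CPU capacity, nanoseconds per second
	_ = nc.StoresCPURate       // aggregated store-level CPU usage
}
```

In production the provider is handed to each Store via StoreConfig, per the second commit message above.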

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -265,6 +265,7 @@ ALL_TESTS = [
     "//pkg/kv/kvserver/kvstorage:kvstorage_test",
     "//pkg/kv/kvserver/leases:leases_test",
     "//pkg/kv/kvserver/liveness:liveness_test",
+    "//pkg/kv/kvserver/load:load_test",
     "//pkg/kv/kvserver/lockspanset:lockspanset_test",
     "//pkg/kv/kvserver/logstore:logstore_test",
     "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test",
@@ -1547,6 +1548,7 @@ GO_TARGETS = [
     "//pkg/kv/kvserver/liveness:liveness",
     "//pkg/kv/kvserver/liveness:liveness_test",
     "//pkg/kv/kvserver/load:load",
+    "//pkg/kv/kvserver/load:load_test",
     "//pkg/kv/kvserver/lockspanset:lockspanset",
     "//pkg/kv/kvserver/lockspanset:lockspanset_test",
     "//pkg/kv/kvserver/logstore:logstore",

pkg/base/testing_knobs.go

Lines changed: 1 addition & 0 deletions
@@ -59,4 +59,5 @@ type TestingKnobs struct {
 	LicenseTestingKnobs         ModuleTestingKnobs
 	VecIndexTestingKnobs        ModuleTestingKnobs
 	PolicyRefresherTestingKnobs ModuleTestingKnobs
+	NodeCapacityProviderKnobs   ModuleTestingKnobs
 }

pkg/kv/kvserver/asim/gossip/gossip.go

Lines changed: 5 additions & 16 deletions
@@ -51,7 +51,6 @@ type storeGossiper struct {

 func newStoreGossiper(
 	descriptorGetter func(cached bool) roachpb.StoreDescriptor,
-	nodeCapacityProvider kvserver.NodeCapacityProvider,
 	clock timeutil.TimeSource,
 	st *cluster.Settings,
 ) *storeGossiper {
@@ -62,9 +61,8 @@ func newStoreGossiper(

 	desc := sg.descriptorGetter(false /* cached */)
 	knobs := kvserver.StoreGossipTestingKnobs{AsyncDisabled: true}
-	sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &st.SV, clock, nodeCapacityProvider)
+	sg.local = kvserver.NewStoreGossip(sg, sg, knobs, &st.SV, clock)
 	sg.local.Ident = roachpb.StoreIdent{StoreID: desc.StoreID, NodeID: desc.Node.NodeID}
-
 	return sg
 }
@@ -119,26 +117,17 @@ func NewGossip(s state.State, settings *config.SimulationSettings) *gossip {
 	return g
 }

-var _ kvserver.NodeCapacityProvider = &simNodeCapacityProvider{}
-
-type simNodeCapacityProvider struct {
-	localNodeID state.NodeID
-	state       state.State
-}
-
-func (s simNodeCapacityProvider) GetNodeCapacity(_ bool) roachpb.NodeCapacity {
-	return s.state.NodeCapacity(s.localNodeID)
-}
-
 func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID, nodeID state.NodeID) {
 	// Add the store gossip in an "adding" state initially, this is to avoid
 	// recursive calls to get the store descriptor.
 	g.storeGossip[storeID] = &storeGossiper{addingStore: true}
 	g.storeGossip[storeID] = newStoreGossiper(
 		func(cached bool) roachpb.StoreDescriptor {
-			return s.StoreDescriptors(cached, storeID)[0]
+			nc := s.NodeCapacity(nodeID)
+			desc := s.StoreDescriptors(cached, storeID)[0]
+			desc.NodeCapacity = nc
+			return desc
 		},
-		simNodeCapacityProvider{localNodeID: nodeID, state: s},
 		s.Clock(), g.settings.ST)
 }

pkg/kv/kvserver/load/BUILD.bazel

Lines changed: 21 additions & 1 deletion
@@ -1,18 +1,38 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

 go_library(
     name = "load",
     srcs = [
+        "node_capacity_provider.go",
         "record_replica_load.go",
         "replica_load.go",
+        "testing_knobs.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load",
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/kv/kvserver/replicastats",
         "//pkg/roachpb",
+        "//pkg/server/status",
+        "//pkg/util/buildutil",
         "//pkg/util/hlc",
+        "//pkg/util/log",
+        "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_vividcortex_ewma//:ewma",
+    ],
+)
+
+go_test(
+    name = "load_test",
+    srcs = ["node_capacity_provider_test.go"],
+    deps = [
+        ":load",
+        "//pkg/testutils",
+        "//pkg/util/stop",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_stretchr_testify//require",
     ],
 )
pkg/kv/kvserver/load/node_capacity_provider.go (new file)

Lines changed: 205 additions & 0 deletions

// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package load

import (
	"context"
	"time"

	"github.com/VividCortex/ewma"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/status"
	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/errors"
)

// StoresStatsAggregator provides aggregated cpu usage stats across all stores.
type StoresStatsAggregator interface {
	// GetAggregatedStoreStats returns the total cpu usage across all stores and
	// the count of stores. If useCached is true, it uses the cached store
	// descriptors instead of computing new ones. Implemented by Stores.
	GetAggregatedStoreStats(useCached bool) (aggregatedCPUUsage int64, totalStoreCount int32)
}

// NodeCapacityProvider reports node-level cpu usage and capacity by sampling
// runtime stats and aggregating store-level cpu usage across all stores. It
// is used by Store to populate the NodeCapacity field in the StoreDescriptor.
type NodeCapacityProvider struct {
	stores             StoresStatsAggregator
	runtimeLoadMonitor *runtimeLoadMonitor
}

// NewNodeCapacityProvider creates a new NodeCapacityProvider that monitors CPU
// metrics using the provided stores aggregator. The optional knobs parameter
// allows customizing refresh intervals for testing.
func NewNodeCapacityProvider(
	stopper *stop.Stopper, stores StoresStatsAggregator, knobs *NodeCapacityProviderTestingKnobs,
) *NodeCapacityProvider {
	if stopper == nil || stores == nil {
		panic("programming error: stopper or stores aggregator cannot be nil")
	}

	// refreshIntervals define how frequently cpu metrics are updated.
	const (
		// defaultCPUUsageRefreshInterval controls how often cpu usage measurements
		// are taken.
		defaultCPUUsageRefreshInterval = time.Second
		// defaultCPUCapacityRefreshInterval controls how often the total CPU
		// capacity of the node is re-calculated. This is less frequent than usage
		// since capacity changes happen less often.
		defaultCPUCapacityRefreshInterval = 10 * time.Second
	)

	// defaultMovingAverageAge defines the effective time window size. With a
	// value of 20, the 20th-to-last measurement contributes meaningfully to the
	// average, while earlier measurements have diminishing impact.
	const defaultMovingAverageAge = 20

	usageInterval := defaultCPUUsageRefreshInterval
	capacityInterval := defaultCPUCapacityRefreshInterval
	if knobs != nil {
		usageInterval = knobs.CpuUsageRefreshInterval
		capacityInterval = knobs.CpuCapacityRefreshInterval
	}

	monitor := &runtimeLoadMonitor{
		stopper:                 stopper,
		usageRefreshInterval:    usageInterval,
		capacityRefreshInterval: capacityInterval,
	}
	monitor.mu.usageEWMA = ewma.NewMovingAverage(defaultMovingAverageAge)
	monitor.recordCPUCapacity(context.Background())
	return &NodeCapacityProvider{
		stores:             stores,
		runtimeLoadMonitor: monitor,
	}
}

// Run starts the background monitoring of cpu metrics.
func (n *NodeCapacityProvider) Run(ctx context.Context) {
	_ = n.runtimeLoadMonitor.stopper.RunAsyncTask(ctx, "runtime-load-monitor", func(ctx context.Context) {
		n.runtimeLoadMonitor.run(ctx)
	})
}

// GetNodeCapacity returns the NodeCapacity, which includes node-level cpu
// usage and capacity and aggregated store-level cpu usage. If useCached is
// true, it will use cached store descriptors to aggregate the sum of
// store-level cpu usage.
func (n *NodeCapacityProvider) GetNodeCapacity(useCached bool) roachpb.NodeCapacity {
	storesCPURate, numStores := n.stores.GetAggregatedStoreStats(useCached)
	// TODO(wenyihu6): may be unexpected to caller that useCached only applies to
	// the stores stats but not runtime load monitor. We can change
	// runtimeLoadMonitor to also fetch updated stats.
	// TODO(wenyihu6): NodeCPURateCapacity <= NodeCPURateUsage fails on CI and
	// requires more investigation.
	cpuUsageNanoPerSec, cpuCapacityNanoPerSec := n.runtimeLoadMonitor.GetCPUStats()
	return roachpb.NodeCapacity{
		StoresCPURate:       storesCPURate,
		NumStores:           numStores,
		NodeCPURateCapacity: cpuCapacityNanoPerSec,
		NodeCPURateUsage:    cpuUsageNanoPerSec,
	}
}

// runtimeLoadMonitor periodically polls the node's cpu usage and capacity
// stats and maintains a moving average.
type runtimeLoadMonitor struct {
	usageRefreshInterval    time.Duration
	capacityRefreshInterval time.Duration
	stopper                 *stop.Stopper

	mu struct {
		syncutil.Mutex
		// lastTotalUsageNanos tracks cumulative cpu usage in nanoseconds using
		// status.GetProcCPUTime.
		lastTotalUsageNanos float64
		// usageEWMA maintains a moving average of the delta cpu usage between two
		// subsequent polls in nanoseconds. The cpu usage is obtained by polling
		// stats from status.GetProcCPUTime, which is cumulative.
		usageEWMA ewma.MovingAverage
		// logicalCPUsPerSec represents the node's cpu capacity in logical
		// CPU-seconds per second, obtained from status.GetCPUCapacity.
		logicalCPUsPerSec int64
	}
}

// GetCPUStats returns the current cpu usage and capacity stats for the node.
func (m *runtimeLoadMonitor) GetCPUStats() (cpuUsageNanoPerSec int64, cpuCapacityNanoPerSec int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// usageEWMA is usage in nanoseconds. Divide by the refresh interval to get
	// the per-second rate in nanoseconds.
	cpuUsageNanoPerSec = int64(m.mu.usageEWMA.Value() / m.usageRefreshInterval.Seconds())
	// logicalCPUsPerSec is in logical cpu-seconds per second. Convert the unit
	// from cpu-seconds to cpu-nanoseconds.
	cpuCapacityNanoPerSec = m.mu.logicalCPUsPerSec * time.Second.Nanoseconds()
	return
}

// recordCPUUsage samples and records the current cpu usage of the node.
func (m *runtimeLoadMonitor) recordCPUUsage(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()
	userTimeMillis, sysTimeMillis, err := status.GetProcCPUTime(ctx)
	if err != nil {
		if buildutil.CrdbTestBuild {
			panic(err)
		}
		// TODO(wenyihu6): we should revisit error handling here for production.
		log.Warningf(ctx, "failed to get cpu usage: %v", err)
	}
	// Convert milliseconds to nanoseconds.
	totalUsageNanos := float64(userTimeMillis*1e6 + sysTimeMillis*1e6)
	if buildutil.CrdbTestBuild && m.mu.lastTotalUsageNanos > totalUsageNanos {
		panic(errors.Newf("programming error: last cpu usage is larger than current: %v > %v",
			m.mu.lastTotalUsageNanos, totalUsageNanos))
	}
	m.mu.usageEWMA.Add(totalUsageNanos - m.mu.lastTotalUsageNanos)
	m.mu.lastTotalUsageNanos = totalUsageNanos
}

// recordCPUCapacity samples and records the current cpu capacity of the node.
func (m *runtimeLoadMonitor) recordCPUCapacity(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.mu.logicalCPUsPerSec = int64(status.GetCPUCapacity())
	if m.mu.logicalCPUsPerSec == 0 {
		if buildutil.CrdbTestBuild {
			panic("programming error: cpu capacity is 0")
		}
		// TODO(wenyihu6): we should pass in an actual context here.
		log.Warningf(ctx, "failed to get cpu capacity")
	}
}

// run is the main loop of the runtimeLoadMonitor. It periodically polls the
// cpu usage and capacity, and it keeps running until the context is done or
// the stopper has quiesced.
func (m *runtimeLoadMonitor) run(ctx context.Context) {
	usageTimer := time.NewTicker(m.usageRefreshInterval)
	defer usageTimer.Stop()
	capacityTimer := time.NewTicker(m.capacityRefreshInterval)
	defer capacityTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-m.stopper.ShouldQuiesce():
			return
		case <-usageTimer.C:
			usageTimer.Reset(m.usageRefreshInterval)
			m.recordCPUUsage(ctx)
		case <-capacityTimer.C:
			capacityTimer.Reset(m.capacityRefreshInterval)
			m.recordCPUCapacity(ctx)
		}
	}
}
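As a sanity check on the unit conversions in GetCPUStats above, here is a small standalone sketch (not part of the commit) with made-up numbers: an EWMA of 250ms of CPU time per 1s refresh interval on a node with 8 logical CPUs. It mirrors the arithmetic in runtimeLoadMonitor.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs: EWMA of per-interval CPU usage deltas (nanoseconds),
	// the usage refresh interval, and the node's logical CPU count.
	usageEWMANanos := float64(250 * time.Millisecond.Nanoseconds()) // 2.5e8 ns per interval
	refreshInterval := time.Second
	logicalCPUs := int64(8)

	// Same conversions as runtimeLoadMonitor.GetCPUStats:
	cpuUsageNanoPerSec := int64(usageEWMANanos / refreshInterval.Seconds())
	cpuCapacityNanoPerSec := logicalCPUs * time.Second.Nanoseconds()

	fmt.Println(cpuUsageNanoPerSec)    // 250000000  (~0.25 CPU-seconds per second)
	fmt.Println(cpuCapacityNanoPerSec) // 8000000000 (8 CPU-seconds per second)
}
```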
pkg/kv/kvserver/load/node_capacity_provider_test.go (new file)

Lines changed: 67 additions & 0 deletions

// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package load_test

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/require"
)

// mockStoresStatsAggregator implements StoresStatsAggregator for testing.
type mockStoresStatsAggregator struct {
	cpuUsage   int64
	storeCount int32
}

func (m *mockStoresStatsAggregator) GetAggregatedStoreStats(
	_ bool,
) (totalCPUUsage int64, totalStoreCount int32) {
	return m.cpuUsage, m.storeCount
}

// TestNodeCapacityProvider tests the basic functionality of the
// NodeCapacityProvider.
func TestNodeCapacityProvider(t *testing.T) {
	stopper := stop.NewStopper()
	defer stopper.Stop(context.Background())

	mockStores := &mockStoresStatsAggregator{
		cpuUsage:   1000,
		storeCount: 3,
	}

	provider := load.NewNodeCapacityProvider(stopper, mockStores, &load.NodeCapacityProviderTestingKnobs{
		CpuUsageRefreshInterval:    1 * time.Millisecond,
		CpuCapacityRefreshInterval: 1 * time.Millisecond,
	})

	ctx, cancel := context.WithCancel(context.Background())
	provider.Run(ctx)

	// Provider should have valid stats.
	testutils.SucceedsSoon(t, func() error {
		nc := provider.GetNodeCapacity(false)
		if nc.NodeCPURateUsage == 0 || nc.NodeCPURateCapacity == 0 || nc.StoresCPURate == 0 {
			return errors.Newf(
				"CPU usage or capacity is 0: node cpu rate usage %v, node cpu rate capacity %v, stores cpu rate %v",
				nc.NodeCPURateUsage, nc.NodeCPURateCapacity, nc.StoresCPURate)
		}
		return nil
	})

	cancel()
	// GetNodeCapacity should still return valid stats after cancellation.
	nc := provider.GetNodeCapacity(false)
	require.Greater(t, nc.NodeCPURateCapacity, int64(0))
	require.Greater(t, nc.NodeCPURateUsage, int64(0))
}
