Skip to content

Commit 72291ca

Browse files
committed
kvserver/load: add NodeCapacityProvider
Previously, we added a NodeCapacityProvider field to StoreGossip but left it nil in production code. In addition, we had a TODO in 1b6a91f to find a more appropriate home for node capacity provider, since the runtime load monitor is a node-level concept and was being passed to NewStores. During implementation, we identified a better home for it. This commit moves NodeCapacityProvider to be a node level concept as part of StoreConfig and passes it to each store for use when constructing the store descriptor. Note that runtime load monitoring currently lacks caching, and we may add it later if it proves too expensive. Epic: none Release note: none
1 parent 9e1e159 commit 72291ca

File tree

7 files changed

+319
-7
lines changed

7 files changed

+319
-7
lines changed

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ ALL_TESTS = [
265265
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
266266
"//pkg/kv/kvserver/leases:leases_test",
267267
"//pkg/kv/kvserver/liveness:liveness_test",
268+
"//pkg/kv/kvserver/load:load_test",
268269
"//pkg/kv/kvserver/lockspanset:lockspanset_test",
269270
"//pkg/kv/kvserver/logstore:logstore_test",
270271
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test",
@@ -1547,6 +1548,7 @@ GO_TARGETS = [
15471548
"//pkg/kv/kvserver/liveness:liveness",
15481549
"//pkg/kv/kvserver/liveness:liveness_test",
15491550
"//pkg/kv/kvserver/load:load",
1551+
"//pkg/kv/kvserver/load:load_test",
15501552
"//pkg/kv/kvserver/lockspanset:lockspanset",
15511553
"//pkg/kv/kvserver/lockspanset:lockspanset_test",
15521554
"//pkg/kv/kvserver/logstore:logstore",

pkg/kv/kvserver/load/BUILD.bazel

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,38 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "load",
55
srcs = [
6+
"node_capacity_provider.go",
67
"record_replica_load.go",
78
"replica_load.go",
9+
"testing_knobs.go",
810
],
911
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load",
1012
visibility = ["//visibility:public"],
1113
deps = [
1214
"//pkg/kv/kvserver/replicastats",
1315
"//pkg/roachpb",
16+
"//pkg/server/status",
17+
"//pkg/util/buildutil",
1418
"//pkg/util/hlc",
19+
"//pkg/util/log",
20+
"//pkg/util/stop",
1521
"//pkg/util/syncutil",
1622
"//pkg/util/timeutil",
23+
"@com_github_cockroachdb_errors//:errors",
24+
"@com_github_vividcortex_ewma//:ewma",
25+
],
26+
)
27+
28+
go_test(
29+
name = "load_test",
30+
srcs = ["node_capacity_provider_test.go"],
31+
deps = [
32+
":load",
33+
"//pkg/testutils",
34+
"//pkg/util/stop",
35+
"@com_github_cockroachdb_errors//:errors",
36+
"@com_github_stretchr_testify//require",
1737
],
1838
)
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package load
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/VividCortex/ewma"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/server/status"
15+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/cockroach/pkg/util/stop"
18+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
19+
"github.com/cockroachdb/errors"
20+
)
21+
22+
// StoresStatsAggregator provides aggregated cpu usage stats across all stores.
23+
type StoresStatsAggregator interface {
24+
// GetAggregatedStoreStats returns the total cpu usage across all stores and
25+
// the count of stores. If useCached is true, it uses the cached store
26+
// descriptor instead of computing new ones. Implemented by Stores.
27+
GetAggregatedStoreStats(useCached bool) (aggregatedCPUUsage int64, totalStoreCount int32)
28+
}
29+
30+
// NodeCapacityProvider reports node-level cpu usage and capacity by sampling
31+
// runtime stats and aggregating store-level cpu capacity across all stores. It
32+
// is used by Store to populate the NodeCapacity field in the StoreDescriptor.
33+
type NodeCapacityProvider struct {
34+
stores StoresStatsAggregator
35+
runtimeLoadMonitor *runtimeLoadMonitor
36+
}
37+
38+
// NewNodeCapacityProvider creates a new NodeCapacityProvider that monitors CPU
39+
// metrics using the provided stores aggregator. The optional knobs parameter
40+
// allows customizing refresh intervals for testing.
41+
func NewNodeCapacityProvider(
42+
stopper *stop.Stopper, stores StoresStatsAggregator, knobs *NodeCapacityProviderTestingKnobs,
43+
) *NodeCapacityProvider {
44+
if stopper == nil || stores == nil {
45+
panic("programming error: stopper or stores aggregator cannot be nil")
46+
}
47+
48+
// refreshIntervals define how frequently cpu metrics are updated.
49+
const (
50+
// defaultCPUUsageRefreshInterval controls how often cpu usage measurements
51+
// are taken.
52+
defaultCPUUsageRefreshInterval = time.Second
53+
// defaultCPUCapacityRefreshInterval controls how often the total CPU
54+
// capacity of the node is re-calculated. This is less frequent than usage
55+
// since capacity changes happen less often.
56+
defaultCPUCapacityRefreshInterval = 10 * time.Second
57+
)
58+
59+
// defaultMovingAverageAge defines the effective time window size. With a
60+
// value of 20, the 20th-to-last measurement contributes meaningfully to the
61+
// average, while earlier measurements have diminishing impact.
62+
const defaultMovingAverageAge = 20
63+
64+
usageInterval := defaultCPUUsageRefreshInterval
65+
capacityInterval := defaultCPUCapacityRefreshInterval
66+
if knobs != nil {
67+
usageInterval = knobs.CpuUsageRefreshInterval
68+
capacityInterval = knobs.CpuCapacityRefreshInterval
69+
}
70+
71+
monitor := &runtimeLoadMonitor{
72+
stopper: stopper,
73+
usageRefreshInterval: usageInterval,
74+
capacityRefreshInterval: capacityInterval,
75+
}
76+
monitor.mu.usageEWMA = ewma.NewMovingAverage(defaultMovingAverageAge)
77+
monitor.recordCPUCapacity(context.Background())
78+
return &NodeCapacityProvider{
79+
stores: stores,
80+
runtimeLoadMonitor: monitor,
81+
}
82+
}
83+
84+
// Run starts the background monitoring of cpu metrics.
85+
func (n *NodeCapacityProvider) Run(ctx context.Context) {
86+
_ = n.runtimeLoadMonitor.stopper.RunAsyncTask(ctx, "runtime-load-monitor", func(ctx context.Context) {
87+
n.runtimeLoadMonitor.run(ctx)
88+
})
89+
}
90+
91+
// GetNodeCapacity returns the NodeCapacity which node-level cpu usage and
92+
// capacity and aggregated store-level cpu usage. If useCached is true, it will
93+
// use cached store descriptors to aggregate the sum of store-level cpu
94+
// capacity.
95+
func (n *NodeCapacityProvider) GetNodeCapacity(useCached bool) roachpb.NodeCapacity {
96+
storesCPURate, numStores := n.stores.GetAggregatedStoreStats(useCached)
97+
// TODO(wenyihu6): may be unexpected to caller that useCached only applies to
98+
// the stores stats but not runtime load monitor. We can change
99+
// runtimeLoadMonitor to also fetch updated stats.
100+
// TODO(wenyihu6): NodeCPURateCapacity <= NodeCPURateUsage fails on CI and
101+
// requires more investigation.
102+
cpuUsageNanoPerSec, cpuCapacityNanoPerSec := n.runtimeLoadMonitor.GetCPUStats()
103+
return roachpb.NodeCapacity{
104+
StoresCPURate: storesCPURate,
105+
NumStores: numStores,
106+
NodeCPURateCapacity: cpuCapacityNanoPerSec,
107+
NodeCPURateUsage: cpuUsageNanoPerSec,
108+
}
109+
}
110+
111+
// runtimeLoadMonitor polls cpu usage and capacity stats of the node
112+
// periodically and maintaining a moving average.
113+
type runtimeLoadMonitor struct {
114+
usageRefreshInterval time.Duration
115+
capacityRefreshInterval time.Duration
116+
stopper *stop.Stopper
117+
118+
mu struct {
119+
syncutil.Mutex
120+
// lastTotalUsageNanos tracks cumulative cpu usage in nanoseconds using
121+
// status.GetProcCPUTime.
122+
lastTotalUsageNanos float64
123+
// usageEWMA maintains a moving average of delta cpu usage between two
124+
// subsequent polls in nanoseconds. The cpu usage is obtained by polling
125+
// stats from status.GetProcCPUTime which is cumulative.
126+
usageEWMA ewma.MovingAverage
127+
// logicalCPUsPerSec represents the node's cpu capacity in logical
128+
// CPU-seconds per second, obtained from status.GetCPUCapacity.
129+
logicalCPUsPerSec int64
130+
}
131+
}
132+
133+
// GetCPUStats returns the current cpu usage and capacity stats for the node.
134+
func (m *runtimeLoadMonitor) GetCPUStats() (cpuUsageNanoPerSec int64, cpuCapacityNanoPerSec int64) {
135+
m.mu.Lock()
136+
defer m.mu.Unlock()
137+
// usageEWMA is usage in nanoseconds. Divide by refresh interval to get the
138+
// per-second nano-sec rate.
139+
cpuUsageNanoPerSec = int64(m.mu.usageEWMA.Value() / m.usageRefreshInterval.Seconds())
140+
// logicalCPUsPerSec is in logical cpu-seconds per second. Convert the unit
141+
// from cpu-seconds to cpu-nanoseconds.
142+
cpuCapacityNanoPerSec = m.mu.logicalCPUsPerSec * time.Second.Nanoseconds()
143+
return
144+
}
145+
146+
// recordCPUUsage samples and records the current cpu usage of the node.
147+
func (m *runtimeLoadMonitor) recordCPUUsage(ctx context.Context) {
148+
m.mu.Lock()
149+
defer m.mu.Unlock()
150+
userTimeMillis, sysTimeMillis, err := status.GetProcCPUTime(ctx)
151+
if err != nil {
152+
if buildutil.CrdbTestBuild {
153+
panic(err)
154+
}
155+
// TODO(wenyihu6): we should revisit error handling here for production.
156+
log.Warningf(ctx, "failed to get cpu usage: %v", err)
157+
}
158+
// Convert milliseconds to nanoseconds.
159+
totalUsageNanos := float64(userTimeMillis*1e6 + sysTimeMillis*1e6)
160+
if buildutil.CrdbTestBuild && m.mu.lastTotalUsageNanos > totalUsageNanos {
161+
panic(errors.Newf("programming error: last cpu usage is larger than current: %v > %v",
162+
m.mu.lastTotalUsageNanos, totalUsageNanos))
163+
}
164+
m.mu.usageEWMA.Add(totalUsageNanos - m.mu.lastTotalUsageNanos)
165+
m.mu.lastTotalUsageNanos = totalUsageNanos
166+
}
167+
168+
// recordCPUCapacity samples and records the current cpu capacity of the node.
169+
func (m *runtimeLoadMonitor) recordCPUCapacity(ctx context.Context) {
170+
m.mu.Lock()
171+
defer m.mu.Unlock()
172+
m.mu.logicalCPUsPerSec = int64(status.GetCPUCapacity())
173+
if m.mu.logicalCPUsPerSec == 0 {
174+
if buildutil.CrdbTestBuild {
175+
panic("programming error: cpu capacity is 0")
176+
}
177+
// TODO(wenyihu6): we should pass in an actual context here.
178+
log.Warningf(ctx, "failed to get cpu capacity")
179+
}
180+
}
181+
182+
// run is the main loop of the RuntimeLoadMonitor and periodically polls the cpu
183+
// usage and capacity. It continues to run until the context is done or the
184+
// stopper is quiesced.
185+
func (m *runtimeLoadMonitor) run(ctx context.Context) {
186+
usageTimer := time.NewTicker(m.usageRefreshInterval)
187+
defer usageTimer.Stop()
188+
capacityTimer := time.NewTicker(m.capacityRefreshInterval)
189+
defer capacityTimer.Stop()
190+
191+
for {
192+
select {
193+
case <-ctx.Done():
194+
return
195+
case <-m.stopper.ShouldQuiesce():
196+
return
197+
case <-usageTimer.C:
198+
usageTimer.Reset(m.usageRefreshInterval)
199+
m.recordCPUUsage(ctx)
200+
case <-capacityTimer.C:
201+
capacityTimer.Reset(m.capacityRefreshInterval)
202+
m.recordCPUCapacity(ctx)
203+
}
204+
}
205+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package load_test
7+
8+
import (
9+
"context"
10+
"testing"
11+
"time"
12+
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
14+
"github.com/cockroachdb/cockroach/pkg/testutils"
15+
"github.com/cockroachdb/cockroach/pkg/util/stop"
16+
"github.com/cockroachdb/errors"
17+
"github.com/stretchr/testify/require"
18+
)
19+
20+
// mockStoresStatsAggregator implements StoresStatsAggregator for testing.
21+
type mockStoresStatsAggregator struct {
22+
cpuUsage int64
23+
storeCount int32
24+
}
25+
26+
func (m *mockStoresStatsAggregator) GetAggregatedStoreStats(
27+
_ bool,
28+
) (totalCPUUsage int64, totalStoreCount int32) {
29+
return m.cpuUsage, m.storeCount
30+
}
31+
32+
// TestNodeCapacityProvider tests the basic functionality of the
33+
// NodeCapacityProvider.
34+
func TestNodeCapacityProvider(t *testing.T) {
35+
stopper := stop.NewStopper()
36+
defer stopper.Stop(context.Background())
37+
38+
mockStores := &mockStoresStatsAggregator{
39+
cpuUsage: 1000,
40+
storeCount: 3,
41+
}
42+
43+
provider := load.NewNodeCapacityProvider(stopper, mockStores, &load.NodeCapacityProviderTestingKnobs{
44+
CpuUsageRefreshInterval: 1 * time.Millisecond,
45+
CpuCapacityRefreshInterval: 1 * time.Millisecond,
46+
})
47+
48+
ctx, cancel := context.WithCancel(context.Background())
49+
provider.Run(ctx)
50+
51+
// Provider should have valid stats.
52+
testutils.SucceedsSoon(t, func() error {
53+
nc := provider.GetNodeCapacity(false)
54+
if nc.NodeCPURateUsage == 0 || nc.NodeCPURateCapacity == 0 || nc.StoresCPURate == 0 {
55+
return errors.Newf(
56+
"CPU usage or capacity is 0: node cpu rate usage %v, node cpu rate capacity %v, stores cpu rate %v",
57+
nc.NodeCPURateUsage, nc.NodeCPURateCapacity, nc.StoresCPURate)
58+
}
59+
return nil
60+
})
61+
62+
cancel()
63+
// GetNodeCapacity should still return valid stats after cancellation.
64+
nc := provider.GetNodeCapacity(false)
65+
require.Greater(t, nc.NodeCPURateCapacity, int64(0))
66+
require.Greater(t, nc.NodeCPURateUsage, int64(0))
67+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package load
7+
8+
import "time"
9+
10+
// NodeCapacityProviderTestingKnobs contains testing knobs for the node capacity
11+
// provider.
12+
type NodeCapacityProviderTestingKnobs struct {
13+
CpuUsageRefreshInterval time.Duration
14+
CpuCapacityRefreshInterval time.Duration
15+
}
16+
17+
// NodeCapacityProviderTestingKnobs implements the ModuleTestingKnobs interface.
18+
func (*NodeCapacityProviderTestingKnobs) ModuleTestingKnobs() {}

pkg/server/status/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ go_library(
2222
# keep
2323
clinkopts = select({
2424
"@io_bazel_rules_go//go/platform:android": [
25-
"-lrt -lm -lpthread",
25+
"-lrt -lm -lpthread -ldl",
2626
],
2727
"@io_bazel_rules_go//go/platform:dragonfly": [
2828
"-lm",
@@ -31,7 +31,7 @@ go_library(
3131
"-lm",
3232
],
3333
"@io_bazel_rules_go//go/platform:linux": [
34-
"-lrt -lm -lpthread",
34+
"-lrt -lm -lpthread -ldl",
3535
],
3636
"//conditions:default": [],
3737
}),

pkg/server/status/runtime.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ func (rsr *RuntimeStatSampler) SampleEnvironment(ctx context.Context, cs *CGoMem
912912
if err != nil {
913913
log.Ops.Errorf(ctx, "unable to get process CPU usage: %v", err)
914914
}
915-
cpuCapacity := getCPUCapacity()
915+
cpuCapacity := GetCPUCapacity()
916916
cpuUsageStats, err := cpu.Times(false /* percpu */)
917917
if err != nil {
918918
log.Ops.Errorf(ctx, "unable to get system CPU usage: %v", err)
@@ -1257,10 +1257,10 @@ func GetProcCPUTime(ctx context.Context) (userTimeMillis, sysTimeMillis int64, e
12571257
return int64(cpuTime.User), int64(cpuTime.Sys), nil
12581258
}
12591259

1260-
// getCPUCapacity returns the number of logical CPU processors available for
1260+
// GetCPUCapacity returns the number of logical CPU processors available for
12611261
// use by the process. The capacity accounts for cgroup constraints, GOMAXPROCS
1262-
// and the number of host processors.
1263-
func getCPUCapacity() float64 {
1262+
// and the number of host processors.
1263+
func GetCPUCapacity() float64 {
12641264
numProcs := float64(runtime.GOMAXPROCS(0 /* read only */))
12651265
cgroupCPU, err := cgroups.GetCgroupCPU()
12661266
if err != nil {

0 commit comments

Comments
 (0)