Skip to content

Commit bcc1685

Browse files
wenyihu6tbg
authored andcommitted
kvserver: pass NodeCapacityProviderConfig to NewNodeCapacityProvider
Previously, NewNodeCapacityProvider took NodeCapacityProviderTestingKnobs directly and handled the logic for picking refresh intervals internally. This wasn't ideal, as it is preferred for configuration to be handled by the caller, keeping NewNodeCapacityProvider focused on its core logic. This change moves that logic to the caller, which now constructs the config with the appropriate interval values. The default setting is now also closer to the server configuration in pkg/base/config.go. Epic: none Release note: none
1 parent d7cdba0 commit bcc1685

File tree

6 files changed

+62
-41
lines changed

6 files changed

+62
-41
lines changed

pkg/base/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,21 @@ const (
9696
// DefaultLeaseRenewalCrossValidate is the default setting for if
9797
// we should validate descriptors on lease renewals.
9898
DefaultLeaseRenewalCrossValidate = false
99+
100+
// DefaultCPUUsageRefreshInterval controls how often cpu usage measurements
101+
// are sampled by NodeCapacityProvider.
102+
DefaultCPUUsageRefreshInterval = time.Second
103+
104+
// DefaultCPUCapacityRefreshInterval controls how often the total CPU capacity
105+
// of the node is re-calculated by NodeCapacityProvider. This is less frequent
106+
// than usage since capacity changes happen less often.
107+
DefaultCPUCapacityRefreshInterval = 10 * time.Second
108+
109+
// DefaultCPUUsageMovingAverageAge defines the effective time window size for
110+
// sampling cpu usage. With a value of 20, the 20th-to-last measurement
111+
// contributes meaningfully to the average, while earlier measurements have
112+
// diminishing impact.
113+
DefaultCPUUsageMovingAverageAge = 20
99114
)
100115

101116
// DefaultCertsDirectory is the default value for the cert directory flag.

pkg/kv/kvserver/load/node_capacity_provider.go

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,45 +35,35 @@ type NodeCapacityProvider struct {
3535
runtimeLoadMonitor *runtimeLoadMonitor
3636
}
3737

38+
// NodeCapacityProviderConfig holds the configuration for creating a
39+
// NodeCapacityProvider.
40+
type NodeCapacityProviderConfig struct {
41+
// CPUUsageRefreshInterval controls how often cpu usage measurements are
42+
// sampled.
43+
CPUUsageRefreshInterval time.Duration
44+
// CPUCapacityRefreshInterval controls how often the total CPU capacity is
45+
// polled.
46+
CPUCapacityRefreshInterval time.Duration
47+
// CPUUsageMovingAverageAge defines the effective time window size for the
48+
// moving average when sampling cpu usage.
49+
CPUUsageMovingAverageAge float64
50+
}
51+
3852
// NewNodeCapacityProvider creates a new NodeCapacityProvider that monitors CPU
39-
// metrics using the provided stores aggregator. The optional knobs parameter
40-
// allows customizing refresh intervals for testing.
53+
// metrics using the provided stores aggregator and configuration.
4154
func NewNodeCapacityProvider(
42-
stopper *stop.Stopper, stores StoresStatsAggregator, knobs *NodeCapacityProviderTestingKnobs,
55+
stopper *stop.Stopper, stores StoresStatsAggregator, config NodeCapacityProviderConfig,
4356
) *NodeCapacityProvider {
4457
if stopper == nil || stores == nil {
4558
panic("programming error: stopper or stores aggregator cannot be nil")
4659
}
4760

48-
// refreshIntervals define how frequently cpu metrics are updated.
49-
const (
50-
// defaultCPUUsageRefreshInterval controls how often cpu usage measurements
51-
// are taken.
52-
defaultCPUUsageRefreshInterval = time.Second
53-
// defaultCPUCapacityRefreshInterval controls how often the total CPU
54-
// capacity of the node is re-calculated. This is less frequent than usage
55-
// since capacity changes happen less often.
56-
defaultCPUCapacityRefreshInterval = 10 * time.Second
57-
)
58-
59-
// defaultMovingAverageAge defines the effective time window size. With a
60-
// value of 20, the 20th-to-last measurement contributes meaningfully to the
61-
// average, while earlier measurements have diminishing impact.
62-
const defaultMovingAverageAge = 20
63-
64-
usageInterval := defaultCPUUsageRefreshInterval
65-
capacityInterval := defaultCPUCapacityRefreshInterval
66-
if knobs != nil {
67-
usageInterval = knobs.CpuUsageRefreshInterval
68-
capacityInterval = knobs.CpuCapacityRefreshInterval
69-
}
70-
7161
monitor := &runtimeLoadMonitor{
7262
stopper: stopper,
73-
usageRefreshInterval: usageInterval,
74-
capacityRefreshInterval: capacityInterval,
63+
usageRefreshInterval: config.CPUUsageRefreshInterval,
64+
capacityRefreshInterval: config.CPUCapacityRefreshInterval,
7565
}
76-
monitor.mu.usageEWMA = ewma.NewMovingAverage(defaultMovingAverageAge)
66+
monitor.mu.usageEWMA = ewma.NewMovingAverage(config.CPUUsageMovingAverageAge)
7767
monitor.recordCPUCapacity(context.Background())
7868
return &NodeCapacityProvider{
7969
stores: stores,

pkg/kv/kvserver/load/node_capacity_provider_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ func TestNodeCapacityProvider(t *testing.T) {
4040
storeCount: 3,
4141
}
4242

43-
provider := load.NewNodeCapacityProvider(stopper, mockStores, &load.NodeCapacityProviderTestingKnobs{
44-
CpuUsageRefreshInterval: 1 * time.Millisecond,
45-
CpuCapacityRefreshInterval: 1 * time.Millisecond,
43+
provider := load.NewNodeCapacityProvider(stopper, mockStores, load.NodeCapacityProviderConfig{
44+
CPUUsageRefreshInterval: 1 * time.Millisecond,
45+
CPUCapacityRefreshInterval: 1 * time.Millisecond,
46+
CPUUsageMovingAverageAge: 20,
4647
})
4748

4849
ctx, cancel := context.WithCancel(context.Background())

pkg/kv/kvserver/store_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,12 @@ func createTestStoreWithoutStart(
262262
stores := NewStores(cfg.AmbientCtx, cfg.Clock)
263263
nodeDesc := &roachpb.NodeDescriptor{NodeID: 1}
264264
if cfg.NodeCapacityProvider == nil {
265-
cfg.NodeCapacityProvider = load.NewNodeCapacityProvider(stopper, stores, nil)
265+
// Faster refresh intervals for testing.
266+
cfg.NodeCapacityProvider = load.NewNodeCapacityProvider(stopper, stores, load.NodeCapacityProviderConfig{
267+
CPUUsageRefreshInterval: 10 * time.Millisecond,
268+
CPUCapacityRefreshInterval: 10 * time.Millisecond,
269+
CPUUsageMovingAverageAge: 20,
270+
})
266271
}
267272

268273
rangeProv := &dummyFirstRangeProvider{}

pkg/server/server.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -690,14 +690,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
690690
policyRefresher = policyrefresher.NewPolicyRefresher(stopper, st, ctSender.GetLeaseholders,
691691
rpcContext.RemoteClocks.AllLatencies, knobs)
692692
}
693-
var nodeCapacityProvider *load.NodeCapacityProvider
694-
{
695-
var knobs *load.NodeCapacityProviderTestingKnobs
696-
if nodeCapacityProviderKnobs := cfg.TestingKnobs.NodeCapacityProviderKnobs; nodeCapacityProviderKnobs != nil {
697-
knobs = nodeCapacityProviderKnobs.(*load.NodeCapacityProviderTestingKnobs)
698-
}
699-
nodeCapacityProvider = load.NewNodeCapacityProvider(stopper, stores, knobs)
693+
694+
cpuUsageRefreshInterval := base.DefaultCPUUsageRefreshInterval
695+
cpuCapacityRefreshInterval := base.DefaultCPUCapacityRefreshInterval
696+
if ncpKnobs := cfg.TestingKnobs.NodeCapacityProviderKnobs; ncpKnobs != nil {
697+
cpuUsageRefreshInterval = ncpKnobs.(*load.NodeCapacityProviderTestingKnobs).CpuUsageRefreshInterval
698+
cpuCapacityRefreshInterval = ncpKnobs.(*load.NodeCapacityProviderTestingKnobs).CpuCapacityRefreshInterval
699+
}
700+
nodeCapacityProviderConfig := load.NodeCapacityProviderConfig{
701+
CPUUsageRefreshInterval: cpuUsageRefreshInterval,
702+
CPUCapacityRefreshInterval: cpuCapacityRefreshInterval,
703+
CPUUsageMovingAverageAge: base.DefaultCPUUsageMovingAverageAge,
700704
}
705+
nodeCapacityProvider := load.NewNodeCapacityProvider(stopper, stores, nodeCapacityProviderConfig)
701706

702707
// The Executor will be further initialized later, as we create more
703708
// of the server's components. There's a circular dependency - many things

pkg/testutils/localtestcluster/local_test_cluster.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,12 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) {
167167
ltc.stopper.AddCloser(ltc.Eng)
168168

169169
ltc.Stores = kvserver.NewStores(ambient, ltc.Clock)
170-
cfg.NodeCapacityProvider = load.NewNodeCapacityProvider(ltc.stopper, ltc.Stores, nil /*knobs*/)
170+
// Faster refresh intervals for testing.
171+
cfg.NodeCapacityProvider = load.NewNodeCapacityProvider(ltc.stopper, ltc.Stores, load.NodeCapacityProviderConfig{
172+
CPUUsageRefreshInterval: 10 * time.Millisecond,
173+
CPUCapacityRefreshInterval: 10 * time.Millisecond,
174+
CPUUsageMovingAverageAge: 20,
175+
})
171176

172177
factory := initFactory(ctx, cfg.Settings, nodeDesc, ltc.stopper.Tracer(), ltc.Clock, ltc.Latency, ltc.Stores, ltc.stopper, ltc.Gossip)
173178

0 commit comments

Comments
 (0)