Skip to content

Commit b611eea

Browse files
authored
Merge pull request #2199 from sthaha/feat-config-max-terminated
feat(monitor): add configurable limit for terminated workloads tracking
2 parents b12f34a + 2fbc24f commit b611eea

File tree

9 files changed

+137
-27
lines changed

9 files changed

+137
-27
lines changed

cmd/kepler/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
153153
monitor.WithResourceInformer(resourceInformer),
154154
monitor.WithInterval(cfg.Monitor.Interval),
155155
monitor.WithMaxStaleness(cfg.Monitor.Staleness),
156+
monitor.WithMaxTerminated(cfg.Monitor.MaxTerminated),
156157
)
157158

158159
apiServer := server.NewAPIServer(

compose/dev/kepler-dev/etc/kepler/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ monitor:
2020
# NOTE: Keep staleness shorter than the monitor interval.
2121
staleness: 1000ms
2222

23+
# maximum number of terminated workloads (process, container, VM, pods)
24+
# to be kept in memory until the data is exported; 0 disables the limit
25+
maxTerminated: 500
26+
2327
host:
2428
sysfs: /host/sys # Path to sysfs filesystem (default: /sys)
2529
procfs: /host/proc # Path to procfs filesystem (default: /proc)

config/config.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type (
4646
Monitor struct {
4747
Interval time.Duration `yaml:"interval"` // Interval for monitoring resources
4848
Staleness time.Duration `yaml:"staleness"` // Time after which calculated values are considered stale
49+
50+
MaxTerminated int `yaml:"maxTerminated"`
4951
}
5052

5153
// Exporter configuration
@@ -147,8 +149,9 @@ const (
147149
HostSysFSFlag = "host.sysfs"
148150
HostProcFSFlag = "host.procfs"
149151

150-
MonitorIntervalFlag = "monitor.interval"
151-
MonitorStaleness = "monitor.staleness" // not a flag
152+
MonitorIntervalFlag = "monitor.interval"
153+
MonitorStaleness = "monitor.staleness" // not a flag
154+
MonitorMaxTerminatedFlag = "monitor.max-terminated"
152155

153156
// RAPL
154157
RaplZones = "rapl.zones" // not a flag
@@ -190,6 +193,8 @@ func DefaultConfig() *Config {
190193
Monitor: Monitor{
191194
Interval: 5 * time.Second,
192195
Staleness: 500 * time.Millisecond,
196+
197+
MaxTerminated: 500,
193198
},
194199
Exporter: Exporter{
195200
Stdout: StdoutExporter{
@@ -286,6 +291,8 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn {
286291
// monitor
287292
monitorInterval := app.Flag(MonitorIntervalFlag,
288293
"Interval for monitoring resources (processes, container, vm, etc...); 0 to disable").Default("5s").Duration()
294+
maxTerminated := app.Flag(MonitorMaxTerminatedFlag,
295+
"Maximum number of terminated workloads to keep in memory until exported; 0 for unlimited").Default("500").Int()
289296

290297
enablePprof := app.Flag(pprofEnabledFlag, "Enable pprof debug endpoints").Default("false").Bool()
291298
webConfig := app.Flag(WebConfigFlag, "Web config file path").Default("").String()
@@ -295,7 +302,7 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn {
295302

296303
prometheusExporterEnabled := app.Flag(ExporterPrometheusEnabledFlag, "Enable Prometheus exporter").Default("true").Bool()
297304

298-
var metricsLevel = metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod
305+
metricsLevel := metrics.MetricsLevelNode | metrics.MetricsLevelProcess | metrics.MetricsLevelContainer | metrics.MetricsLevelVM | metrics.MetricsLevelPod
299306
app.Flag(ExporterPrometheusMetricsFlag, "Metrics levels to export (node,process,container,vm,pod)").SetValue(NewMetricsLevelValue(&metricsLevel))
300307

301308
kubernetes := app.Flag(KubernetesFlag, "Monitor kubernetes").Default("false").Bool()
@@ -325,6 +332,10 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn {
325332
cfg.Monitor.Interval = *monitorInterval
326333
}
327334

335+
if flagsSet[MonitorMaxTerminatedFlag] {
336+
cfg.Monitor.MaxTerminated = *maxTerminated
337+
}
338+
328339
if flagsSet[pprofEnabledFlag] {
329340
cfg.Debug.Pprof.Enabled = enablePprof
330341
}
@@ -434,6 +445,9 @@ func (c *Config) Validate(skips ...SkipValidation) error {
434445
if c.Monitor.Staleness < 0 {
435446
errs = append(errs, fmt.Sprintf("invalid monitor staleness: %s can't be negative", c.Monitor.Staleness))
436447
}
448+
if c.Monitor.MaxTerminated < 0 {
449+
errs = append(errs, fmt.Sprintf("invalid monitor max terminated: %d can't be negative", c.Monitor.MaxTerminated))
450+
}
437451
}
438452
{ // Kubernetes
439453
if ptr.Deref(c.Kube.Enabled, false) {
@@ -514,6 +528,7 @@ func (c *Config) manualString() string {
514528
{HostProcFSFlag, c.Host.ProcFS},
515529
{MonitorIntervalFlag, c.Monitor.Interval.String()},
516530
{MonitorStaleness, c.Monitor.Staleness.String()},
531+
{MonitorMaxTerminatedFlag, fmt.Sprintf("%d", c.Monitor.MaxTerminated)},
517532
{RaplZones, strings.Join(c.Rapl.Zones, ", ")},
518533
{ExporterStdoutEnabledFlag, fmt.Sprintf("%v", c.Exporter.Stdout.Enabled)},
519534
{ExporterPrometheusEnabledFlag, fmt.Sprintf("%v", c.Exporter.Prometheus.Enabled)},

config/config_test.go

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -627,14 +627,30 @@ func TestMonitorConfig(t *testing.T) {
627627
cfg.Monitor.Staleness = 100
628628
assert.NoError(t, cfg.Validate())
629629
})
630+
631+
t.Run("maxTerminated", func(t *testing.T) {
632+
cfg := DefaultConfig()
633+
assert.Equal(t, 500, cfg.Monitor.MaxTerminated, "default maxTerminated should be 500")
634+
assert.NoError(t, cfg.Validate())
635+
636+
cfg.Monitor.MaxTerminated = -10
637+
assert.ErrorContains(t, cfg.Validate(), "invalid configuration: invalid monitor max terminated")
638+
639+
cfg.Monitor.MaxTerminated = 0
640+
assert.NoError(t, cfg.Validate(), "maxTerminated=0 should be valid (unlimited)")
641+
642+
cfg.Monitor.MaxTerminated = 1000
643+
assert.NoError(t, cfg.Validate())
644+
})
630645
}
631646

632647
func TestMonitorConfigFlags(t *testing.T) {
633648
type expect struct {
634-
interval time.Duration
635-
staleness time.Duration
636-
parseError error
637-
cfgErr error
649+
interval time.Duration
650+
staleness time.Duration
651+
maxTerminated int
652+
parseError error
653+
cfgErr error
638654
}
639655
tt := []struct {
640656
name string
@@ -643,7 +659,7 @@ func TestMonitorConfigFlags(t *testing.T) {
643659
}{{
644660
name: "default",
645661
args: []string{},
646-
expected: expect{interval: 5 * time.Second, staleness: 500 * time.Millisecond, parseError: nil},
662+
expected: expect{interval: 5 * time.Second, staleness: 500 * time.Millisecond, maxTerminated: 500, parseError: nil},
647663
}, {
648664
name: "invalid-interval flag",
649665
args: []string{"--monitor.interval=-10Fs"},
@@ -652,6 +668,18 @@ func TestMonitorConfigFlags(t *testing.T) {
652668
name: "invalid-interval",
653669
args: []string{"--monitor.interval=-10s"},
654670
expected: expect{cfgErr: fmt.Errorf("invalid configuration: invalid monitor interval")},
671+
}, {
672+
name: "valid-max-terminated",
673+
args: []string{"--monitor.max-terminated=1000"},
674+
expected: expect{interval: 5 * time.Second, staleness: 500 * time.Millisecond, maxTerminated: 1000, parseError: nil},
675+
}, {
676+
name: "max-terminated-zero",
677+
args: []string{"--monitor.max-terminated=0"},
678+
expected: expect{interval: 5 * time.Second, staleness: 500 * time.Millisecond, maxTerminated: 0, parseError: nil},
679+
}, {
680+
name: "invalid-max-terminated",
681+
args: []string{"--monitor.max-terminated=-10"},
682+
expected: expect{cfgErr: fmt.Errorf("invalid configuration: invalid monitor max terminated")},
655683
}}
656684

657685
for _, tc := range tt {
@@ -676,10 +704,46 @@ func TestMonitorConfigFlags(t *testing.T) {
676704
assert.NoError(t, err, "unexpected config update error")
677705
assert.Equal(t, cfg.Monitor.Interval, tc.expected.interval)
678706
assert.Equal(t, cfg.Monitor.Staleness, tc.expected.staleness)
707+
assert.Equal(t, cfg.Monitor.MaxTerminated, tc.expected.maxTerminated)
679708
})
680709
}
681710
}
682711

712+
func TestMonitorMaxTerminatedYAML(t *testing.T) {
713+
t.Run("yaml-config-maxTerminated", func(t *testing.T) {
714+
yamlData := `
715+
monitor:
716+
maxTerminated: 1000
717+
`
718+
reader := strings.NewReader(yamlData)
719+
cfg, err := Load(reader)
720+
assert.NoError(t, err)
721+
assert.Equal(t, 1000, cfg.Monitor.MaxTerminated)
722+
})
723+
724+
t.Run("yaml-config-maxTerminated-zero", func(t *testing.T) {
725+
yamlData := `
726+
monitor:
727+
maxTerminated: 0
728+
`
729+
reader := strings.NewReader(yamlData)
730+
cfg, err := Load(reader)
731+
assert.NoError(t, err)
732+
assert.Equal(t, 0, cfg.Monitor.MaxTerminated)
733+
})
734+
735+
t.Run("yaml-config-maxTerminated-invalid", func(t *testing.T) {
736+
yamlData := `
737+
monitor:
738+
maxTerminated: -100
739+
`
740+
reader := strings.NewReader(yamlData)
741+
_, err := Load(reader)
742+
assert.Error(t, err)
743+
assert.Contains(t, err.Error(), "invalid monitor max terminated")
744+
})
745+
}
746+
683747
func TestConfigDefault(t *testing.T) {
684748
cfg := DefaultConfig()
685749

docs/configuration/configuration.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ You can configure Kepler by passing flags when starting the service. The followi
2323
| `--host.sysfs` | Path to sysfs filesystem | `/sys` | Any valid directory path |
2424
| `--host.procfs` | Path to procfs filesystem | `/proc` | Any valid directory path |
2525
| `--monitor.interval` | Monitor refresh interval | `5s` | Any valid duration |
26+
| `--monitor.max-terminated` | Maximum number of terminated workloads to keep in memory until exported | `500` | Any non-negative integer (0 for unlimited) |
2627
| `--web.config-file` | Path to TLS server config file | `""` | Any valid file path |
2728
| `--debug.pprof` | Enable pprof debugging endpoints | `false` | `true`, `false` |
2829
| `--exporter.stdout` | Enable stdout exporter | `false` | `true`, `false` |
@@ -55,6 +56,12 @@ kepler --metrics=node --metrics=container
5556

5657
# Export only process level metrics
5758
kepler --metrics=process
59+
60+
# Set maximum terminated workloads to 1000
61+
kepler --monitor.max-terminated=1000
62+
63+
# Remove the limit on terminated workloads kept in memory (unlimited)
64+
kepler --monitor.max-terminated=0
5865
```
5966

6067
## 🗂️ Configuration File
@@ -69,8 +76,9 @@ log:
6976
format: text # text or json (default: text)
7077

7178
monitor:
72-
interval: 5s # Monitor refresh interval (default: 5s)
73-
staleness: 1000ms # Duration after which data is considered stale (default: 1000ms)
79+
interval: 5s # Monitor refresh interval (default: 5s)
80+
staleness: 1000ms # Duration after which data is considered stale (default: 1000ms)
81+
maxTerminated: 500 # Maximum number of terminated workloads to keep in memory (default: 500)
7482

7583
host:
7684
sysfs: /sys # Path to sysfs filesystem (default: /sys)
@@ -139,12 +147,15 @@ log:
139147
monitor:
140148
interval: 5s
141149
staleness: 1000ms
150+
maxTerminated: 500
142151
```
143152

144153
- **interval**: The monitor's refresh interval. All processes with a lifetime less than this interval will be ignored. Setting to 0s disables monitor refreshes.
145154

146155
- **staleness**: Duration after which data computed by the monitor is considered stale and recomputed when requested again. Especially useful when multiple Prometheus instances are scraping Kepler, ensuring they receive the same data within the staleness window. Should be shorter than the monitor interval.
147156

157+
- **maxTerminated**: Maximum number of terminated workloads (processes, containers, VMs, pods) to keep in memory until the data is exported. This prevents unbounded memory growth in high-churn environments. Set to 0 to disable the limit. When the limit is reached, the least power-consuming terminated workloads are removed first.
158+
148159
### 🗄️ Host Configuration
149160

150161
```yaml

hack/config.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ monitor:
2020
# NOTE: Keep staleness shorter than the monitor interval.
2121
staleness: 1000ms
2222

23+
# maximum number of terminated workloads (process, container, VM, pods)
24+
# to be kept in memory until the data is exported; 0 disables the limit
25+
maxTerminated: 500
26+
2327
host:
2428
sysfs: /sys # Path to sysfs filesystem (default: /sys)
2529
procfs: /proc # Path to procfs filesystem (default: /proc)
@@ -55,7 +59,6 @@ kube: # kubernetes related config
5559
config: "" # path to kubeconfig file (optional if running in-cluster)
5660
nodeName: "" # name of the kubernetes node (required when enabled)
5761

58-
5962
# WARN DO NOT ENABLE THIS IN PRODUCTION - for development / testing only
6063
dev:
6164
fake-cpu-meter:

internal/monitor/monitor.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ type PowerMonitor struct {
4040
logger *slog.Logger
4141
cpu device.CPUPowerMeter
4242

43-
interval time.Duration
44-
clock clock.WithTicker
45-
maxStaleness time.Duration
46-
resources resource.Informer
43+
interval time.Duration
44+
clock clock.WithTicker
45+
maxStaleness time.Duration
46+
maxTerminated int
47+
resources resource.Informer
4748

4849
// signals when a snapshot has been updated
4950
dataCh chan struct{}
@@ -86,6 +87,7 @@ func NewPowerMonitor(meter device.CPUPowerMeter, applyOpts ...OptionFn) *PowerMo
8687
resources: opts.resources,
8788
dataCh: make(chan struct{}, 1),
8889
maxStaleness: opts.maxStaleness,
90+
maxTerminated: opts.maxTerminated,
8991
collectionCtx: ctx,
9092
collectionCancel: cancel,
9193
}

internal/monitor/options.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,25 @@ import (
1212
)
1313

1414
type Opts struct {
15-
logger *slog.Logger
16-
sysfsPath string
17-
interval time.Duration
18-
clock clock.WithTicker
19-
maxStaleness time.Duration
20-
resources resource.Informer
15+
logger *slog.Logger
16+
sysfsPath string
17+
interval time.Duration
18+
clock clock.WithTicker
19+
maxStaleness time.Duration
20+
maxTerminated int
21+
resources resource.Informer
2122
}
2223

2324
// NewConfig returns a new Config with defaults set
2425
func DefaultOpts() Opts {
2526
return Opts{
26-
logger: slog.Default(),
27-
sysfsPath: "/sys",
28-
interval: 5 * time.Second,
29-
clock: clock.RealClock{},
30-
maxStaleness: 500 * time.Millisecond,
31-
resources: nil,
27+
logger: slog.Default(),
28+
sysfsPath: "/sys",
29+
interval: 5 * time.Second,
30+
clock: clock.RealClock{},
31+
maxStaleness: 500 * time.Millisecond,
32+
maxTerminated: 500,
33+
resources: nil,
3234
}
3335
}
3436

@@ -69,3 +71,10 @@ func WithResourceInformer(r resource.Informer) OptionFn {
6971
o.resources = r
7072
}
7173
}
74+
75+
// WithMaxTerminated sets the maximum number of terminated workloads to keep in memory
76+
func WithMaxTerminated(max int) OptionFn {
77+
return func(o *Opts) {
78+
o.maxTerminated = max
79+
}
80+
}

manifests/k8s/configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ data:
1717
monitor:
1818
interval: 5s
1919
staleness: 500ms
20+
maxTerminated: 100
2021
rapl:
2122
zones: []
2223
exporter:

0 commit comments

Comments
 (0)