Skip to content

Commit 8ce341c

Browse files
issue-141 - bug: Excessive Mem Usage/Freezing (#142)
1 parent c556de0 commit 8ce341c

File tree

6 files changed

+110
-41
lines changed

6 files changed

+110
-41
lines changed

metrics/prom/prom_source.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ type PromConfig struct {
4444
func DefaultPromConfig() *PromConfig {
4545
return &PromConfig{
4646
Enabled: true,
47-
ScrapeInterval: 5 * time.Second,
48-
RetentionTime: 1 * time.Hour,
49-
MaxSamples: 10000,
47+
ScrapeInterval: 4 * time.Second,
48+
RetentionTime: 5 * time.Minute,
49+
MaxSamples: 50,
5050
Components: []prom.ComponentType{
5151
prom.ComponentKubelet,
5252
prom.ComponentCAdvisor,
@@ -444,26 +444,26 @@ func (p *PromMetricsSource) GetMetricsForPod(ctx context.Context, pod metav1.Obj
444444

445445
// GetAllPodMetrics retrieves metrics for all pods
446446
func (p *PromMetricsSource) GetAllPodMetrics(ctx context.Context) ([]*metrics.PodMetrics, error) {
447+
// Check health and get store reference under lock
447448
p.mu.RLock()
448-
defer p.mu.RUnlock()
449-
450449
if !p.isHealthyLocked() {
450+
p.mu.RUnlock()
451451
return nil, fmt.Errorf("prometheus source is not healthy")
452452
}
453-
454453
if p.store == nil {
454+
p.mu.RUnlock()
455455
return nil, fmt.Errorf("metrics store not initialized")
456456
}
457+
store := p.store
458+
p.mu.RUnlock()
457459

458-
// Get all unique pod/namespace combinations from labels
459-
// This requires querying the store for label values
460-
namespaces := p.store.GetLabelValues("namespace")
461-
pods := p.store.GetLabelValues("pod")
460+
// Get label values (store has its own lock)
461+
namespaces := store.GetLabelValues("namespace")
462+
pods := store.GetLabelValues("pod")
462463

463464
var allPodMetrics []*metrics.PodMetrics
464465

465-
// This is a simplified implementation - in production would need better logic
466-
// to match pods with their namespaces
466+
// Call GetPodMetrics without holding p.mu to avoid deadlock
467467
for _, namespace := range namespaces {
468468
for _, pod := range pods {
469469
if podMetrics, err := p.GetPodMetrics(ctx, namespace, pod); err == nil {
@@ -539,12 +539,8 @@ func (p *PromMetricsSource) SetHealthCallback(callback func(healthy bool, info m
539539
}
540540

541541
// buildSourceInfoLocked builds SourceInfo while holding the lock.
542-
// Must be called with p.mu held.
543-
func (p *PromMetricsSource) buildSourceInfoLocked(healthy bool) metrics.SourceInfo {
544-
metricCount := 0
545-
if p.store != nil {
546-
metricCount = len(p.store.GetMetricNames())
547-
}
542+
// metricCount must be obtained before acquiring lock to avoid nested locking.
543+
func (p *PromMetricsSource) buildSourceInfoLocked(healthy bool, metricCount int) metrics.SourceInfo {
548544
return metrics.SourceInfo{
549545
Type: metrics.SourceTypePrometheus,
550546
Version: "v1.0.0",
@@ -557,6 +553,12 @@ func (p *PromMetricsSource) buildSourceInfoLocked(healthy bool) metrics.SourceIn
557553

558554
// handleError is called when an error occurs during metrics collection
559555
func (p *PromMetricsSource) handleError(component prom.ComponentType, err error) {
556+
// Get metric count BEFORE acquiring lock to avoid nested locking
557+
metricCount := 0
558+
if p.store != nil {
559+
metricCount = len(p.store.GetMetricNames())
560+
}
561+
560562
var shouldNotify bool
561563
var info metrics.SourceInfo
562564

@@ -572,7 +574,7 @@ func (p *PromMetricsSource) handleError(component prom.ComponentType, err error)
572574
// Check if we need to notify (overall state changed from healthy to unhealthy)
573575
if wasHealthy && !nowHealthy {
574576
shouldNotify = true
575-
info = p.buildSourceInfoLocked(false)
577+
info = p.buildSourceInfoLocked(false, metricCount)
576578
}
577579
p.mu.Unlock()
578580

@@ -584,6 +586,12 @@ func (p *PromMetricsSource) handleError(component prom.ComponentType, err error)
584586

585587
// handleMetricsCollected is called when metrics are successfully collected
586588
func (p *PromMetricsSource) handleMetricsCollected(component prom.ComponentType, collectedMetrics *prom.ScrapedMetrics) {
589+
// Get metric count BEFORE acquiring lock to avoid nested locking
590+
metricCount := 0
591+
if p.store != nil {
592+
metricCount = len(p.store.GetMetricNames())
593+
}
594+
587595
var shouldNotify bool
588596
var info metrics.SourceInfo
589597

@@ -604,7 +612,7 @@ func (p *PromMetricsSource) handleMetricsCollected(component prom.ComponentType,
604612
// Check if we need to notify (overall state changed from unhealthy to healthy)
605613
if !wasHealthy && nowHealthy {
606614
shouldNotify = true
607-
info = p.buildSourceInfoLocked(true)
615+
info = p.buildSourceInfoLocked(true, metricCount)
608616
}
609617
p.mu.Unlock()
610618

metrics/prom/prom_source_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,16 +204,16 @@ func TestNewPromMetricsSource_WithNilConfig(t *testing.T) {
204204
}
205205

206206
// Verify default config values
207-
if source.config.ScrapeInterval != 5*time.Second {
208-
t.Errorf("Expected default scrape interval 5s, got %v", source.config.ScrapeInterval)
207+
if source.config.ScrapeInterval != 4*time.Second {
208+
t.Errorf("Expected default scrape interval 4s, got %v", source.config.ScrapeInterval)
209209
}
210210

211-
if source.config.RetentionTime != 1*time.Hour {
212-
t.Errorf("Expected default retention time 1h, got %v", source.config.RetentionTime)
211+
if source.config.RetentionTime != 5*time.Minute {
212+
t.Errorf("Expected default retention time 5m, got %v", source.config.RetentionTime)
213213
}
214214

215-
if source.config.MaxSamples != 10000 {
216-
t.Errorf("Expected default max samples 10000, got %d", source.config.MaxSamples)
215+
if source.config.MaxSamples != 50 {
216+
t.Errorf("Expected default max samples 50, got %d", source.config.MaxSamples)
217217
}
218218
}
219219

@@ -224,16 +224,16 @@ func TestDefaultPromConfig(t *testing.T) {
224224
t.Error("Expected Enabled to be true")
225225
}
226226

227-
if config.ScrapeInterval != 5*time.Second {
228-
t.Errorf("Expected ScrapeInterval 5s, got %v", config.ScrapeInterval)
227+
if config.ScrapeInterval != 4*time.Second {
228+
t.Errorf("Expected ScrapeInterval 4s, got %v", config.ScrapeInterval)
229229
}
230230

231-
if config.RetentionTime != 1*time.Hour {
232-
t.Errorf("Expected RetentionTime 1h, got %v", config.RetentionTime)
231+
if config.RetentionTime != 5*time.Minute {
232+
t.Errorf("Expected RetentionTime 5m, got %v", config.RetentionTime)
233233
}
234234

235-
if config.MaxSamples != 10000 {
236-
t.Errorf("Expected MaxSamples 10000, got %d", config.MaxSamples)
235+
if config.MaxSamples != 50 {
236+
t.Errorf("Expected MaxSamples 50, got %d", config.MaxSamples)
237237
}
238238

239239
// Check default components

prom/controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ func (cc *CollectorController) runCollector(ctx context.Context) {
138138
case <-ctx.Done():
139139
return
140140
case <-ticker.C:
141-
cc.collectFromAllComponents(ctx)
141+
cycleCtx, cancel := context.WithTimeout(ctx, cc.config.Timeout)
142+
cc.collectFromAllComponents(cycleCtx)
143+
cancel()
142144
}
143145
}
144146
}

prom/scraper.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,12 @@ func (ks *KubernetesScraper) scrapeTarget(ctx context.Context, target *ScrapeTar
437437
}, err
438438
}
439439

440-
// Convert to our internal format
440+
// Convert to our internal format, filtering to only allowed metrics
441441
metricFamilies := make(map[string]*MetricFamily)
442442
for name, family := range families {
443+
if !DefaultMetricAllowlist[name] {
444+
continue
445+
}
443446
metricFamily := ks.convertMetricFamily(name, family)
444447
metricFamilies[name] = metricFamily
445448
}

prom/storage.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77
"sync"
88
"time"
9+
"unique"
910

1011
"github.com/prometheus/prometheus/model/labels"
1112
)
@@ -22,6 +23,10 @@ type InMemoryStore struct {
2223
labelNames map[string]bool
2324
labelValues map[string]map[string]bool // labelName -> values
2425

26+
// String interning - keeps handles alive to retain canonical strings
27+
stringHandles map[string]unique.Handle[string]
28+
handlesMu sync.RWMutex
29+
2530
// Configuration
2631
maxSamples int
2732
retentionTime time.Duration
@@ -39,12 +44,45 @@ func NewInMemoryStore(config *ScrapeConfig) *InMemoryStore {
3944
metricNames: make(map[string]bool),
4045
labelNames: make(map[string]bool),
4146
labelValues: make(map[string]map[string]bool),
47+
stringHandles: make(map[string]unique.Handle[string]),
4248
maxSamples: config.MaxSamples,
4349
retentionTime: config.RetentionTime,
4450
lastCleanup: time.Now(),
4551
}
4652
}
4753

54+
// InternString returns the canonical version of a string.
55+
// Uses Go's unique package to deduplicate repeated strings.
56+
func (store *InMemoryStore) InternString(s string) string {
57+
store.handlesMu.RLock()
58+
if h, ok := store.stringHandles[s]; ok {
59+
store.handlesMu.RUnlock()
60+
return h.Value()
61+
}
62+
store.handlesMu.RUnlock()
63+
64+
store.handlesMu.Lock()
65+
defer store.handlesMu.Unlock()
66+
if h, ok := store.stringHandles[s]; ok {
67+
return h.Value()
68+
}
69+
h := unique.Make(s)
70+
store.stringHandles[s] = h
71+
return h.Value()
72+
}
73+
74+
// internLabels creates a new labels.Labels with all strings interned.
75+
func (store *InMemoryStore) internLabels(lbls labels.Labels) labels.Labels {
76+
interned := make(labels.Labels, len(lbls))
77+
for i, l := range lbls {
78+
interned[i] = labels.Label{
79+
Name: store.InternString(l.Name),
80+
Value: store.InternString(l.Value),
81+
}
82+
}
83+
return interned
84+
}
85+
4886
// AddMetrics stores scraped metrics in the store
4987
func (store *InMemoryStore) AddMetrics(metrics *ScrapedMetrics) error {
5088
if metrics.Error != nil {
@@ -55,6 +93,9 @@ func (store *InMemoryStore) AddMetrics(metrics *ScrapedMetrics) error {
5593
defer store.mutex.Unlock()
5694

5795
for metricName, family := range metrics.Families {
96+
// Intern metric name for consistent memory usage
97+
metricName = store.InternString(metricName)
98+
5899
// Ensure metric exists in index
59100
store.metricNames[metricName] = true
60101

@@ -67,21 +108,23 @@ func (store *InMemoryStore) AddMetrics(metrics *ScrapedMetrics) error {
67108
for _, ts := range family.TimeSeries {
68109
seriesKey := ts.Labels.String()
69110

70-
// Update label indexes
111+
// Update label indexes with interned strings
71112
for _, label := range ts.Labels {
72-
store.labelNames[label.Name] = true
73-
if store.labelValues[label.Name] == nil {
74-
store.labelValues[label.Name] = make(map[string]bool)
113+
name := store.InternString(label.Name)
114+
value := store.InternString(label.Value)
115+
store.labelNames[name] = true
116+
if store.labelValues[name] == nil {
117+
store.labelValues[name] = make(map[string]bool)
75118
}
76-
store.labelValues[label.Name][label.Value] = true
119+
store.labelValues[name][value] = true
77120
}
78121

79122
// Get or create time series
80123
existingSeries, exists := store.series[metricName][seriesKey]
81124
if !exists {
82-
// Create new series with ring buffer
125+
// Create new series with ring buffer and interned labels
83126
existingSeries = &TimeSeries{
84-
Labels: ts.Labels,
127+
Labels: store.internLabels(ts.Labels),
85128
Samples: NewRingBuffer[MetricSample](store.maxSamples),
86129
}
87130
store.series[metricName][seriesKey] = existingSeries

prom/types.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@ const (
2323
ComponentKubeProxy ComponentType = "kube-proxy"
2424
)
2525

26+
// DefaultMetricAllowlist contains only the metrics ktop actually uses.
27+
// All other metrics are filtered out during scraping to reduce memory usage.
28+
var DefaultMetricAllowlist = map[string]bool{
29+
"container_cpu_usage_seconds_total": true,
30+
"container_memory_working_set_bytes": true,
31+
"container_network_receive_bytes_total": true,
32+
"container_network_transmit_bytes_total": true,
33+
"container_fs_reads_bytes_total": true,
34+
"container_fs_writes_bytes_total": true,
35+
"kubelet_running_pods": true,
36+
"container_count": true,
37+
}
38+
2639
// MetricSample represents a single metric data point
2740
type MetricSample struct {
2841
Timestamp int64

0 commit comments

Comments
 (0)