Skip to content

Commit a02e35a

Browse files
committed
Count live Pods per VPA
1 parent 7b7d704 commit a02e35a

File tree

5 files changed

+96
-42
lines changed

5 files changed

+96
-42
lines changed

vertical-pod-autoscaler/pkg/recommender/model/cluster.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ type ClusterState struct {
4646
// time we've noticed the recommendation missing or last time we logged
4747
// a warning about it.
4848
EmptyVPAs map[VpaID]time.Time
49-
// VpasWithMatchingPods contains information if there exist live pods that
50-
// this VPAs selector matches.
51-
VpasWithMatchingPods map[VpaID]bool
49+
// VpaPodCount contains number of live Pods matching a given VPA object.
50+
VpaPodCount map[VpaID]int
5251
// Observed VPAs. Used to check if there are updates needed.
5352
ObservedVpas []*vpa_types.VerticalPodAutoscaler
5453

@@ -97,12 +96,12 @@ type PodState struct {
9796
// NewClusterState returns a new ClusterState with no pods.
9897
func NewClusterState() *ClusterState {
9998
return &ClusterState{
100-
Pods: make(map[PodID]*PodState),
101-
Vpas: make(map[VpaID]*Vpa),
102-
EmptyVPAs: make(map[VpaID]time.Time),
103-
VpasWithMatchingPods: make(map[VpaID]bool),
104-
aggregateStateMap: make(aggregateContainerStatesMap),
105-
labelSetMap: make(labelSetMap),
99+
Pods: make(map[PodID]*PodState),
100+
Vpas: make(map[VpaID]*Vpa),
101+
EmptyVPAs: make(map[VpaID]time.Time),
102+
VpaPodCount: make(map[VpaID]int),
103+
aggregateStateMap: make(aggregateContainerStatesMap),
104+
labelSetMap: make(labelSetMap),
106105
}
107106
}
108107

@@ -124,18 +123,44 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p
124123
pod = newPod(podID)
125124
cluster.Pods[podID] = pod
126125
}
126+
127127
newlabelSetKey := cluster.getLabelSetKey(newLabels)
128+
if podExists && pod.labelSetKey != newlabelSetKey {
129+
// This Pod is already counted in the old VPA, remove the link.
130+
cluster.removePodFromItsVpa(pod)
131+
}
128132
if !podExists || pod.labelSetKey != newlabelSetKey {
129133
pod.labelSetKey = newlabelSetKey
130134
// Set the links between the containers and aggregations based on the current pod labels.
131135
for containerName, container := range pod.Containers {
132136
containerID := ContainerID{PodID: podID, ContainerName: containerName}
133137
container.aggregator = cluster.findOrCreateAggregateContainerState(containerID)
134138
}
139+
140+
cluster.addPodToItsVpa(pod)
135141
}
136142
pod.Phase = phase
137143
}
138144

145+
// addPodToItsVpa increases the count of Pods associated with a VPA object.
146+
// Does a scan similar to findOrCreateAggregateContainerState so could be optimized if needed.
147+
func (cluster *ClusterState) addPodToItsVpa(pod *PodState) {
148+
for _, vpa := range cluster.Vpas {
149+
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) {
150+
cluster.VpaPodCount[vpa.ID]++
151+
}
152+
}
153+
}
154+
155+
// removePodFromItsVpa decreases the count of Pods associated with a VPA object.
156+
func (cluster *ClusterState) removePodFromItsVpa(pod *PodState) {
157+
for _, vpa := range cluster.Vpas {
158+
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) {
159+
cluster.VpaPodCount[vpa.ID]--
160+
}
161+
}
162+
}
163+
139164
// GetContainer returns the ContainerState object for a given ContainerID or
140165
// null if it's not present in the model.
141166
func (cluster *ClusterState) GetContainer(containerID ContainerID) *ContainerState {
@@ -151,6 +176,10 @@ func (cluster *ClusterState) GetContainer(containerID ContainerID) *ContainerSta
151176

152177
// DeletePod removes an existing pod from the cluster.
153178
func (cluster *ClusterState) DeletePod(podID PodID) {
179+
pod, found := cluster.Pods[podID]
180+
if found {
181+
cluster.removePodFromItsVpa(pod)
182+
}
154183
delete(cluster.Pods, podID)
155184
}
156185

@@ -237,10 +266,9 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
237266
vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time)
238267
cluster.Vpas[vpaID] = vpa
239268
for aggregationKey, aggregation := range cluster.aggregateStateMap {
240-
if vpa.UseAggregationIfMatching(aggregationKey, aggregation) {
241-
cluster.VpasWithMatchingPods[vpa.ID] = true
242-
}
269+
vpa.UseAggregationIfMatching(aggregationKey, aggregation)
243270
}
271+
cluster.VpaPodCount[vpaID] = len(cluster.GetMatchingPods(vpa))
244272
}
245273
vpa.TargetRef = apiObject.Spec.TargetRef
246274
vpa.Annotations = annotationsMap
@@ -262,7 +290,7 @@ func (cluster *ClusterState) DeleteVpa(vpaID VpaID) error {
262290
}
263291
delete(cluster.Vpas, vpaID)
264292
delete(cluster.EmptyVPAs, vpaID)
265-
delete(cluster.VpasWithMatchingPods, vpaID)
293+
delete(cluster.VpaPodCount, vpaID)
266294
return nil
267295
}
268296

@@ -313,9 +341,7 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con
313341
cluster.aggregateStateMap[aggregateStateKey] = aggregateContainerState
314342
// Link the new aggregation to the existing VPAs.
315343
for _, vpa := range cluster.Vpas {
316-
if vpa.UseAggregationIfMatching(aggregateStateKey, aggregateContainerState) {
317-
cluster.VpasWithMatchingPods[vpa.ID] = true
318-
}
344+
vpa.UseAggregationIfMatching(aggregateStateKey, aggregateContainerState)
319345
}
320346
}
321347
return aggregateContainerState

vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import (
3232

3333
var (
3434
testPodID = PodID{"namespace-1", "pod-1"}
35+
testPodID3 = PodID{"namespace-1", "pod-3"}
36+
testPodID4 = PodID{"namespace-1", "pod-4"}
3537
testContainerID = ContainerID{testPodID, "container-1"}
3638
testVpaID = VpaID{"namespace-1", "vpa-1"}
3739
testAnnotations = vpaAnnotationsMap{"key-1": "value-1"}
@@ -701,13 +703,13 @@ func TestVPAWithMatchingPods(t *testing.T) {
701703
name string
702704
vpaSelector string
703705
pods []podDesc
704-
expectedMatch bool
706+
expectedMatch int
705707
}{
706708
{
707709
name: "No pods",
708710
vpaSelector: testSelectorStr,
709711
pods: []podDesc{},
710-
expectedMatch: false,
712+
expectedMatch: 0,
711713
},
712714
{
713715
name: "VPA with matching pod",
@@ -719,7 +721,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
719721
apiv1.PodRunning,
720722
},
721723
},
722-
expectedMatch: true,
724+
expectedMatch: 1,
723725
},
724726
{
725727
name: "No matching pod",
@@ -731,19 +733,55 @@ func TestVPAWithMatchingPods(t *testing.T) {
731733
apiv1.PodRunning,
732734
},
733735
},
734-
expectedMatch: false,
736+
expectedMatch: 0,
737+
},
738+
{
739+
name: "VPA with 2 matching pods, 1 not matching",
740+
vpaSelector: testSelectorStr,
741+
pods: []podDesc{
742+
{
743+
testPodID,
744+
emptyLabels, // does not match VPA
745+
apiv1.PodRunning,
746+
},
747+
{
748+
testPodID3,
749+
testLabels,
750+
apiv1.PodRunning,
751+
},
752+
{
753+
testPodID4,
754+
testLabels,
755+
apiv1.PodRunning,
756+
},
757+
},
758+
expectedMatch: 2,
735759
},
736760
}
761+
// Run with adding VPA first
737762
for _, tc := range cases {
738-
t.Run(tc.name, func(t *testing.T) {
763+
t.Run(tc.name+", VPA first", func(t *testing.T) {
739764
cluster := NewClusterState()
740765
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
741766
for _, podDesc := range tc.pods {
742767
cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase)
743768
containerID := ContainerID{testPodID, "foo"}
744769
assert.NoError(t, cluster.AddOrUpdateContainer(containerID, testRequest))
745770
}
746-
assert.Equal(t, tc.expectedMatch, cluster.VpasWithMatchingPods[vpa.ID])
771+
assert.Equal(t, tc.expectedMatch, cluster.VpaPodCount[vpa.ID])
772+
})
773+
}
774+
// Run with adding Pods first
775+
for _, tc := range cases {
776+
t.Run(tc.name+", Pods first", func(t *testing.T) {
777+
cluster := NewClusterState()
778+
for _, podDesc := range tc.pods {
779+
cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase)
780+
containerID := ContainerID{testPodID, "foo"}
781+
assert.NoError(t, cluster.AddOrUpdateContainer(containerID, testRequest))
782+
}
783+
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
784+
assert.Equal(t, tc.expectedMatch, cluster.VpaPodCount[vpa.ID])
747785
})
748786
}
749787
}

vertical-pod-autoscaler/pkg/recommender/model/vpa.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,18 @@ func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa {
126126
}
127127

128128
// UseAggregationIfMatching checks if the given aggregation matches (contributes to) this VPA
129-
// and adds it to the set of VPA's aggregations if that is the case. Returns true
130-
// if the aggregation matches VPA.
131-
func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggregation *AggregateContainerState) bool {
129+
// and adds it to the set of VPA's aggregations if that is the case.
130+
func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggregation *AggregateContainerState) {
132131
if vpa.UsesAggregation(aggregationKey) {
133-
return true
132+
// Already linked, we can return quickly.
133+
return
134134
}
135135
if vpa.matchesAggregation(aggregationKey) {
136136
vpa.aggregateContainerStates[aggregationKey] = aggregation
137137
aggregation.IsUnderVPA = true
138138
aggregation.UpdateMode = vpa.UpdateMode
139139
aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy))
140-
return true
141140
}
142-
return false
143141
}
144142

145143
// UpdateRecommendation updates the recommended resources in the VPA and its

vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
231231
updateMode *vpa_types.UpdateMode
232232
container string
233233
containerLabels map[string]string
234-
expectedMatching bool
235234
expectedUpdateMode *vpa_types.UpdateMode
236235
expectedNeedsRecommendation map[string]bool
237236
}{
@@ -242,7 +241,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
242241
updateMode: &modeOff,
243242
container: "test-container",
244243
containerLabels: testLabels,
245-
expectedMatching: true,
246244
expectedNeedsRecommendation: map[string]bool{"test-container": true},
247245
expectedUpdateMode: &modeOff,
248246
}, {
@@ -252,7 +250,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
252250
updateMode: &modeAuto,
253251
container: "second-container",
254252
containerLabels: testLabels,
255-
expectedMatching: true,
256253
expectedNeedsRecommendation: map[string]bool{"test-container": true, "second-container": true},
257254
expectedUpdateMode: &modeAuto,
258255
}, {
@@ -262,7 +259,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
262259
updateMode: &modeOff,
263260
container: "test-container",
264261
containerLabels: testLabels,
265-
expectedMatching: true,
266262
expectedNeedsRecommendation: map[string]bool{"test-container": true},
267263
expectedUpdateMode: &modeOff,
268264
}, {
@@ -272,7 +268,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
272268
updateMode: &modeAuto,
273269
container: "second-container",
274270
containerLabels: map[string]string{"different": "labels"},
275-
expectedMatching: false,
276271
expectedNeedsRecommendation: map[string]bool{"test-container": true},
277272
expectedUpdateMode: nil,
278273
}, {
@@ -290,7 +285,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
290285
updateMode: &modeAuto,
291286
container: "second-container",
292287
containerLabels: testLabels,
293-
expectedMatching: true,
294288
expectedNeedsRecommendation: map[string]bool{"second-container": false, "test-container": true},
295289
expectedUpdateMode: &modeAuto,
296290
}, {
@@ -308,7 +302,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
308302
updateMode: &modeAuto,
309303
container: "second-container",
310304
containerLabels: testLabels,
311-
expectedMatching: true,
312305
expectedNeedsRecommendation: map[string]bool{"second-container": false, "test-container": false},
313306
expectedUpdateMode: &modeAuto,
314307
}, {
@@ -330,7 +323,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
330323
updateMode: &modeAuto,
331324
container: "second-container",
332325
containerLabels: testLabels,
333-
expectedMatching: true,
334326
expectedNeedsRecommendation: map[string]bool{"second-container": false, "test-container": true},
335327
expectedUpdateMode: &modeAuto,
336328
}, {
@@ -352,7 +344,6 @@ func TestUseAggregationIfMatching(t *testing.T) {
352344
updateMode: &modeAuto,
353345
container: "second-container",
354346
containerLabels: testLabels,
355-
expectedMatching: true,
356347
expectedNeedsRecommendation: map[string]bool{"second-container": true, "test-container": false},
357348
expectedUpdateMode: &modeAuto,
358349
},
@@ -381,8 +372,7 @@ func TestUseAggregationIfMatching(t *testing.T) {
381372
}
382373
vpa.SetResourcePolicy(tc.resourcePolicy)
383374

384-
matching := vpa.UseAggregationIfMatching(key, aggregationUnderTest)
385-
assert.Equal(t, tc.expectedMatching, matching, "Unexpected assessment of aggregation matching")
375+
vpa.UseAggregationIfMatching(key, aggregationUnderTest)
386376
assert.Len(t, vpa.aggregateContainerStates, len(tc.expectedNeedsRecommendation), "AggregateContainerStates has unexpected size")
387377
for container, expectedNeedsRecommendation := range tc.expectedNeedsRecommendation {
388378
found := false

vertical-pod-autoscaler/pkg/recommender/routines/recommender.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,19 @@ func (r *recommender) UpdateVPAs() {
103103
if vpa.HasRecommendation() && !had {
104104
metrics_recommender.ObserveRecommendationLatency(vpa.Created)
105105
}
106-
hasMatchingPods := r.clusterState.VpasWithMatchingPods[vpa.ID]
106+
hasMatchingPods := r.clusterState.VpaPodCount[vpa.ID] > 0
107107
vpa.UpdateConditions(hasMatchingPods)
108108
if err := r.clusterState.RecordRecommendation(vpa, time.Now()); err != nil {
109109
klog.Warningf("%v", err)
110110
klog.V(4).Infof("VPA dump")
111111
klog.V(4).Infof("%+v", vpa)
112112
klog.V(4).Infof("HasMatchingPods: %v", hasMatchingPods)
113+
podCount := r.clusterState.VpaPodCount[vpa.ID]
114+
klog.V(4).Infof("VpaPodCount: %v", podCount)
113115
pods := r.clusterState.GetMatchingPods(vpa)
114116
klog.V(4).Infof("MatchingPods: %+v", pods)
115-
if len(pods) > 0 != hasMatchingPods {
116-
klog.Errorf("Aggregated states and matching pods disagree for vpa %v/%v", vpa.ID.Namespace, vpa.ID.VpaName)
117+
if len(pods) != podCount {
118+
klog.Errorf("ClusterState pod count and matching pods disagree for vpa %v/%v", vpa.ID.Namespace, vpa.ID.VpaName)
117119
}
118120
}
119121
cnt.Add(vpa)

0 commit comments

Comments
 (0)