Skip to content

Commit 8251159

Browse files
authored
Merge pull request #7891 from voelzmo/enh/drop-metrics-for-init-containers
Drop metrics for init containers
2 parents 94ae175 + 9043687 commit 8251159

File tree

13 files changed

+406
-147
lines changed

13 files changed

+406
-147
lines changed

vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ type CheckpointWriter interface {
4242

4343
type checkpointWriter struct {
4444
vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
45-
cluster *model.ClusterState
45+
cluster model.ClusterState
4646
}
4747

4848
// NewCheckpointWriter returns new instance of a CheckpointWriter
49-
func NewCheckpointWriter(cluster *model.ClusterState, vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter) CheckpointWriter {
49+
func NewCheckpointWriter(cluster model.ClusterState, vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter) CheckpointWriter {
5050
return &checkpointWriter{
5151
vpaCheckpointClient: vpaCheckpointClient,
5252
cluster: cluster,
@@ -77,7 +77,7 @@ func getVpasToCheckpoint(clusterVpas map[model.VpaID]*model.Vpa) []*model.Vpa {
7777
}
7878

7979
func (writer *checkpointWriter) StoreCheckpoints(ctx context.Context, now time.Time, minCheckpoints int) error {
80-
vpas := getVpasToCheckpoint(writer.cluster.Vpas)
80+
vpas := getVpasToCheckpoint(writer.cluster.VPAs())
8181
for _, vpa := range vpas {
8282

8383
// Draining ctx.Done() channel. ctx.Err() will be checked if timeout occurred, but minCheckpoints have
@@ -123,13 +123,13 @@ func (writer *checkpointWriter) StoreCheckpoints(ctx context.Context, now time.T
123123
// Build the AggregateContainerState for the purpose of the checkpoint. This is an aggregation of state of all
124124
// containers that belong to pods matched by the VPA.
125125
// Note however that we exclude the most recent memory peak for each container (see below).
126-
func buildAggregateContainerStateMap(vpa *model.Vpa, cluster *model.ClusterState, now time.Time) map[string]*model.AggregateContainerState {
126+
func buildAggregateContainerStateMap(vpa *model.Vpa, cluster model.ClusterState, now time.Time) map[string]*model.AggregateContainerState {
127127
aggregateContainerStateMap := vpa.AggregateStateByContainerName()
128128
// Note: the memory peak from the current (ongoing) aggregation interval is not included in the
129129
// checkpoint to avoid having multiple peaks in the same interval after the state is restored from
130130
// the checkpoint. Therefore we are extracting the current peak from all containers.
131131
// TODO: Avoid the nested loop over all containers for each VPA.
132-
for _, pod := range cluster.Pods {
132+
for _, pod := range cluster.Pods() {
133133
for containerName, container := range pod.Containers {
134134
aggregateKey := cluster.MakeAggregateStateKey(pod, containerName)
135135
if vpa.UsesAggregation(aggregateKey) {

vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ var (
5454

5555
const testGcPeriod = time.Minute
5656

57-
func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, selector string) *model.Vpa {
57+
func addVpa(t *testing.T, cluster model.ClusterState, vpaID model.VpaID, selector string) *model.Vpa {
5858
var apiObject vpa_types.VerticalPodAutoscaler
5959
apiObject.Namespace = vpaID.Namespace
6060
apiObject.Name = vpaID.VpaName
@@ -64,7 +64,7 @@ func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, select
6464
if err != nil {
6565
t.Fatalf("AddOrUpdateVpa() failed: %v", err)
6666
}
67-
return cluster.Vpas[vpaID]
67+
return cluster.VPAs()[vpaID]
6868
}
6969

7070
func TestMergeContainerStateForCheckpointDropsRecentMemoryPeak(t *testing.T) {

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ const (
5555
DefaultRecommenderName = "default"
5656
)
5757

58-
// ClusterStateFeeder can update state of ClusterState object.
58+
// ClusterStateFeeder can update state of clusterState object.
5959
type ClusterStateFeeder interface {
6060
// InitFromHistoryProvider loads historical pod spec into clusterState.
6161
InitFromHistoryProvider(historyProvider history.HistoryProvider)
@@ -78,7 +78,7 @@ type ClusterStateFeeder interface {
7878

7979
// ClusterStateFeederFactory makes instances of ClusterStateFeeder.
8080
type ClusterStateFeederFactory struct {
81-
ClusterState *model.ClusterState
81+
ClusterState model.ClusterState
8282
KubeClient kube_client.Interface
8383
MetricsClient metrics.MetricsClient
8484
VpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
@@ -202,7 +202,7 @@ type clusterStateFeeder struct {
202202
oomChan <-chan oom.OomInfo
203203
vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
204204
vpaLister vpa_lister.VerticalPodAutoscalerLister
205-
clusterState *model.ClusterState
205+
clusterState model.ClusterState
206206
selectorFetcher target.VpaTargetSelectorFetcher
207207
memorySaveMode bool
208208
controllerFetcher controllerfetcher.ControllerFetcher
@@ -244,7 +244,7 @@ func (feeder *clusterStateFeeder) InitFromHistoryProvider(historyProvider histor
244244

245245
func (feeder *clusterStateFeeder) setVpaCheckpoint(checkpoint *vpa_types.VerticalPodAutoscalerCheckpoint) error {
246246
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
247-
vpa, exists := feeder.clusterState.Vpas[vpaID]
247+
vpa, exists := feeder.clusterState.VPAs()[vpaID]
248248
if !exists {
249249
return fmt.Errorf("cannot load checkpoint to missing VPA object %s/%s", vpaID.Namespace, vpaID.VpaName)
250250
}
@@ -263,7 +263,7 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints() {
263263
feeder.LoadVPAs(context.TODO())
264264

265265
namespaces := make(map[string]bool)
266-
for _, v := range feeder.clusterState.Vpas {
266+
for _, v := range feeder.clusterState.VPAs() {
267267
namespaces[v.ID.Namespace] = true
268268
}
269269

@@ -431,23 +431,23 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
431431

432432
for _, condition := range conditions {
433433
if condition.delete {
434-
delete(feeder.clusterState.Vpas[vpaID].Conditions, condition.conditionType)
434+
delete(feeder.clusterState.VPAs()[vpaID].Conditions, condition.conditionType)
435435
} else {
436-
feeder.clusterState.Vpas[vpaID].Conditions.Set(condition.conditionType, true, "", condition.message)
436+
feeder.clusterState.VPAs()[vpaID].Conditions.Set(condition.conditionType, true, "", condition.message)
437437
}
438438
}
439439
}
440440
}
441441
// Delete non-existent VPAs from the model.
442-
for vpaID := range feeder.clusterState.Vpas {
442+
for vpaID := range feeder.clusterState.VPAs() {
443443
if _, exists := vpaKeys[vpaID]; !exists {
444444
klog.V(3).InfoS("Deleting VPA", "vpa", klog.KRef(vpaID.Namespace, vpaID.VpaName))
445445
if err := feeder.clusterState.DeleteVpa(vpaID); err != nil {
446446
klog.ErrorS(err, "Deleting VPA failed", "vpa", klog.KRef(vpaID.Namespace, vpaID.VpaName))
447447
}
448448
}
449449
}
450-
feeder.clusterState.ObservedVpas = vpaCRDs
450+
feeder.clusterState.SetObservedVPAs(vpaCRDs)
451451
}
452452

453453
// LoadPods loads pod into the cluster state.
@@ -460,7 +460,7 @@ func (feeder *clusterStateFeeder) LoadPods() {
460460
for _, spec := range podSpecs {
461461
pods[spec.ID] = spec
462462
}
463-
for key := range feeder.clusterState.Pods {
463+
for key := range feeder.clusterState.Pods() {
464464
if _, exists := pods[key]; !exists {
465465
klog.V(3).InfoS("Deleting Pod", "pod", klog.KRef(key.Namespace, key.PodName))
466466
feeder.clusterState.DeletePod(key)
@@ -476,6 +476,11 @@ func (feeder *clusterStateFeeder) LoadPods() {
476476
klog.V(0).InfoS("Failed to add container", "container", container.ID, "error", err)
477477
}
478478
}
479+
for _, initContainer := range pod.InitContainers {
480+
podInitContainers := feeder.clusterState.Pods()[pod.ID].InitContainers
481+
feeder.clusterState.Pods()[pod.ID].InitContainers = append(podInitContainers, initContainer.ID.ContainerName)
482+
483+
}
479484
}
480485
}
481486

@@ -488,6 +493,12 @@ func (feeder *clusterStateFeeder) LoadRealTimeMetrics() {
488493
sampleCount := 0
489494
droppedSampleCount := 0
490495
for _, containerMetrics := range containersMetrics {
496+
podInitContainers := feeder.clusterState.Pods()[containerMetrics.ID.PodID].InitContainers
497+
if slices.Contains(podInitContainers, containerMetrics.ID.ContainerName) {
498+
klog.V(3).InfoS("Skipping metric samples for init container", "pod", klog.KRef(containerMetrics.ID.PodID.Namespace, containerMetrics.ID.PodID.PodName), "container", containerMetrics.ID.ContainerName)
499+
droppedSampleCount += len(containerMetrics.Usage)
500+
continue
501+
}
491502
for _, sample := range newContainerUsageSamplesWithKey(containerMetrics) {
492503
if err := feeder.clusterState.AddSample(sample); err != nil {
493504
// Not all pod states are tracked in memory saver mode
@@ -518,7 +529,7 @@ Loop:
518529
}
519530

520531
func (feeder *clusterStateFeeder) matchesVPA(pod *spec.BasicPodSpec) bool {
521-
for vpaKey, vpa := range feeder.clusterState.Vpas {
532+
for vpaKey, vpa := range feeder.clusterState.VPAs() {
522533
podLabels := labels.Set(pod.PodLabels)
523534
if vpaKey.Namespace == pod.ID.Namespace && vpa.PodSelector.Matches(podLabels) {
524535
return true

0 commit comments

Comments
 (0)