Skip to content

Commit 4bd2e67

Browse files
authored
Merge pull request #8331 from adrianmoisey/list
Switch VPA checkpoint to use a lister
2 parents 72bf359 + aae2a01 commit 4bd2e67

File tree

5 files changed

+101
-28
lines changed

5 files changed

+101
-28
lines changed

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type ClusterStateFeederFactory struct {
8181
KubeClient kube_client.Interface
8282
MetricsClient metrics.MetricsClient
8383
VpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
84+
VpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
8485
VpaLister vpa_lister.VerticalPodAutoscalerLister
8586
PodLister v1lister.PodLister
8687
OOMObserver oom.Observer
@@ -99,6 +100,7 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
99100
metricsClient: m.MetricsClient,
100101
oomChan: m.OOMObserver.GetObservedOomsChannel(),
101102
vpaCheckpointClient: m.VpaCheckpointClient,
103+
vpaCheckpointLister: m.VpaCheckpointLister,
102104
vpaLister: m.VpaLister,
103105
clusterState: m.ClusterState,
104106
specClient: spec.NewSpecClient(m.PodLister),
@@ -206,6 +208,7 @@ type clusterStateFeeder struct {
206208
metricsClient metrics.MetricsClient
207209
oomChan <-chan oom.OomInfo
208210
vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
211+
vpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
209212
vpaLister vpa_lister.VerticalPodAutoscalerLister
210213
clusterState model.ClusterState
211214
selectorFetcher target.VpaTargetSelectorFetcher
@@ -267,25 +270,29 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) {
267270
klog.V(3).InfoS("Initializing VPA from checkpoints")
268271
feeder.LoadVPAs(ctx)
269272

273+
klog.V(3).InfoS("Fetching VPA checkpoints")
274+
checkpointList, err := feeder.vpaCheckpointLister.List(labels.Everything())
275+
if err != nil {
276+
klog.ErrorS(err, "Cannot list VPA checkpoints")
277+
}
278+
270279
namespaces := make(map[string]bool)
271280
for _, v := range feeder.clusterState.VPAs() {
272281
namespaces[v.ID.Namespace] = true
273282
}
274283

275284
for namespace := range namespaces {
276-
klog.V(3).InfoS("Fetching checkpoints", "namespace", namespace)
277-
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
278-
if err != nil {
279-
klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace)
285+
if feeder.shouldIgnoreNamespace(namespace) {
286+
klog.V(3).InfoS("Skipping loading VPA Checkpoints from namespace.", "namespace", namespace, "vpaObjectNamespace", feeder.vpaObjectNamespace, "ignoredNamespaces", feeder.ignoredNamespaces)
287+
continue
280288
}
281-
for _, checkpoint := range checkpointList.Items {
282289

290+
for _, checkpoint := range checkpointList {
283291
klog.V(3).InfoS("Loading checkpoint for VPA", "checkpoint", klog.KRef(checkpoint.Namespace, checkpoint.Spec.VPAObjectName), "container", checkpoint.Spec.ContainerName)
284-
err = feeder.setVpaCheckpoint(&checkpoint)
292+
err = feeder.setVpaCheckpoint(checkpoint)
285293
if err != nil {
286294
klog.ErrorS(err, "Error while loading checkpoint")
287295
}
288-
289296
}
290297
}
291298
}
@@ -345,11 +352,12 @@ func (feeder *clusterStateFeeder) shouldIgnoreNamespace(namespace string) bool {
345352

346353
func (feeder *clusterStateFeeder) cleanupCheckpointsForNamespace(ctx context.Context, namespace string, allVPAKeys map[model.VpaID]bool) error {
347354
var err error
348-
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
355+
checkpointList, err := feeder.vpaCheckpointLister.VerticalPodAutoscalerCheckpoints(namespace).List(labels.Everything())
356+
349357
if err != nil {
350358
return err
351359
}
352-
for _, checkpoint := range checkpointList.Items {
360+
for _, checkpoint := range checkpointList {
353361
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
354362
if !allVPAKeys[vpaID] {
355363
if errFeeder := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(ctx, checkpoint.Name, metav1.DeleteOptions{}); errFeeder != nil {

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -859,11 +859,12 @@ func TestFilterVPAsIgnoreNamespaces(t *testing.T) {
859859
func TestCanCleanupCheckpoints(t *testing.T) {
860860
_, tctx := ktesting.NewTestContext(t)
861861
client := fake.NewSimpleClientset()
862+
namespace := "testNamespace"
862863

863-
_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "testNamespace"}}, metav1.CreateOptions{})
864+
_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
864865
assert.NoError(t, err)
865866

866-
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace("testNamespace").WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
867+
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace(namespace).WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
867868
Kind: kind,
868869
Name: name1,
869870
APIVersion: apiVersion,
@@ -878,22 +879,19 @@ func TestCanCleanupCheckpoints(t *testing.T) {
878879
vpaLister := &test.VerticalPodAutoscalerListerMock{}
879880
vpaLister.On("List").Return(vpas, nil)
880881

881-
checkpoints := &vpa_types.VerticalPodAutoscalerCheckpointList{
882-
Items: []vpa_types.VerticalPodAutoscalerCheckpoint{
883-
{
884-
ObjectMeta: metav1.ObjectMeta{
885-
Namespace: "testNamespace",
886-
Name: "nonExistentVPA",
887-
},
888-
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
889-
VPAObjectName: "nonExistentVPA",
890-
},
891-
},
882+
vpaCheckPoint := &vpa_types.VerticalPodAutoscalerCheckpoint{
883+
ObjectMeta: metav1.ObjectMeta{
884+
Namespace: namespace,
885+
Name: "nonExistentVPA",
886+
},
887+
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
888+
VPAObjectName: "nonExistentVPA",
892889
},
893890
}
891+
vpacheckpoints := []*vpa_types.VerticalPodAutoscalerCheckpoint{vpaCheckPoint}
894892

895893
for _, vpa := range vpas {
896-
checkpoints.Items = append(checkpoints.Items, vpa_types.VerticalPodAutoscalerCheckpoint{
894+
vpacheckpoints = append(vpacheckpoints, &vpa_types.VerticalPodAutoscalerCheckpoint{
897895
ObjectMeta: metav1.ObjectMeta{
898896
Namespace: vpa.Namespace,
899897
Name: vpa.Name,
@@ -904,23 +902,29 @@ func TestCanCleanupCheckpoints(t *testing.T) {
904902
})
905903
}
906904

905+
// Create a mock checkpoint client to track deletions
907906
checkpointClient := &fakeautoscalingv1.FakeAutoscalingV1{Fake: &core.Fake{}}
908-
checkpointClient.AddReactor("list", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) {
909-
return true, checkpoints, nil
910-
})
911-
907+
// Track deleted checkpoints
912908
deletedCheckpoints := []string{}
913909
checkpointClient.AddReactor("delete", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) {
914910
deleteAction := action.(core.DeleteAction)
915911
deletedCheckpoints = append(deletedCheckpoints, deleteAction.GetName())
916-
917912
return true, nil, nil
918913
})
919914

915+
// Create namespace lister mock that will return the checkpoint list
916+
checkpointNamespaceLister := &test.VerticalPodAutoscalerCheckPointListerMock{}
917+
checkpointNamespaceLister.On("List").Return(vpacheckpoints, nil)
918+
919+
// Create main checkpoint mock that will return the namespace lister
920+
checkpointLister := &test.VerticalPodAutoscalerCheckPointListerMock{}
921+
checkpointLister.On("VerticalPodAutoscalerCheckpoints", namespace).Return(checkpointNamespaceLister)
922+
920923
feeder := clusterStateFeeder{
921924
coreClient: client.CoreV1(),
922925
vpaLister: vpaLister,
923926
vpaCheckpointClient: checkpointClient,
927+
vpaCheckpointLister: checkpointLister,
924928
clusterState: model.NewClusterState(testGcPeriod),
925929
recommenderName: "default",
926930
}

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
297297
MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"),
298298
VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(),
299299
VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
300+
VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
300301
ClusterState: clusterState,
301302
SelectorFetcher: target.NewVpaTargetSelectorFetcher(config, kubeClient, factory),
302303
MemorySaveMode: *memorySaver,

vertical-pod-autoscaler/pkg/utils/test/test_utils.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,36 @@ func (m *VerticalPodAutoscalerListerMock) Get(name string) (*vpa_types.VerticalP
200200
return nil, fmt.Errorf("unimplemented")
201201
}
202202

203+
// VerticalPodAutoscalerCheckPointListerMock is a mock of VerticalPodAutoscalerCheckPointLister
204+
type VerticalPodAutoscalerCheckPointListerMock struct {
205+
mock.Mock
206+
}
207+
208+
// List is a mock implementation of VerticalPodAutoscalerLister.List
209+
func (m *VerticalPodAutoscalerCheckPointListerMock) List(selector labels.Selector) (ret []*vpa_types.VerticalPodAutoscalerCheckpoint, err error) {
210+
args := m.Called()
211+
var returnArg []*vpa_types.VerticalPodAutoscalerCheckpoint
212+
if args.Get(0) != nil {
213+
returnArg = args.Get(0).([]*vpa_types.VerticalPodAutoscalerCheckpoint)
214+
}
215+
return returnArg, args.Error(1)
216+
}
217+
218+
// VerticalPodAutoscalerCheckpoints is a mock implementation of returning a lister for namespace.
219+
func (m *VerticalPodAutoscalerCheckPointListerMock) VerticalPodAutoscalerCheckpoints(namespace string) vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister {
220+
args := m.Called(namespace)
221+
var returnArg vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister
222+
if args.Get(0) != nil {
223+
returnArg = args.Get(0).(vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister)
224+
}
225+
return returnArg
226+
}
227+
228+
// Get is not implemented for this mock
229+
func (m *VerticalPodAutoscalerCheckPointListerMock) Get(name string) (*vpa_types.VerticalPodAutoscalerCheckpoint, error) {
230+
return nil, fmt.Errorf("unimplemented")
231+
}
232+
203233
// VerticalPodAutoscalerV1Beta1ListerMock is a mock of VerticalPodAutoscalerLister or
204234
// VerticalPodAutoscalerNamespaceLister - the crucial List method is the same.
205235
type VerticalPodAutoscalerV1Beta1ListerMock struct {

vertical-pod-autoscaler/pkg/utils/vpa/api.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,36 @@ func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct
107107
return vpaLister
108108
}
109109

110+
// NewVpaCheckpointLister returns VerticalPodAutoscalerCheckpointLister configured to fetch all VPACheckpoint objects from namespace,
111+
// set namespace to k8sapiv1.NamespaceAll to select all namespaces.
112+
// The method blocks until vpaCheckpointLister is initially populated.
113+
func NewVpaCheckpointLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct{}, namespace string) vpa_lister.VerticalPodAutoscalerCheckpointLister {
114+
vpaListWatch := cache.NewListWatchFromClient(vpaClient.AutoscalingV1().RESTClient(), "verticalpodautoscalercheckpoints", namespace, fields.Everything())
115+
informerOptions := cache.InformerOptions{
116+
ObjectType: &vpa_types.VerticalPodAutoscalerCheckpoint{},
117+
ListerWatcher: vpaListWatch,
118+
Handler: &cache.ResourceEventHandlerFuncs{},
119+
ResyncPeriod: 1 * time.Hour,
120+
Indexers: cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
121+
}
122+
123+
store, controller := cache.NewInformerWithOptions(informerOptions)
124+
indexer, ok := store.(cache.Indexer)
125+
if !ok {
126+
klog.ErrorS(nil, "Expected Indexer, but got a Store that does not implement Indexer")
127+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
128+
}
129+
vpaCheckpointLister := vpa_lister.NewVerticalPodAutoscalerCheckpointLister(indexer)
130+
go controller.Run(stopChannel)
131+
if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) {
132+
klog.ErrorS(nil, "Failed to sync VPA checkpoint cache during initialization")
133+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
134+
} else {
135+
klog.InfoS("Initial VPA checkpoint synced successfully")
136+
}
137+
return vpaCheckpointLister
138+
}
139+
110140
// PodMatchesVPA returns true iff the vpaWithSelector matches the Pod.
111141
func PodMatchesVPA(pod *core.Pod, vpaWithSelector *VpaWithSelector) bool {
112142
return PodLabelsMatchVPA(pod.Namespace, labels.Set(pod.GetLabels()), vpaWithSelector.Vpa.Namespace, vpaWithSelector.Selector)

0 commit comments

Comments
 (0)