@@ -18,101 +18,35 @@ package controllers
18
18
19
19
import (
20
20
"context"
21
- "encoding/json"
22
- "errors"
23
21
"fmt"
24
- "strconv"
25
- "strings"
26
22
"time"
27
- "unicode/utf8"
28
23
29
- "github.com/go-logr/logr"
30
- "github.com/labstack/gommon/log"
31
24
corev1 "k8s.io/api/core/v1"
32
25
v1 "k8s.io/api/core/v1"
33
26
apierrors "k8s.io/apimachinery/pkg/api/errors"
34
27
"k8s.io/apimachinery/pkg/api/resource"
35
28
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
- "k8s.io/apimachinery/pkg/runtime"
37
- "k8s.io/client-go/kubernetes"
38
29
"k8s.io/client-go/tools/cache"
39
- "k8s.io/klog"
40
30
41
31
ctrl "sigs.k8s.io/controller-runtime"
42
- "sigs.k8s.io/controller-runtime/pkg/client"
43
32
)
44
33
45
- // PodReconciler reconciles a Pod object
46
- type PodReconciler struct {
47
- client.Client
48
- Log logr.Logger
49
- Scheme * runtime.Scheme
50
- ClientSet * kubernetes.Clientset
51
- SampleSize int
52
- EnableLabel bool
53
- }
54
-
55
- type PodRequests struct {
56
- Name string
57
- ContainerRequests []ContainerRequests
58
- Sample int
59
- }
60
-
61
- type ContainerRequests struct {
62
- Name string
63
- CPU int64 // Nanocores
64
- Memory int64 // Mi
65
- }
66
-
67
- type NewContainerRequests struct {
68
- Name string
69
- Requests v1.ResourceRequirements
70
- }
71
-
72
- type PodMetricsRestData struct {
73
- Kind string `json:"kind"`
74
- APIVersion string `json:"apiVersion"`
75
- Metadata struct {
76
- Name string `json:"name"`
77
- Namespace string `json:"namespace"`
78
- CreationTimestamp time.Time `json:"creationTimestamp"`
79
- Labels struct {
80
- App string `json:"app"`
81
- PodTemplateHash string `json:"pod-template-hash"`
82
- Release string `json:"release"`
83
- SecurityIstioIoTLSMode string `json:"security.istio.io/tlsMode"`
84
- ServiceIstioIoCanonicalName string `json:"service.istio.io/canonical-name"`
85
- ServiceIstioIoCanonicalRevision string `json:"service.istio.io/canonical-revision"`
86
- } `json:"labels"`
87
- } `json:"metadata"`
88
- Timestamp time.Time `json:"timestamp"`
89
- Window string `json:"window"`
90
- Containers []struct {
91
- Name string `json:"name"`
92
- Usage struct {
93
- CPU string `json:"cpu"`
94
- Memory string `json:"memory"`
95
- } `json:"usage"`
96
- } `json:"containers"`
97
- }
98
-
99
34
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
100
35
101
36
const (
102
- addPodNameLabelAnnotation = "auto.request.operator/optimize"
37
+ operatorAnnotation = "auto.request.operator/optimize"
38
+ // cacheKeyFunc defines the key function required in TTLStore.
39
+ cacheTTL = 60 * time .Second
103
40
)
104
41
105
- // cacheKeyFunc defines the key function required in TTLStore.
106
- const cacheTTL = 60 * time .Second
107
-
108
42
func cacheKeyFunc (obj interface {}) (string , error ) {
109
43
return obj .(PodRequests ).Name , nil
110
44
}
111
45
112
46
var cacheStore = cache .NewTTLStore (cacheKeyFunc , cacheTTL )
113
47
114
48
// Reconcile handles a reconciliation request for a Pod.
115
- // If the Pod has the addPodNameLabelAnnotation annotation, then Reconcile
49
+ // If the Pod has the podHasAnnotation annotation, then Reconcile
116
50
// will make sure the podNameLabel label is present with the correct value.
117
51
// If the annotation is absent, then Reconcile will make sure the label is too.
118
52
func (r * PodReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
@@ -134,62 +68,32 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
134
68
return ctrl.Result {}, err
135
69
}
136
70
137
- /*
138
- Step 1: Add or remove the label.
139
- */
140
-
141
- addPodNameLabelAnnotation := pod .Annotations [addPodNameLabelAnnotation ] == "true"
142
- // labelIsPresent := pod.Labels[podNameLabel] == pod.Name
143
-
144
- // if labelShouldBePresent == labelIsPresent {
145
- // // The desired state and actual state of the Pod are the same.
146
- // // No further action is required by the operator at this moment.
147
- // log.Info("no update required")
148
- // return ctrl.Result{}, nil
149
- // }
150
- // create the config object from kubeconfig
151
- // https://github.com/kubernetes/client-go/tree/master/examples#configuration
71
+ annotation , err := r .NamespaceOrPodHaveAnnotation (pod , ctx )
72
+ if err != nil {
73
+ log .Error (err , "failed to get annotations" )
74
+ return ctrl.Result {}, err
75
+ }
152
76
153
- // {"kind":"PodMetrics","apiVersion":"metrics.k8s.io/v1beta1","metadata":{"name":"swagger-swaggerui-54c4559467-xgs62","namespace":"automation-namespace","creationTimestamp":"2022-12-14T14:14:31Z","labels":{"app":"swaggerui","pod-template-hash":"54c4559467","release":"swagger","security.istio.io/tlsMode":"istio","service.istio.io/canonical-name":"swaggerui","service.istio.io/canonical-revision":"latest"}},"timestamp":"2022-12-14T14:14:10Z","window":"13s","containers":[{"name":"istio-proxy","usage":{"cpu":"31576563n","memory":"198012Ki"}},{"name":"swaggerui","usage":{"cpu":"20527n","memory":"9136Ki"}}]}
154
- if (! r .EnableLabel ) || (r .EnableLabel && addPodNameLabelAnnotation ) {
77
+ if (! r .EnableAnnotation ) || (r .EnableAnnotation && annotation ) {
155
78
log .Info ("Checking Pod: " + pod .Name + " in namespace " + pod .Namespace )
156
79
157
- // go func() {
158
80
data , err := r .ClientSet .RESTClient ().Get ().AbsPath (fmt .Sprintf ("apis/metrics.k8s.io/v1beta1/namespaces/%v/pods/%v" , pod .Namespace , pod .Name )).DoRaw (ctx )
159
81
160
82
if err != nil {
161
83
log .Error (err , "failed to get stats from pod" )
162
84
return ctrl.Result {}, err
163
85
}
164
86
PodUsageData := GeneratePodRequestsObjectFromRestData (data )
165
- // PodRequestsData := GetPodRequests(pod)
166
87
167
88
SumPodRequest := PodRequests {Name : pod .Name , ContainerRequests : []ContainerRequests {}}
168
- // for _, r := range PodRequestsData.ContainerRequests {
169
- // temp := r
170
- // for _, v := range PodUsageData.ContainerRequests {
171
- // if v.Name == r.Name {
172
- // temp.CPU = r.CPU - v.CPU
173
- // temp.Memory = r.Memory - v.Memory
174
- // }
175
- // }
176
- // if temp.CPU < 0 {
177
- // temp.CPU = 0
178
- // }
179
- // if temp.Memory < 0 {
180
- // temp.Memory = 0
181
- // }
89
+
182
90
SumPodRequest .ContainerRequests = PodUsageData .ContainerRequests
183
- // }
184
91
185
92
LatestPodRequest , err := fetchFromCache (cacheStore , pod .Name )
186
93
if err != nil {
187
- // log.Error(err, err.Error())
188
94
SumPodRequest .Sample = 0
189
- // log.Info("Adding to cache sample: 0")
190
95
addToCache (cacheStore , SumPodRequest )
191
96
} else {
192
- // log.Info("Adding to cache sample: " + fmt.Sprint((LatestPodRequest.Sample + 1)))
193
97
if err != nil {
194
98
log .Error (err , err .Error ())
195
99
} else {
@@ -213,13 +117,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
213
117
}
214
118
}
215
119
}
216
- // log.Info(fmt.Sprint(SumPodRequest))
217
120
218
121
if SumPodRequest .Sample == r .SampleSize {
219
122
PodChange := false
220
123
Requests := []NewContainerRequests {}
221
124
log .Info (fmt .Sprint (SumPodRequest ))
222
- log .Info ("Pod Being Checked" )
223
125
for _ , c := range SumPodRequest .ContainerRequests {
224
126
AverageUsageCPU := c .CPU / int64 (SumPodRequest .Sample )
225
127
AverageUsageMemory := c .Memory / int64 (SumPodRequest .Sample )
@@ -255,86 +157,55 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
255
157
256
158
if len (pod .OwnerReferences ) == 0 {
257
159
log .Info ("Pod has no owner" )
258
- return r .UpdatePod (& pod , ctx )
160
+ return r .UpdateKubeObject (& pod , ctx )
259
161
}
260
162
261
163
var ownerName string
262
164
263
165
switch pod .OwnerReferences [0 ].Kind {
264
166
case "ReplicaSet" :
265
- log .Info ("Is Owned by Deployment" )
266
167
replica , err := r .ClientSet .AppsV1 ().ReplicaSets (pod .Namespace ).Get (ctx , pod .OwnerReferences [0 ].Name , metav1.GetOptions {})
267
168
if err != nil {
268
169
log .Error (err , err .Error ())
269
170
}
270
171
271
172
ownerName = replica .OwnerReferences [0 ].Name
272
- // ownerKind = "Deployment"
273
173
if replica .OwnerReferences [0 ].Kind == "Deployment" {
174
+ log .Info ("Is Owned by Deployment" )
274
175
deployment , err := r .ClientSet .AppsV1 ().Deployments (pod .Namespace ).Get (ctx , ownerName , metav1.GetOptions {})
275
176
if err != nil {
276
177
log .Error (err , err .Error ())
277
178
}
278
- for _ , podContainer := range Requests {
279
- for i , depContainer := range deployment .Spec .Template .Spec .Containers {
280
- if depContainer .Name == podContainer .Name {
281
- if deployment .Spec .Template .Spec .Containers [i ].Resources .Requests != nil {
282
- log .Info (fmt .Sprint ("Setting" , podContainer .Requests ))
283
- deployment .Spec .Template .Spec .Containers [i ].Resources = podContainer .Requests
284
- }
285
- }
286
- }
287
- }
288
- deployment .Annotations ["auto.request.operator/changed" ] = "true"
289
- //deployment.Spec.Template.Annotations["auto.request.operator/changed"] = "true"
290
- r .UpdatePod (deployment , ctx )
179
+ UpdatePodController (deployment , Requests , ctx )
180
+ return r .UpdateKubeObject (deployment , ctx )
181
+ } else {
182
+ log .Info ("Is Owned by Unknown CRD" )
291
183
}
292
184
case "DaemonSet" :
293
185
log .Info ("Is Owned by DaemonSet" )
294
186
ownerName = pod .OwnerReferences [0 ].Name
295
- // ownerKind = pod.OwnerReferences[0].Kind
296
187
297
188
deployment , err := r .ClientSet .AppsV1 ().DaemonSets (pod .Namespace ).Get (ctx , ownerName , metav1.GetOptions {})
298
189
if err != nil {
299
190
log .Error (err , err .Error ())
300
191
}
301
- for _ , podContainer := range Requests {
302
- for i , depContainer := range deployment .Spec .Template .Spec .Containers {
303
- if depContainer .Name == podContainer .Name {
304
- log .Info (fmt .Sprint ("Setting" , podContainer .Requests ))
305
- deployment .Spec .Template .Spec .Containers [i ].Resources = podContainer .Requests
306
- }
307
- }
308
- }
309
- deployment .Annotations ["auto.request.operator/changed" ] = "true"
310
- //deployment.Spec.Template.Annotations["auto.request.operator/changed"] = "true"
311
- r .UpdatePod (deployment , ctx )
192
+ UpdatePodController (deployment , Requests , ctx )
193
+ return r .UpdateKubeObject (deployment , ctx )
312
194
case "StatefulSet" :
313
195
log .Info ("Is Owned by StatefulSet" )
314
196
ownerName = pod .OwnerReferences [0 ].Name
315
- // ownerKind = pod.OwnerReferences[0].Kind
316
197
317
198
deployment , err := r .ClientSet .AppsV1 ().StatefulSets (pod .Namespace ).Get (ctx , ownerName , metav1.GetOptions {})
318
199
if err != nil {
319
200
log .Error (err , err .Error ())
320
201
}
321
- for _ , podContainer := range Requests {
322
- for i , depContainer := range deployment .Spec .Template .Spec .Containers {
323
- if depContainer .Name == podContainer .Name {
324
- log .Info (fmt .Sprint ("Setting" , podContainer .Requests ))
325
- deployment .Spec .Template .Spec .Containers [i ].Resources = podContainer .Requests
326
- }
327
- }
328
- }
329
- deployment .Annotations ["auto.request.operator/changed" ] = "true"
330
- //deployment.Spec.Template.Annotations["auto.request.operator/changed"] = "true"
331
- r .UpdatePod (deployment , ctx )
202
+
203
+ UpdatePodController (deployment , Requests , ctx )
204
+ return r .UpdateKubeObject (deployment , ctx )
332
205
default :
333
206
fmt .Printf ("Could not find resource manager for type %s\n " , pod .OwnerReferences [0 ].Kind )
334
207
}
335
208
336
- // fmt.Printf("POD %s is managed by %s %s\n", pod.Name, ownerName, ownerKind)
337
-
338
209
}
339
210
err := deleteFromCache (cacheStore , SumPodRequest )
340
211
if err != nil {
@@ -345,96 +216,3 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
345
216
346
217
return ctrl.Result {}, nil
347
218
}
348
-
349
- func (r * PodReconciler ) UpdatePod (pod client.Object , ctx context.Context ) (ctrl.Result , error ) {
350
- if err := r .Update (ctx , pod ); err != nil {
351
- if apierrors .IsConflict (err ) {
352
- // The Pod has been updated since we read it.
353
- // Requeue the Pod to try to reconciliate again.
354
- return ctrl.Result {Requeue : true }, nil
355
- }
356
- if apierrors .IsNotFound (err ) {
357
- // The Pod has been deleted since we read it.
358
- // Requeue the Pod to try to reconciliate again.
359
- return ctrl.Result {Requeue : true }, nil
360
- }
361
- log .Error (err , "unable to update pod" )
362
- return ctrl.Result {}, err
363
- }
364
- return ctrl.Result {}, nil
365
- }
366
-
367
- // SetupWithManager sets up the controller with the Manager.
368
- func (r * PodReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
369
- return ctrl .NewControllerManagedBy (mgr ).
370
- For (& corev1.Pod {}).
371
- Complete (r )
372
- }
373
- func RemoveLastChar (str string ) string {
374
- for len (str ) > 0 {
375
- _ , size := utf8 .DecodeLastRuneInString (str )
376
- return str [:len (str )- size ]
377
- }
378
- return str
379
- }
380
- func GetPodRequests (pod corev1.Pod ) PodRequests {
381
- containerData := []ContainerRequests {}
382
- for _ , c := range pod .Spec .Containers {
383
- nanoCores , _ := strconv .Atoi (RemoveLastChar (c .Resources .Requests .Cpu ().String ()))
384
- memString := c .Resources .Requests .Memory ().String ()
385
- miMemory := 0
386
- if strings .Contains (memString , "Ki" ) {
387
- kkiMemory , _ := strconv .Atoi (strings .ReplaceAll (memString , "Ki" , "" ))
388
- miMemory = kkiMemory / 1000
389
- }
390
- if strings .Contains (memString , "Mi" ) {
391
- miMemory , _ = strconv .Atoi (strings .ReplaceAll (memString , "Mi" , "" ))
392
- }
393
- if strings .Contains (memString , "Gi" ) {
394
- giMemory , _ := strconv .Atoi (strings .ReplaceAll (memString , "Gi" , "" ))
395
- miMemory = giMemory / 1000
396
- }
397
- containerData = append (containerData , ContainerRequests {Name : c .Name , CPU : int64 (nanoCores ), Memory : int64 (miMemory )})
398
- }
399
- return PodRequests {pod .Name , containerData , 0 }
400
- }
401
-
402
- func addToCache (cacheStore cache.Store , object PodRequests ) error {
403
- err := cacheStore .Add (object )
404
- if err != nil {
405
- klog .Errorf ("failed to add key value to cache error" , err )
406
- return err
407
- }
408
- return nil
409
- }
410
-
411
- func fetchFromCache (cacheStore cache.Store , key string ) (PodRequests , error ) {
412
- obj , exists , err := cacheStore .GetByKey (key )
413
- if err != nil {
414
- // klog.Errorf("failed to add key value to cache error", err)
415
- return PodRequests {}, err
416
- }
417
- if ! exists {
418
- // klog.Errorf("object does not exist in the cache")
419
- err = errors .New ("object does not exist in the cache" )
420
- return PodRequests {}, err
421
- }
422
- return obj .(PodRequests ), nil
423
- }
424
-
425
- func deleteFromCache (cacheStore cache.Store , object PodRequests ) error {
426
- return cacheStore .Delete (object )
427
- }
428
-
429
- func GeneratePodRequestsObjectFromRestData (restData []byte ) PodRequests {
430
- s := restData [:]
431
- data := PodMetricsRestData {}
432
- json .Unmarshal ([]byte (s ), & data )
433
- containerData := []ContainerRequests {}
434
- for _ , c := range data .Containers {
435
- nanoCores , _ := strconv .Atoi (RemoveLastChar (c .Usage .CPU ))
436
- kiMemory , _ := strconv .Atoi (strings .ReplaceAll (c .Usage .Memory , "Ki" , "" ))
437
- containerData = append (containerData , ContainerRequests {Name : c .Name , CPU : int64 (nanoCores / 1000000 ), Memory : int64 (kiMemory / 1000 )})
438
- }
439
- return PodRequests {data .Metadata .Name , containerData , 0 }
440
- }
0 commit comments