@@ -3,6 +3,7 @@ package registry_checker
3
3
import (
4
4
"crypto/tls"
5
5
"errors"
6
+ appsv1 "k8s.io/api/apps/v1"
6
7
"k8s.io/apimachinery/pkg/util/wait"
7
8
"net/http"
8
9
"sync"
@@ -34,11 +35,11 @@ type RegistryChecker struct {
34
35
35
36
imageStore * store.ImageStore
36
37
37
- deploymentsInformer appsv1informers.DeploymentInformer
38
- statefulSetssInformer appsv1informers.StatefulSetInformer
39
- daemonSetsInformer appsv1informers.DaemonSetInformer
40
- cronJobsInformer v1beta1.CronJobInformer
41
- secretsInformer corev1informers.SecretInformer
38
+ deploymentsInformer appsv1informers.DeploymentInformer
39
+ statefulSetsInformer appsv1informers.StatefulSetInformer
40
+ daemonSetsInformer appsv1informers.DaemonSetInformer
41
+ cronJobsInformer v1beta1.CronJobInformer
42
+ secretsInformer corev1informers.SecretInformer
42
43
43
44
controllerIndexers ControllerIndexers
44
45
@@ -47,6 +48,8 @@ type RegistryChecker struct {
47
48
imageExistsVectors []prometheus.Metric
48
49
49
50
registryTransport * http.Transport
51
+
52
+ kubeClient * kubernetes.Clientset
50
53
}
51
54
52
55
func NewRegistryChecker (
@@ -68,15 +71,17 @@ func NewRegistryChecker(
68
71
rc := & RegistryChecker {
69
72
imageStore : store .NewImageStore (),
70
73
71
- deploymentsInformer : informerFactory .Apps ().V1 ().Deployments (),
72
- statefulSetssInformer : informerFactory .Apps ().V1 ().StatefulSets (),
73
- daemonSetsInformer : informerFactory .Apps ().V1 ().DaemonSets (),
74
- cronJobsInformer : informerFactory .Batch ().V1beta1 ().CronJobs (),
75
- secretsInformer : informerFactory .Core ().V1 ().Secrets (),
74
+ deploymentsInformer : informerFactory .Apps ().V1 ().Deployments (),
75
+ statefulSetsInformer : informerFactory .Apps ().V1 ().StatefulSets (),
76
+ daemonSetsInformer : informerFactory .Apps ().V1 ().DaemonSets (),
77
+ cronJobsInformer : informerFactory .Batch ().V1beta1 ().CronJobs (),
78
+ secretsInformer : informerFactory .Core ().V1 ().Secrets (),
76
79
77
80
ignoredImages : map [string ]struct {}{},
78
81
79
82
registryTransport : customTransport ,
83
+
84
+ kubeClient : kubeClient ,
80
85
}
81
86
82
87
for _ , image := range ignoredImages {
@@ -105,7 +110,7 @@ func (rc *RegistryChecker) Run(stopCh <-chan struct{}) {
105
110
rc .controllerIndexers .deploymentIndexer = rc .deploymentsInformer .Informer ().GetIndexer ()
106
111
go rc .deploymentsInformer .Informer ().Run (stopCh )
107
112
108
- rc .statefulSetssInformer .Informer ().AddEventHandlerWithResyncPeriod (cache.ResourceEventHandlerFuncs {
113
+ rc .statefulSetsInformer .Informer ().AddEventHandlerWithResyncPeriod (cache.ResourceEventHandlerFuncs {
109
114
AddFunc : func (obj interface {}) {
110
115
rc .reconcile (obj )
111
116
},
@@ -116,12 +121,12 @@ func (rc *RegistryChecker) Run(stopCh <-chan struct{}) {
116
121
rc .reconcile (obj )
117
122
},
118
123
}, resyncPeriod )
119
- err = rc .statefulSetssInformer .Informer ().AddIndexers (statefulSetIndexers )
124
+ err = rc .statefulSetsInformer .Informer ().AddIndexers (statefulSetIndexers )
120
125
if err != nil {
121
126
panic (err )
122
127
}
123
- rc .controllerIndexers .statefulSetIndexer = rc .statefulSetssInformer .Informer ().GetIndexer ()
124
- go rc .statefulSetssInformer .Informer ().Run (stopCh )
128
+ rc .controllerIndexers .statefulSetIndexer = rc .statefulSetsInformer .Informer ().GetIndexer ()
129
+ go rc .statefulSetsInformer .Informer ().Run (stopCh )
125
130
126
131
rc .daemonSetsInformer .Informer ().AddEventHandlerWithResyncPeriod (cache.ResourceEventHandlerFuncs {
127
132
AddFunc : func (obj interface {}) {
@@ -163,7 +168,7 @@ func (rc *RegistryChecker) Run(stopCh <-chan struct{}) {
163
168
go rc .secretsInformer .Informer ().Run (stopCh )
164
169
165
170
logrus .Info ("Waiting for cache sync" )
166
- cache .WaitForCacheSync (stopCh , rc .deploymentsInformer .Informer ().HasSynced , rc .statefulSetssInformer .Informer ().HasSynced ,
171
+ cache .WaitForCacheSync (stopCh , rc .deploymentsInformer .Informer ().HasSynced , rc .statefulSetsInformer .Informer ().HasSynced ,
167
172
rc .daemonSetsInformer .Informer ().HasSynced , rc .cronJobsInformer .Informer ().HasSynced , rc .secretsInformer .Informer ().HasSynced )
168
173
logrus .Info ("Caches populated successfully" )
169
174
}
@@ -180,7 +185,24 @@ func (rc *RegistryChecker) reconcile(obj interface{}) {
180
185
continue
181
186
}
182
187
183
- if ! rc .controllerIndexers .CheckImageExistence (image ) {
188
+ var skipObject bool
189
+
190
+ switch typedObj := obj .(type ) {
191
+ case * appsv1.Deployment :
192
+ if typedObj .Status .Replicas == 0 {
193
+ skipObject = true
194
+ }
195
+ case * appsv1.StatefulSet :
196
+ if typedObj .Status .Replicas == 0 {
197
+ skipObject = true
198
+ }
199
+ case * appsv1.DaemonSet :
200
+ if typedObj .Status .CurrentNumberScheduled == 0 {
201
+ skipObject = true
202
+ }
203
+ }
204
+
205
+ if skipObject || ! rc .controllerIndexers .CheckImageExistence (image ) {
184
206
rc .imageStore .RemoveImage (image )
185
207
continue
186
208
}
@@ -190,7 +212,7 @@ func (rc *RegistryChecker) reconcile(obj interface{}) {
190
212
}
191
213
192
214
func (rc * RegistryChecker ) reconcileUpdate (a , b interface {}) {
193
- if ! EqualObjects (a , b ) {
215
+ if EqualObjects (a , b ) {
194
216
return
195
217
}
196
218
@@ -203,7 +225,7 @@ func (rc *RegistryChecker) Check() {
203
225
204
226
var processingGroup sync.WaitGroup
205
227
for _ , image := range oldImages {
206
- keyChain := rc .controllerIndexers .GetKeychainForImage (image )
228
+ keyChain := rc .controllerIndexers .GetKeychainForImage (rc . kubeClient , image )
207
229
208
230
// TODO: backoff
209
231
processingGroup .Add (1 )
0 commit comments