1
1
package registry_checker
2
2
3
3
import (
4
+ "context"
4
5
"crypto/tls"
5
6
"errors"
6
- appsv1 "k8s.io/api/apps/v1"
7
- "k8s.io/apimachinery/pkg/util/wait"
8
7
"net/http"
9
- "sync "
8
+ "path "
10
9
"time"
11
10
11
+ "github.com/prometheus/client_golang/prometheus"
12
+ appsv1 "k8s.io/api/apps/v1"
13
+ "k8s.io/apimachinery/pkg/util/wait"
14
+ "k8s.io/client-go/tools/cache"
15
+
12
16
"github.com/google/go-containerregistry/pkg/name"
13
17
"github.com/google/go-containerregistry/pkg/v1/remote"
14
18
"github.com/sirupsen/logrus"
@@ -21,11 +25,12 @@ import (
21
25
22
26
"github.com/flant/k8s-image-availability-exporter/pkg/store"
23
27
"k8s.io/client-go/kubernetes"
24
- "k8s.io/client-go/tools/cache"
25
28
)
26
29
27
30
const (
28
- resyncPeriod = time .Hour
31
+ resyncPeriod = time .Hour
32
+ failedCheckBatchSize = 20
33
+ checkBatchSize = 50
29
34
)
30
35
31
36
type registryCheckerConfig struct {
@@ -53,6 +58,7 @@ type RegistryChecker struct {
53
58
}
54
59
55
60
func NewRegistryChecker (
61
+ stopCh <- chan struct {},
56
62
kubeClient * kubernetes.Clientset ,
57
63
skipVerify bool ,
58
64
ignoredImages []string ,
@@ -71,8 +77,6 @@ func NewRegistryChecker(
71
77
}
72
78
73
79
rc := & RegistryChecker {
74
- imageStore : store .NewImageStore (),
75
-
76
80
deploymentsInformer : informerFactory .Apps ().V1 ().Deployments (),
77
81
statefulSetsInformer : informerFactory .Apps ().V1 ().StatefulSets (),
78
82
daemonSetsInformer : informerFactory .Apps ().V1 ().DaemonSets (),
@@ -94,10 +98,8 @@ func NewRegistryChecker(
94
98
rc .ignoredImages [image ] = struct {}{}
95
99
}
96
100
97
- return rc
98
- }
101
+ rc .imageStore = store .NewImageStore (rc .Check , checkBatchSize , failedCheckBatchSize )
99
102
100
- func (rc * RegistryChecker ) Run (stopCh <- chan struct {}) {
101
103
rc .deploymentsInformer .Informer ().AddEventHandlerWithResyncPeriod (cache.ResourceEventHandlerFuncs {
102
104
AddFunc : func (obj interface {}) {
103
105
rc .reconcile (obj )
@@ -177,6 +179,26 @@ func (rc *RegistryChecker) Run(stopCh <-chan struct{}) {
177
179
cache .WaitForCacheSync (stopCh , rc .deploymentsInformer .Informer ().HasSynced , rc .statefulSetsInformer .Informer ().HasSynced ,
178
180
rc .daemonSetsInformer .Informer ().HasSynced , rc .cronJobsInformer .Informer ().HasSynced , rc .secretsInformer .Informer ().HasSynced )
179
181
logrus .Info ("Caches populated successfully" )
182
+
183
+ rc .imageStore .RunGC (rc .controllerIndexers .GetContainerInfosForImage )
184
+
185
+ return rc
186
+ }
187
+
188
+ // Collect implements prometheus.Collector.
189
+ func (rc * RegistryChecker ) Collect (ch chan <- prometheus.Metric ) {
190
+ metrics := rc .imageStore .ExtractMetrics ()
191
+
192
+ for _ , m := range metrics {
193
+ ch <- m
194
+ }
195
+ }
196
+
197
+ // Describe implements prometheus.Collector.
198
+ func (rc * RegistryChecker ) Describe (_ chan <- * prometheus.Desc ) {}
199
+
200
+ func (rc RegistryChecker ) Tick () {
201
+ rc .imageStore .Check ()
180
202
}
181
203
182
204
func (rc * RegistryChecker ) reconcile (obj interface {}) {
@@ -208,12 +230,13 @@ func (rc *RegistryChecker) reconcile(obj interface{}) {
208
230
}
209
231
}
210
232
211
- if skipObject || ! rc . controllerIndexers . CheckImageExistence ( image ) {
212
- rc .imageStore .RemoveImage (image )
233
+ if skipObject {
234
+ rc .imageStore .ReconcileImage (image , []store. ContainerInfo {} )
213
235
continue
214
236
}
215
237
216
- rc .imageStore .AddOrUpdateImage (image , time.Time {})
238
+ containerInfos := rc .controllerIndexers .GetContainerInfosForImage (image )
239
+ rc .imageStore .ReconcileImage (image , containerInfos )
217
240
}
218
241
}
219
242
@@ -225,30 +248,11 @@ func (rc *RegistryChecker) reconcileUpdate(a, b interface{}) {
225
248
rc .reconcile (b )
226
249
}
227
250
228
- func (rc * RegistryChecker ) Check () {
229
- // TODO: tweak const
230
- oldImages := rc .imageStore .PopOldestImages (rc .imageStore .Length () / 40 )
231
-
232
- var processingGroup sync.WaitGroup
233
- for _ , image := range oldImages {
234
- keyChain := rc .controllerIndexers .GetKeychainForImage (rc .kubeClient , image )
251
+ func (rc * RegistryChecker ) Check (imageName string ) store.AvailabilityMode {
252
+ keyChain := rc .controllerIndexers .GetKeychainForImage (rc .kubeClient , imageName )
235
253
236
- // TODO: backoff
237
- processingGroup .Add (1 )
238
- go func (imageName string , kc * keychain ) {
239
- defer processingGroup .Done ()
240
-
241
- log := logrus .WithField ("image_name" , imageName )
242
- availMode := rc .checkImageAvailability (log , imageName , kc )
243
-
244
- rc .imageStore .AddOrUpdateImage (imageName , time .Now (), availMode )
245
- }(image , keyChain )
246
-
247
- containerInfos := rc .controllerIndexers .GetContainerInfosForImage (image )
248
- rc .imageStore .UpdateContainerAssociations (image , containerInfos )
249
- }
250
-
251
- processingGroup .Wait ()
254
+ log := logrus .WithField ("image_name" , imageName )
255
+ return rc .checkImageAvailability (log , imageName , keyChain )
252
256
}
253
257
254
258
func checkImageNameParseErr (log * logrus.Entry , err error ) store.AvailabilityMode {
@@ -286,12 +290,16 @@ func (rc *RegistryChecker) checkImageAvailability(log *logrus.Entry, imageName s
286
290
Factor : 2 ,
287
291
Steps : 2 ,
288
292
}, func () (bool , error ) {
293
+ ctx , cancel := context .WithTimeout (context .Background (), 15 * time .Second )
294
+ defer cancel ()
295
+
289
296
if kc != nil {
290
297
for i := 0 ; i < kc .size ; i ++ {
291
298
indexedKeychain := * kc
292
299
indexedKeychain .index = i
293
300
294
- _ , imgErr = remote .Head (ref , remote .WithAuthFromKeychain (& indexedKeychain ), remote .WithTransport (rc .registryTransport ))
301
+ _ , imgErr = remote .Head (ref , remote .WithAuthFromKeychain (& indexedKeychain ), remote .WithTransport (rc .registryTransport ),
302
+ remote .WithContext (ctx ))
295
303
296
304
if imgErr != nil {
297
305
continue
@@ -302,7 +310,7 @@ func (rc *RegistryChecker) checkImageAvailability(log *logrus.Entry, imageName s
302
310
}
303
311
}
304
312
} else {
305
- _ , imgErr = remote .Head (ref , remote .WithTransport (rc .registryTransport ))
313
+ _ , imgErr = remote .Head (ref , remote .WithTransport (rc .registryTransport ), remote . WithContext ( ctx ) )
306
314
}
307
315
308
316
availMode = store .Available
0 commit comments