@@ -23,14 +23,18 @@ import (
23
23
"encoding/pem"
24
24
"fmt"
25
25
"math/rand"
26
+ "sync"
26
27
"time"
27
28
29
+ "github.com/go-logr/logr"
28
30
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
29
31
k8serrors "k8s.io/apimachinery/pkg/api/errors"
30
32
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
33
lrucache "k8s.io/apimachinery/pkg/util/cache"
32
34
"k8s.io/apimachinery/pkg/util/sets"
35
+ "k8s.io/client-go/informers"
33
36
certinformersv1beta1 "k8s.io/client-go/informers/certificates/v1beta1"
37
+ clientset "k8s.io/client-go/kubernetes"
34
38
certlistersv1beta1 "k8s.io/client-go/listers/certificates/v1beta1"
35
39
"k8s.io/client-go/tools/cache"
36
40
"k8s.io/klog/v2"
@@ -261,3 +265,113 @@ func (m *NoopManager) GetTrustAnchorsByName(name string, allowMissing bool) ([]b
261
265
func (m * NoopManager ) GetTrustAnchorsBySigner (signerName string , labelSelector * metav1.LabelSelector , allowMissing bool ) ([]byte , error ) {
262
266
return nil , fmt .Errorf ("ClusterTrustBundle projection is not supported in static kubelet mode" )
263
267
}
268
+
269
+ // LazyInformerManager decides whether to use the noop or the actual manager on a call to
270
+ // the manager's methods.
271
+ // We cannot determine this upon startup because some may rely on the kubelet to be fully
272
+ // running in order to setup their kube-apiserver.
273
+ type LazyInformerManager struct {
274
+ manager Manager
275
+ managerLock sync.RWMutex
276
+ client clientset.Interface
277
+ cacheSize int
278
+ contextWithLogger context.Context
279
+ logger logr.Logger
280
+ }
281
+
282
+ func NewLazyInformerManager (ctx context.Context , kubeClient clientset.Interface , cacheSize int ) Manager {
283
+ return & LazyInformerManager {
284
+ client : kubeClient ,
285
+ cacheSize : cacheSize ,
286
+ contextWithLogger : ctx ,
287
+ logger : klog .FromContext (ctx ),
288
+ managerLock : sync.RWMutex {},
289
+ }
290
+ }
291
+
292
+ func (m * LazyInformerManager ) GetTrustAnchorsByName (name string , allowMissing bool ) ([]byte , error ) {
293
+ if err := m .ensureManagerSet (); err != nil {
294
+ return nil , fmt .Errorf ("failed to ensure informer manager for ClusterTrustBundles: %w" , err )
295
+ }
296
+ return m .manager .GetTrustAnchorsByName (name , allowMissing )
297
+ }
298
+
299
+ func (m * LazyInformerManager ) GetTrustAnchorsBySigner (signerName string , labelSelector * metav1.LabelSelector , allowMissing bool ) ([]byte , error ) {
300
+ if err := m .ensureManagerSet (); err != nil {
301
+ return nil , fmt .Errorf ("failed to ensure informer manager for ClusterTrustBundles: %w" , err )
302
+ }
303
+ return m .manager .GetTrustAnchorsBySigner (signerName , labelSelector , allowMissing )
304
+ }
305
+
306
+ func (m * LazyInformerManager ) isManagerSet () bool {
307
+ m .managerLock .RLock ()
308
+ defer m .managerLock .RUnlock ()
309
+ return m .manager != nil
310
+ }
311
+
312
+ func (m * LazyInformerManager ) ensureManagerSet () error {
313
+ if m .isManagerSet () {
314
+ return nil
315
+ }
316
+
317
+ m .managerLock .Lock ()
318
+ defer m .managerLock .Unlock ()
319
+ // we need to check again in case the manager was set between locking
320
+ if m .manager != nil {
321
+ return nil
322
+ }
323
+
324
+ ctbAPIAvailable , err := clusterTrustBundlesAvailable (m .client )
325
+ if err != nil {
326
+ return fmt .Errorf ("failed to determine which informer manager to choose: %w" , err )
327
+ }
328
+
329
+ if ! ctbAPIAvailable {
330
+ m .manager = & NoopManager {}
331
+ return nil
332
+ }
333
+
334
+ kubeInformers := informers .NewSharedInformerFactoryWithOptions (m .client , 0 )
335
+ clusterTrustBundleManager , err := NewInformerManager (m .contextWithLogger , kubeInformers .Certificates ().V1beta1 ().ClusterTrustBundles (), m .cacheSize , 5 * time .Minute )
336
+ if err != nil {
337
+ return fmt .Errorf ("error starting informer-based ClusterTrustBundle manager: %w" , err )
338
+ }
339
+ m .manager = clusterTrustBundleManager
340
+ kubeInformers .Start (m .contextWithLogger .Done ())
341
+ m .logger .Info ("Started ClusterTrustBundle informer" )
342
+
343
+ // a cache fetch will likely follow right after, wait for the freshly started
344
+ // informers to sync
345
+ synced := true
346
+ timeoutContext , cancel := context .WithTimeout (m .contextWithLogger , 10 * time .Second )
347
+ defer cancel ()
348
+ m .logger .Info ("Waiting for ClusterTrustBundle informer to sync" )
349
+ for _ , ok := range kubeInformers .WaitForCacheSync (timeoutContext .Done ()) {
350
+ synced = synced && ok
351
+ }
352
+ if synced {
353
+ m .logger .Info ("ClusterTrustBundle informer synced" )
354
+ } else {
355
+ m .logger .Info ("ClusterTrustBundle informer not synced, continuing to attempt in background" )
356
+ }
357
+
358
+ return nil
359
+ }
360
+
361
+ func clusterTrustBundlesAvailable (client clientset.Interface ) (bool , error ) {
362
+ resList , err := client .Discovery ().ServerResourcesForGroupVersion (certificatesv1beta1 .SchemeGroupVersion .String ())
363
+ if k8serrors .IsNotFound (err ) {
364
+ return false , nil
365
+ }
366
+
367
+ if resList != nil {
368
+ // even in case of an error above there might be a partial list for APIs that
369
+ // were already successfully discovered
370
+ for _ , r := range resList .APIResources {
371
+ if r .Name == "clustertrustbundles" {
372
+ return true , nil
373
+ }
374
+ }
375
+ }
376
+ return false , err
377
+ }
0 commit comments