@@ -69,7 +69,9 @@ const (
69
69
)
70
70
71
71
type apiMeta struct {
72
- namespaced bool
72
+ namespaced bool
73
+ // watchCancel stops the watch of all resources for this API. This gets called when the cache is invalidated or when
74
+ // the watched API ceases to exist (e.g. a CRD gets deleted).
73
75
watchCancel context.CancelFunc
74
76
}
75
77
@@ -468,7 +470,7 @@ func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
468
470
}
469
471
}
470
472
471
- // startMissingWatches lists supported cluster resources and start watching for changes unless watch is already running
473
+ // startMissingWatches lists supported cluster resources and starts watching for changes unless watch is already running
472
474
func (c * clusterCache ) startMissingWatches () error {
473
475
apis , err := c .kubectl .GetAPIResources (c .config , true , c .settings .ResourcesFilter )
474
476
if err != nil {
@@ -570,6 +572,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
570
572
return resourceVersion , callback (listPager )
571
573
}
572
574
575
+ // loadInitialState loads the state of all the resources retrieved by the given resource client.
573
576
func (c * clusterCache ) loadInitialState (ctx context.Context , api kube.APIResourceInfo , resClient dynamic.ResourceInterface , ns string , lock bool ) (string , error ) {
574
577
var items []* Resource
575
578
resourceVersion , err := c .listResources (ctx , resClient , func (listPager * pager.ListPager ) error {
@@ -728,6 +731,9 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
728
731
})
729
732
}
730
733
734
+ // processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
735
+ // call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
736
+ // If we're managing specific namespaces, we call the callback for each namespace.
731
737
func (c * clusterCache ) processApi (client dynamic.Interface , api kube.APIResourceInfo , callback func (resClient dynamic.ResourceInterface , ns string ) error ) error {
732
738
resClient := client .Resource (api .GroupVersionResource )
733
739
switch {
@@ -797,6 +803,17 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth
797
803
return true , nil
798
804
}
799
805
806
+ // sync retrieves the current state of the cluster and stores relevant information in the clusterCache fields.
807
+ //
808
+ // First we get some metadata from the cluster, like the server version, OpenAPI document, and the list of all API
809
+ // resources.
810
+ //
811
+ // Then we get a list of the preferred versions of all API resources which are to be monitored (it's possible to exclude
812
+ // resources from monitoring). We loop through those APIs asynchronously and for each API we list all resources. We also
813
+ // kick off a goroutine to watch the resources for that API and update the cache constantly.
814
+ //
815
+ // When this function exits, the cluster cache is up to date, and the appropriate resources are being watched for
816
+ // changes.
800
817
func (c * clusterCache ) sync () error {
801
818
c .log .Info ("Start syncing cluster" )
802
819
@@ -843,6 +860,8 @@ func (c *clusterCache) sync() error {
843
860
if err != nil {
844
861
return err
845
862
}
863
+
864
+ // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
846
865
lock := sync.Mutex {}
847
866
err = kube .RunAllAsync (len (apis ), func (i int ) error {
848
867
api := apis [i ]
0 commit comments