Skip to content

Commit 924390f

Browse files
authored
Merge pull request #571 from jetstack/cleanup_datagather
Cleanup Kubernetes datagather
2 parents feb4836 + 7f756c3 commit 924390f

File tree

8 files changed

+95
-139
lines changed

8 files changed

+95
-139
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ predicate.json
1414
*.tgz
1515

1616
_bin
17+
.vscode

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/spf13/cobra v1.8.0
2121
github.com/spf13/pflag v1.0.5
2222
github.com/stretchr/testify v1.9.0
23+
golang.org/x/sync v0.8.0
2324
gopkg.in/yaml.v2 v2.4.0
2425
k8s.io/api v0.30.3
2526
k8s.io/apimachinery v0.30.3
@@ -53,7 +54,6 @@ require (
5354
golang.org/x/crypto v0.24.0 // indirect
5455
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
5556
golang.org/x/net v0.26.0 // indirect
56-
golang.org/x/sync v0.8.0 // indirect
5757
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
5858
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
5959
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect

pkg/agent/run.go

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111
_ "net/http/pprof"
1212
"os"
1313
"strings"
14-
"sync"
1514
"time"
1615

1716
"github.com/cenkalti/backoff"
1817
"github.com/hashicorp/go-multierror"
1918
"github.com/prometheus/client_golang/prometheus"
2019
"github.com/prometheus/client_golang/prometheus/promhttp"
2120
"github.com/spf13/cobra"
21+
"golang.org/x/sync/errgroup"
2222
"sigs.k8s.io/controller-runtime/pkg/manager"
2323

2424
"github.com/jetstack/preflight/api"
@@ -101,7 +101,15 @@ func Run(cmd *cobra.Command, args []string) {
101101
}
102102

103103
dataGatherers := map[string]datagatherer.DataGatherer{}
104-
var wg sync.WaitGroup
104+
group, gctx := errgroup.WithContext(ctx)
105+
106+
defer func() {
107+
// TODO: replace Fatalf log calls with Errorf and return the error
108+
cancel()
109+
if err := group.Wait(); err != nil {
110+
logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err)
111+
}
112+
}()
105113

106114
// load datagatherer config and boot each one
107115
for _, dgConfig := range config.DataGatherers {
@@ -111,32 +119,20 @@ func Run(cmd *cobra.Command, args []string) {
111119
logs.Log.Fatalf("running data gatherer %s of type %s as Local, data-path override present: %s", dgConfig.Name, dgConfig.Kind, dgConfig.DataPath)
112120
}
113121

114-
newDg, err := dgConfig.Config.NewDataGatherer(ctx)
122+
newDg, err := dgConfig.Config.NewDataGatherer(gctx)
115123
if err != nil {
116124
logs.Log.Fatalf("failed to instantiate %q data gatherer %q: %v", kind, dgConfig.Name, err)
117125
}
118126

119127
logs.Log.Printf("starting %q datagatherer", dgConfig.Name)
120128

121129
// start the data gatherers and wait for the cache sync
122-
if err := newDg.Run(ctx.Done()); err != nil {
123-
logs.Log.Printf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
124-
}
125-
126-
// bootCtx is a context with a timeout to allow the informer 5
127-
// seconds to perform an initial sync. It may fail, and that's fine
128-
// too, it will backoff and retry of its own accord. Initial boot
129-
// will only be delayed by a max of 5 seconds.
130-
bootCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
131-
defer cancel()
132-
133-
// wait for the informer to complete an initial sync, we do this to
134-
// attempt to have an initial set of data for the first upload of
135-
// the run.
136-
if err := newDg.WaitForCacheSync(bootCtx.Done()); err != nil {
137-
// log sync failure, this might recover in future
138-
logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", kind, dgConfig.Name, err)
139-
}
130+
group.Go(func() error {
131+
if err := newDg.Run(gctx.Done()); err != nil {
132+
return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
133+
}
134+
return nil
135+
})
140136

141137
// regardless of success, this dataGatherers has been given a
142138
// chance to sync its cache and we will now continue as normal. We
@@ -145,19 +141,24 @@ func Run(cmd *cobra.Command, args []string) {
145141
dataGatherers[dgConfig.Name] = newDg
146142
}
147143

148-
// wait for initial sync period to complete. if unsuccessful, then crash
149-
// and restart.
150-
c := make(chan struct{})
151-
go func() {
152-
defer close(c)
153-
logs.Log.Printf("waiting for datagatherers to complete inital syncs")
154-
wg.Wait()
155-
}()
156-
select {
157-
case <-c:
158-
logs.Log.Printf("datagatherers inital sync completed")
159-
case <-time.After(60 * time.Second):
160-
logs.Log.Fatalf("datagatherers inital sync failed due to timeout of 60 seconds")
144+
// Wait for 5 seconds for all informers to sync. If they fail to sync
145+
// we continue (as we have no way to know if they will recover or not).
146+
//
147+
// bootCtx is a context with a timeout to allow the informer 5
148+
// seconds to perform an initial sync. It may fail, and that's fine
149+
// too, it will backoff and retry of its own accord. Initial boot
150+
// will only be delayed by a max of 5 seconds.
151+
bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
152+
defer bootCancel()
153+
for _, dgConfig := range config.DataGatherers {
154+
dg := dataGatherers[dgConfig.Name]
155+
// wait for the informer to complete an initial sync, we do this to
156+
// attempt to have an initial set of data for the first upload of
157+
// the run.
158+
if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
159+
// log sync failure, this might recover in future
160+
logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", dgConfig.Kind, dgConfig.Name, err)
161+
}
161162
}
162163

163164
// begin the datagathering loop, periodically sending data to the

pkg/datagatherer/k8s/cache.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ func onDelete(obj interface{}, dgCache *cache.Cache) {
7575
// creates a new updated instance of a cache object, with the resource
7676
// argument. If the object is present in the cache it fetches the object's
7777
// properties.
78-
func updateCacheGatheredResource(cacheKey string, resource interface{},
79-
dgCache *cache.Cache) *api.GatheredResource {
78+
func updateCacheGatheredResource(cacheKey string, resource interface{}, dgCache *cache.Cache) *api.GatheredResource {
8079
// updated cache object
8180
cacheObject := &api.GatheredResource{
8281
Resource: resource,

pkg/datagatherer/k8s/client.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,44 +18,45 @@ func NewDynamicClient(kubeconfigPath string) (dynamic.Interface, error) {
1818
if err != nil {
1919
return nil, errors.WithStack(err)
2020
}
21+
2122
cl, err := dynamic.NewForConfig(cfg)
2223
if err != nil {
2324
return nil, errors.WithStack(err)
2425
}
26+
2527
return cl, nil
2628
}
2729

2830
// NewDiscoveryClient creates a new 'discovery' client using the provided
2931
// kubeconfig. If kubeconfigPath is not set/empty, it will attempt to load
3032
// configuration using the default loading rules.
31-
func NewDiscoveryClient(kubeconfigPath string) (discovery.DiscoveryClient, error) {
32-
var discoveryClient *discovery.DiscoveryClient
33-
33+
func NewDiscoveryClient(kubeconfigPath string) (*discovery.DiscoveryClient, error) {
3434
cfg, err := kubeconfig.LoadRESTConfig(kubeconfigPath)
3535
if err != nil {
36-
return discovery.DiscoveryClient{}, errors.WithStack(err)
36+
return nil, errors.WithStack(err)
3737
}
3838

39-
discoveryClient, err = discovery.NewDiscoveryClientForConfig(cfg)
39+
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
4040
if err != nil {
41-
return *discoveryClient, errors.WithStack(err)
41+
return nil, errors.WithStack(err)
4242
}
4343

44-
return *discoveryClient, nil
44+
return discoveryClient, nil
4545
}
4646

4747
// NewClientSet creates a new kubernetes clientset using the provided kubeconfig.
4848
// If kubeconfigPath is not set/empty, it will attempt to load configuration using
4949
// the default loading rules.
5050
func NewClientSet(kubeconfigPath string) (kubernetes.Interface, error) {
51-
var clientset *kubernetes.Clientset
5251
cfg, err := kubeconfig.LoadRESTConfig(kubeconfigPath)
5352
if err != nil {
5453
return nil, errors.WithStack(err)
5554
}
56-
clientset, err = kubernetes.NewForConfig(cfg)
55+
56+
clientset, err := kubernetes.NewForConfig(cfg)
5757
if err != nil {
5858
return nil, errors.WithStack(err)
5959
}
60+
6061
return clientset, nil
6162
}

pkg/datagatherer/k8s/discovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (c *ConfigDiscovery) NewDataGatherer(ctx context.Context) (datagatherer.Dat
4343
// DataGathererDiscovery stores the config for a k8s-discovery datagatherer
4444
type DataGathererDiscovery struct {
4545
// The 'discovery' client used for fetching data.
46-
cl discovery.DiscoveryClient
46+
cl *discovery.DiscoveryClient
4747
}
4848

4949
func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error {

pkg/datagatherer/k8s/dynamic.go

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -143,20 +143,21 @@ var kubernetesNativeResources = map[schema.GroupVersionResource]sharedInformerFu
143143

144144
// NewDataGatherer constructs a new instance of the generic K8s data-gatherer for the provided
145145
func (c *ConfigDynamic) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer, error) {
146-
cl, err := NewDynamicClient(c.KubeConfigPath)
147-
if err != nil {
148-
return nil, err
149-
}
150-
151146
if isNativeResource(c.GroupVersionResource) {
152147
clientset, err := NewClientSet(c.KubeConfigPath)
153148
if err != nil {
154149
return nil, errors.WithStack(err)
155150
}
151+
156152
return c.newDataGathererWithClient(ctx, nil, clientset)
157-
}
153+
} else {
154+
cl, err := NewDynamicClient(c.KubeConfigPath)
155+
if err != nil {
156+
return nil, err
157+
}
158158

159-
return c.newDataGathererWithClient(ctx, cl, nil)
159+
return c.newDataGathererWithClient(ctx, cl, nil)
160+
}
160161
}
161162

162163
func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {
@@ -178,8 +179,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
178179

179180
newDataGatherer := &DataGathererDynamic{
180181
ctx: ctx,
181-
cl: cl,
182-
k8sClientSet: clientset,
183182
groupVersionResource: c.GroupVersionResource,
184183
fieldSelector: fieldSelector.String(),
185184
namespaces: c.IncludeNamespaces,
@@ -200,34 +199,22 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
200199
informers.WithNamespace(metav1.NamespaceAll),
201200
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
202201
options.FieldSelector = fieldSelector.String()
203-
}))
204-
newDataGatherer.nativeSharedInformer = factory
205-
informer := informerFunc(factory)
206-
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
207-
AddFunc: func(obj interface{}) {
208-
onAdd(obj, dgCache)
209-
},
210-
UpdateFunc: func(old, new interface{}) {
211-
onUpdate(old, new, dgCache)
212-
},
213-
DeleteFunc: func(obj interface{}) {
214-
onDelete(obj, dgCache)
202+
}),
203+
)
204+
newDataGatherer.informer = informerFunc(factory)
205+
} else {
206+
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
207+
cl,
208+
60*time.Second,
209+
metav1.NamespaceAll,
210+
func(options *metav1.ListOptions) {
211+
options.FieldSelector = fieldSelector.String()
215212
},
216-
})
217-
newDataGatherer.informer = informer
218-
return newDataGatherer, nil
213+
)
214+
newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer()
219215
}
220216

221-
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
222-
cl,
223-
60*time.Second,
224-
metav1.NamespaceAll,
225-
func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() },
226-
)
227-
resourceInformer := factory.ForResource(c.GroupVersionResource)
228-
informer := resourceInformer.Informer()
229-
newDataGatherer.dynamicSharedInformer = factory
230-
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
217+
registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
231218
AddFunc: func(obj interface{}) {
232219
onAdd(obj, dgCache)
233220
},
@@ -238,7 +225,10 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
238225
onDelete(obj, dgCache)
239226
},
240227
})
241-
newDataGatherer.informer = informer
228+
if err != nil {
229+
return nil, err
230+
}
231+
newDataGatherer.registration = registration
242232

243233
return newDataGatherer, nil
244234
}
@@ -251,10 +241,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
251241
// does not have registered as part of its `runtime.Scheme`.
252242
type DataGathererDynamic struct {
253243
ctx context.Context
254-
// The 'dynamic' client used for fetching data.
255-
cl dynamic.Interface
256-
// The k8s clientset used for fetching known resources.
257-
k8sClientSet kubernetes.Interface
258244
// groupVersionResource is the name of the API group, version and resource
259245
// that should be fetched by this data gatherer.
260246
groupVersionResource schema.GroupVersionResource
@@ -270,19 +256,15 @@ type DataGathererDynamic struct {
270256
// 30 seconds purge time https://pkg.go.dev/github.com/patrickmn/go-cache
271257
cache *cache.Cache
272258
// informer watches the events around the targeted resource and updates the cache
273-
informer k8scache.SharedIndexInformer
274-
dynamicSharedInformer dynamicinformer.DynamicSharedInformerFactory
275-
nativeSharedInformer informers.SharedInformerFactory
276-
277-
// isInitialized is set to true when data is first collected, prior to
278-
// this the fetch method will return an error
279-
isInitialized bool
259+
informer k8scache.SharedIndexInformer
260+
registration k8scache.ResourceEventHandlerRegistration
280261
}
281262

282263
// Run starts the dynamic data gatherer's informers for resource collection.
283-
// Returns error if the data gatherer informer wasn't initialized
264+
// Returns error if the data gatherer informer wasn't initialized, Run blocks
265+
// until the stopCh is closed.
284266
func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
285-
if g.dynamicSharedInformer == nil && g.nativeSharedInformer == nil {
267+
if g.informer == nil {
286268
return fmt.Errorf("informer was not initialized, impossible to start")
287269
}
288270

@@ -299,21 +281,15 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
299281
}
300282

301283
// start shared informer
302-
if g.dynamicSharedInformer != nil {
303-
g.dynamicSharedInformer.Start(stopCh)
304-
}
305-
306-
if g.nativeSharedInformer != nil {
307-
g.nativeSharedInformer.Start(stopCh)
308-
}
284+
g.informer.Run(stopCh)
309285

310286
return nil
311287
}
312288

313289
// WaitForCacheSync waits for the data gatherer's informers cache to sync
314290
// before collecting the resources.
315291
func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error {
316-
if !k8scache.WaitForCacheSync(stopCh, g.informer.HasSynced) {
292+
if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) {
317293
return fmt.Errorf("timed out waiting for Kubernetes caches to sync")
318294
}
319295

@@ -432,16 +408,6 @@ func redactList(list []*api.GatheredResource) error {
432408
return nil
433409
}
434410

435-
// namespaceResourceInterface will 'namespace' a NamespaceableResourceInterface
436-
// if the 'namespace' parameter is non-empty, otherwise it will return the
437-
// given ResourceInterface as-is.
438-
func namespaceResourceInterface(iface dynamic.NamespaceableResourceInterface, namespace string) dynamic.ResourceInterface {
439-
if namespace == "" {
440-
return iface
441-
}
442-
return iface.Namespace(namespace)
443-
}
444-
445411
// generateExcludedNamespacesFieldSelector creates a field selector string from
446412
// a list of namespaces to exclude.
447413
func generateExcludedNamespacesFieldSelector(excludeNamespaces []string) fields.Selector {

0 commit comments

Comments
 (0)