Skip to content

Commit c080864

Browse files
authored
Merge pull request #663 from jetstack/pass_context
Use new client-go functions for contextual logging
2 parents 5476b9d + 3622734 commit c080864

File tree

7 files changed

+23
-30
lines changed

7 files changed

+23
-30
lines changed

pkg/agent/dummy_data_gatherer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ type dummyDataGatherer struct {
2929
FailedAttempts int
3030
}
3131

32-
func (g *dummyDataGatherer) Run(stopCh <-chan struct{}) error {
32+
func (g *dummyDataGatherer) Run(ctx context.Context) error {
3333
// no async functionality, see Fetch
3434
return nil
3535
}
3636

37-
func (g *dummyDataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error {
37+
func (g *dummyDataGatherer) WaitForCacheSync(ctx context.Context) error {
3838
// no async functionality, see Fetch
3939
return nil
4040
}

pkg/agent/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
191191
// blocks until the supplied channel is closed.
192192
// For this reason, we must allow these errgroup Go routines to exit
193193
// without cancelling the other Go routines in the group.
194-
if err := newDg.Run(gctx.Done()); err != nil {
194+
if err := newDg.Run(gctx); err != nil {
195195
return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
196196
}
197197
return nil
@@ -220,7 +220,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
220220
// wait for the informer to complete an initial sync, we do this to
221221
// attempt to have an initial set of data for the first upload of
222222
// the run.
223-
if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
223+
if err := dg.WaitForCacheSync(bootCtx); err != nil {
224224
// log sync failure, this might recover in future
225225
if errors.Is(err, k8s.ErrCacheSyncTimeout) {
226226
timedoutDGs = append(timedoutDGs, dgConfig.Name)

pkg/datagatherer/datagatherer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ type DataGatherer interface {
1717
Fetch() (data interface{}, count int, err error)
1818
// Run starts the data gatherer's informers for resource collection.
1919
// Returns error if the data gatherer informer wasn't initialized
20-
Run(stopCh <-chan struct{}) error
20+
Run(ctx context.Context) error
2121
// WaitForCacheSync waits for the data gatherer's informers cache to sync.
22-
WaitForCacheSync(stopCh <-chan struct{}) error
22+
WaitForCacheSync(ctx context.Context) error
2323
// Delete, clear the cache of the DataGatherer if one is being used
2424
Delete() error
2525
}

pkg/datagatherer/k8s/discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ type DataGathererDiscovery struct {
4747
cl *discovery.DiscoveryClient
4848
}
4949

50-
func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error {
50+
func (g *DataGathererDiscovery) Run(ctx context.Context) error {
5151
// no async functionality, see Fetch
5252
return nil
5353
}
5454

55-
func (g *DataGathererDiscovery) WaitForCacheSync(stopCh <-chan struct{}) error {
55+
func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error {
5656
// no async functionality, see Fetch
5757
return nil
5858
}

pkg/datagatherer/k8s/dynamic.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
181181
dgCache := cache.New(5*time.Minute, 30*time.Second)
182182

183183
newDataGatherer := &DataGathererDynamic{
184-
ctx: ctx,
185184
groupVersionResource: c.GroupVersionResource,
186185
fieldSelector: fieldSelector.String(),
187186
namespaces: c.IncludeNamespaces,
@@ -217,7 +216,7 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
217216
newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer()
218217
}
219218

220-
registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
219+
registration, err := newDataGatherer.informer.AddEventHandlerWithOptions(k8scache.ResourceEventHandlerFuncs{
221220
AddFunc: func(obj interface{}) {
222221
onAdd(log, obj, dgCache)
223222
},
@@ -227,6 +226,8 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
227226
DeleteFunc: func(obj interface{}) {
228227
onDelete(log, obj, dgCache)
229228
},
229+
}, k8scache.HandlerOptions{
230+
Logger: &log,
230231
})
231232
if err != nil {
232233
return nil, err
@@ -243,7 +244,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
243244
// This is to allow us to support arbitrary CRDs and resources that Preflight
244245
// does not have registered as part of its `runtime.Scheme`.
245246
type DataGathererDynamic struct {
246-
ctx context.Context
247247
// groupVersionResource is the name of the API group, version and resource
248248
// that should be fetched by this data gatherer.
249249
groupVersionResource schema.GroupVersionResource
@@ -269,8 +269,8 @@ type DataGathererDynamic struct {
269269
// Run starts the dynamic data gatherer's informers for resource collection.
270270
// Returns error if the data gatherer informer wasn't initialized, Run blocks
271271
// until the stopCh is closed.
272-
func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
273-
log := klog.FromContext(g.ctx)
272+
func (g *DataGathererDynamic) Run(ctx context.Context) error {
273+
log := klog.FromContext(ctx)
274274
if g.informer == nil {
275275
return fmt.Errorf("informer was not initialized, impossible to start")
276276
}
@@ -288,7 +288,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
288288
}
289289

290290
// start shared informer
291-
g.informer.Run(stopCh)
291+
g.informer.RunWithContext(ctx)
292292

293293
return nil
294294
}
@@ -298,8 +298,9 @@ var ErrCacheSyncTimeout = fmt.Errorf("timed out waiting for Kubernetes cache to
298298
// WaitForCacheSync waits for the data gatherer's informers cache to sync before
299299
// collecting the resources. Use errors.Is(err, ErrCacheSyncTimeout) to check if
300300
// the cache sync failed.
301-
func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error {
302-
if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) {
301+
func (g *DataGathererDynamic) WaitForCacheSync(ctx context.Context) error {
302+
// Don't use WaitForNamedCacheSync, since we don't want to log extra messages.
303+
if !k8scache.WaitForCacheSync(ctx.Done(), g.registration.HasSynced) {
303304
return ErrCacheSyncTimeout
304305
}
305306

pkg/datagatherer/k8s/dynamic_test.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) {
134134
}
135135

136136
expected := &DataGathererDynamic{
137-
ctx: ctx,
138137
groupVersionResource: config.GroupVersionResource,
139138
// it's important that the namespaces are set as the IncludeNamespaces
140139
// during initialization
@@ -144,9 +143,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) {
144143

145144
gatherer := dg.(*DataGathererDynamic)
146145
// test gatherer's fields
147-
if !reflect.DeepEqual(gatherer.ctx, expected.ctx) {
148-
t.Errorf("expected %v, got %v", expected, dg)
149-
}
150146
if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) {
151147
t.Errorf("expected %v, got %v", expected, dg)
152148
}
@@ -180,7 +176,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) {
180176
}
181177

182178
expected := &DataGathererDynamic{
183-
ctx: ctx,
184179
groupVersionResource: config.GroupVersionResource,
185180
// it's important that the namespaces are set as the IncludeNamespaces
186181
// during initialization
@@ -189,9 +184,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) {
189184

190185
gatherer := dg.(*DataGathererDynamic)
191186
// test gatherer's fields
192-
if !reflect.DeepEqual(gatherer.ctx, expected.ctx) {
193-
t.Errorf("expected %v, got %v", expected, dg)
194-
}
195187
if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) {
196188
t.Errorf("expected %v, got %v", expected, dg)
197189
}
@@ -693,11 +685,11 @@ func TestDynamicGatherer_Fetch(t *testing.T) {
693685
// start data gatherer informer
694686
dynamiDg := dg
695687
go func() {
696-
if err = dynamiDg.Run(ctx.Done()); err != nil {
688+
if err = dynamiDg.Run(ctx); err != nil {
697689
t.Errorf("unexpected client error: %+v", err)
698690
}
699691
}()
700-
err = dynamiDg.WaitForCacheSync(ctx.Done())
692+
err = dynamiDg.WaitForCacheSync(ctx)
701693
if err != nil {
702694
t.Fatalf("unexpected client error: %+v", err)
703695
}
@@ -1010,11 +1002,11 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) {
10101002
// start data gatherer informer
10111003
dynamiDg := dg
10121004
go func() {
1013-
if err = dynamiDg.Run(ctx.Done()); err != nil {
1005+
if err = dynamiDg.Run(ctx); err != nil {
10141006
t.Errorf("unexpected client error: %+v", err)
10151007
}
10161008
}()
1017-
err = dynamiDg.WaitForCacheSync(ctx.Done())
1009+
err = dynamiDg.WaitForCacheSync(ctx)
10181010
if err != nil {
10191011
t.Fatalf("unexpected client error: %+v", err)
10201012
}

pkg/datagatherer/local/local.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (c *Config) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer
3838
}, nil
3939
}
4040

41-
func (g *DataGatherer) Run(stopCh <-chan struct{}) error {
41+
func (g *DataGatherer) Run(ctx context.Context) error {
4242
// no async functionality, see Fetch
4343
return nil
4444
}
@@ -48,7 +48,7 @@ func (g *DataGatherer) Delete() error {
4848
return nil
4949
}
5050

51-
func (g *DataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error {
51+
func (g *DataGatherer) WaitForCacheSync(ctx context.Context) error {
5252
// no async functionality, see Fetch
5353
return nil
5454
}

0 commit comments

Comments
 (0)