Skip to content

Commit c19f185

Browse files
author
Per Goncalves da Silva
committed
add safeguards around queue item resource types
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 36b3357 commit c19f185

File tree

3 files changed

+51
-59
lines changed

3 files changed

+51
-59
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -886,18 +886,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
886886
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
887887
catsrc, ok := obj.(metav1.Object)
888888
if !ok {
889+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
889890
if !ok {
890-
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
891-
if !ok {
892-
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
893-
return
894-
}
891+
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
892+
return
893+
}
895894

896-
catsrc, ok = tombstone.Obj.(metav1.Object)
897-
if !ok {
898-
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
899-
return
900-
}
895+
catsrc, ok = tombstone.Obj.(metav1.Object)
896+
if !ok {
897+
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
898+
return
901899
}
902900
}
903901
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}

pkg/lib/queueinformer/queueinformer.go

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) {
4444
}
4545

4646
resource := event.Resource()
47-
if event.Type() == kubestate.ResourceDeleted {
48-
// Get object from tombstone if possible
49-
if tombstone, ok := resource.(cache.DeletedFinalStateUnknown); ok {
50-
resource = tombstone
51-
}
52-
} else {
47+
48+
// Delete operations should always carry either something assignable to
49+
// metev1.Object or cache.DeletedFinalStateUnknown.
50+
// Add/Update events coming from the informer should have their resource
51+
// converted to a key (string) before being enqueued.
52+
if event.Type() != kubestate.ResourceDeleted {
5353
// Extract key for add and update events
5454
if key, ok := q.key(resource); ok {
5555
resource = key
@@ -69,7 +69,7 @@ func (q *QueueInformer) key(obj interface{}) (string, bool) {
6969

7070
// resourceHandlers provides the default implementation for responding to events
7171
// these simply Log the event and add the object's key to the queue for later processing.
72-
func (q *QueueInformer) resourceHandlers(ctx context.Context) *cache.ResourceEventHandlerFuncs {
72+
func (q *QueueInformer) resourceHandlers(_ context.Context) *cache.ResourceEventHandlerFuncs {
7373
return &cache.ResourceEventHandlerFuncs{
7474
AddFunc: func(obj interface{}) {
7575
q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, obj))
@@ -104,25 +104,6 @@ func (q *QueueInformer) metricHandlers() *cache.ResourceEventHandlerFuncs {
104104
}
105105
}
106106

107-
func NewQueue(ctx context.Context, options ...Option) (*QueueInformer, error) {
108-
config := defaultConfig()
109-
config.apply(options)
110-
111-
if err := config.validateQueue(); err != nil {
112-
return nil, err
113-
}
114-
115-
queue := &QueueInformer{
116-
MetricsProvider: config.provider,
117-
logger: config.logger,
118-
queue: config.queue,
119-
keyFunc: config.keyFunc,
120-
syncer: config.syncer,
121-
}
122-
123-
return queue, nil
124-
}
125-
126107
// NewQueueInformer returns a new QueueInformer configured with options.
127108
func NewQueueInformer(ctx context.Context, options ...Option) (*QueueInformer, error) {
128109
// Get default config and apply given options

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package queueinformer
33
import (
44
"context"
55
"fmt"
6+
"sigs.k8s.io/controller-runtime/pkg/client"
67
"sync"
78
"time"
89

@@ -270,41 +271,53 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
270271
}
271272
defer queue.Done(item)
272273

274+
// As per the 'validateQueueInformer' function in config.go in this package
275+
// a valid queue informer must have either an indexer or an informer defined.
276+
// As per the 'complete' function in the same file, if an indexer is not defined
277+
// the informer indexer will be used.
278+
// NewQueueInformer calls complete after applying the options. Therefore,
279+
// an indexer should always be present.
280+
if loop.indexer == nil {
281+
panic("invalid queueinformer: no indexer defined")
282+
}
283+
273284
logger := o.logger.WithField("item", item)
274285
logger.WithField("queue-length", queue.Len()).Trace("popped queue")
275286

276287
var event = item
288+
277289
if item.Type() != kubestate.ResourceDeleted {
278-
// Get the key
279-
key, keyable := loop.key(item)
280-
if !keyable {
281-
logger.WithField("item", item).Warn("could not form key")
282-
queue.Forget(item)
283-
return true
290+
// Get the key - add/update resource operations must always contain a string resource
291+
key, ok := item.Resource().(string)
292+
if !ok {
293+
panic(fmt.Sprintf("unexpected item resource type: %T for add/update operation", item.Resource()))
284294
}
285295

286296
logger = logger.WithField("cache-key", key)
287297

288298
var resource interface{}
289-
if loop.indexer == nil {
290-
resource = item.Resource()
291-
} else {
292-
// Get the current cached version of the resource
293-
var exists bool
294-
var err error
295-
resource, exists, err = loop.indexer.GetByKey(key)
296-
if err != nil {
297-
logger.WithError(err).Error("cache get failed")
298-
queue.Forget(item)
299-
return true
300-
}
301-
if !exists {
302-
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
303-
queue.Forget(item)
304-
return true
305-
}
299+
300+
// Get the current cached version of the resource
301+
resource, exists, err := loop.indexer.GetByKey(key)
302+
if err != nil {
303+
logger.WithError(err).Error("cache get failed")
304+
queue.Forget(item)
305+
return true
306+
}
307+
if !exists {
308+
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
309+
queue.Forget(item)
310+
return true
306311
}
312+
307313
event = kubestate.NewResourceEvent(item.Type(), resource)
314+
} else {
315+
switch item.Resource().(type) {
316+
case client.Object:
317+
case cache.DeletedFinalStateUnknown:
318+
default:
319+
panic(fmt.Sprintf("unexpected item resource type: %T for delete operation", item.Resource()))
320+
}
308321
}
309322

310323
// Sync and requeue on error (throw out failed deletion syncs)

0 commit comments

Comments
 (0)