Skip to content

Commit 3ffc106

Browse files
author
Per Goncalves da Silva
committed
refactor key function and ensure items always carry strings
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 4133943 commit 3ffc106

File tree

3 files changed

+49
-63
lines changed

3 files changed

+49
-63
lines changed

pkg/lib/queueinformer/config.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,24 +82,13 @@ func (c *queueInformerConfig) validateQueue() (err error) {
8282

8383
func defaultKeyFunc(obj interface{}) (string, bool) {
8484
// Get keys nested in resource events up to depth 2
85-
keyable := false
86-
for d := 0; d < 2 && !keyable; d++ {
87-
switch v := obj.(type) {
88-
case string:
89-
return v, true
90-
case kubestate.ResourceEvent:
91-
obj = v.Resource()
92-
default:
93-
keyable = true
94-
}
85+
switch v := obj.(type) {
86+
case string:
87+
return v, true
88+
default:
89+
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
90+
return k, err == nil
9591
}
96-
97-
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
98-
if err != nil {
99-
return k, false
100-
}
101-
102-
return k, true
10392
}
10493

10594
func defaultConfig() *queueInformerConfig {

pkg/lib/queueinformer/queueinformer.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,23 @@ func (q *QueueInformer) Sync(ctx context.Context, event kubestate.ResourceEvent)
3535
return q.syncer.Sync(ctx, event)
3636
}
3737

38-
// Enqueue adds a key to the queue. If obj is a key already it gets added directly.
38+
// enqueue adds a key to the queue. If obj is a key already it gets added directly.
3939
// Otherwise, the key is extracted via keyFunc.
40-
func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) {
41-
if event == nil {
40+
func (q *QueueInformer) enqueue(eventType kubestate.ResourceEventType, obj interface{}) {
41+
if obj == nil {
4242
// Don't enqueue nil events
4343
return
4444
}
4545

46-
resource := event.Resource()
47-
if event.Type() == kubestate.ResourceDeleted {
48-
// Get object from tombstone if possible
49-
if tombstone, ok := resource.(cache.DeletedFinalStateUnknown); ok {
50-
resource = tombstone
51-
}
52-
} else {
53-
// Extract key for add and update events
54-
if key, ok := q.key(resource); ok {
55-
resource = key
56-
}
46+
// Extract key for add and update events
47+
key, ok := q.key(obj)
48+
if !ok {
49+
q.logger.Warnf("object %v not keyable: %v", obj, key)
50+
return
5751
}
5852

5953
// Create new resource event and add to queue
60-
e := kubestate.NewResourceEvent(event.Type(), resource)
54+
e := kubestate.NewResourceEvent(eventType, key)
6155
q.logger.WithField("event", e).Trace("enqueuing resource event")
6256
q.queue.Add(e)
6357
}
@@ -72,13 +66,16 @@ func (q *QueueInformer) key(obj interface{}) (string, bool) {
7266
func (q *QueueInformer) resourceHandlers(ctx context.Context) *cache.ResourceEventHandlerFuncs {
7367
return &cache.ResourceEventHandlerFuncs{
7468
AddFunc: func(obj interface{}) {
75-
q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, obj))
69+
q.enqueue(kubestate.ResourceUpdated, obj)
7670
},
7771
UpdateFunc: func(oldObj, newObj interface{}) {
78-
q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, newObj))
72+
q.enqueue(kubestate.ResourceUpdated, newObj)
7973
},
8074
DeleteFunc: func(obj interface{}) {
81-
q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceDeleted, obj))
75+
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
76+
obj = tombstone
77+
}
78+
q.enqueue(kubestate.ResourceDeleted, obj)
8279
},
8380
}
8481
}

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -273,39 +273,39 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
273273
logger := o.logger.WithField("item", item)
274274
logger.WithField("queue-length", queue.Len()).Trace("popped queue")
275275

276-
var event = item
277-
if item.Type() != kubestate.ResourceDeleted {
278-
// Get the key
279-
key, keyable := loop.key(item)
280-
if !keyable {
281-
logger.WithField("item", item).Warn("could not form key")
276+
if _, ok := item.Resource().(string); !ok {
277+
panic(fmt.Sprintf("unexpected item resource type: %T", item.Resource()))
278+
}
279+
280+
key, keyable := loop.key(item.Resource())
281+
if !keyable {
282+
logger.WithField("item", item).Warn("could not form key")
283+
queue.Forget(item)
284+
return true
285+
}
286+
287+
logger = logger.WithField("cache-key", key)
288+
289+
var resource interface{}
290+
if loop.indexer == nil {
291+
resource = item.Resource()
292+
} else {
293+
// Get the current cached version of the resource
294+
var exists bool
295+
var err error
296+
resource, exists, err = loop.indexer.GetByKey(key)
297+
if err != nil {
298+
logger.WithError(err).Error("cache get failed")
282299
queue.Forget(item)
283300
return true
284301
}
285-
286-
logger = logger.WithField("cache-key", key)
287-
288-
var resource interface{}
289-
if loop.indexer == nil {
290-
resource = item.Resource()
291-
} else {
292-
// Get the current cached version of the resource
293-
var exists bool
294-
var err error
295-
resource, exists, err = loop.indexer.GetByKey(key)
296-
if err != nil {
297-
logger.WithError(err).Error("cache get failed")
298-
queue.Forget(item)
299-
return true
300-
}
301-
if !exists {
302-
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
303-
queue.Forget(item)
304-
return true
305-
}
302+
if !exists {
303+
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
304+
queue.Forget(item)
305+
return true
306306
}
307-
event = kubestate.NewResourceEvent(item.Type(), resource)
308307
}
308+
event := kubestate.NewResourceEvent(item.Type(), resource)
309309

310310
// Sync and requeue on error (throw out failed deletion syncs)
311311
err := loop.Sync(ctx, event)

0 commit comments

Comments
 (0)