Skip to content

Commit a683bfe

Browse files
author
Per Goncalves da Silva
committed
try again
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 87ba582 commit a683bfe

File tree

5 files changed

+29
-34
lines changed

5 files changed

+29
-34
lines changed

pkg/lib/kubestate/kubestate.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,20 +168,25 @@ func (r resourceEvent) Resource() interface{} {
168168
}
169169

170170
func (r resourceEvent) String() string {
171-
key, err := cache.MetaNamespaceKeyFunc(r.resource)
172-
173-
// should not happen as resources must be either cache.ExplicitKey
174-
// or client.Object
171+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(r.resource)
172+
// should not happen as resources must be either cache.ExplicitKey or client.Object
173+
// and this should be enforced in NewResourceEvent
175174
if err != nil {
176175
panic("could not get resource key: " + err.Error())
177176
}
178-
return key
177+
return fmt.Sprintf("{%s %s}", string(r.eventType), key)
179178
}
180179

181180
func NewUpdateEvent(resource interface{}) ResourceEvent {
182181
return NewResourceEvent(ResourceUpdated, resource)
183182
}
184183

184+
// NewResourceEvent creates a new resource event. The resource parameter must either be
185+
// a client.Object, a string, a cache.DeletedFinalStateUnknown, or a cache.ExplicitKey. In case it is a string, it will be
186+
// coerced to cache.ExplicitKey. This ensures that whether a reference (string/cache.ExplicitKey)
187+
// or a resource, workqueue will treat the items in the same way and dedup appropriately.
188+
// This behavior is guaranteed by the String() method, which will also ignore the type of event.
189+
// I.e. Add/Update/Delete events for the same resource object or reference will be ded
185190
func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
186191
// assert resource type
187192
// only accept cache.ExplicitKey or client.Objects
@@ -190,6 +195,7 @@ func NewResourceEvent(eventType ResourceEventType, resource interface{}) Resourc
190195
resource = cache.ExplicitKey(r)
191196
case cache.ExplicitKey:
192197
case client.Object:
198+
case cache.DeletedFinalStateUnknown:
193199
default:
194200
panic(fmt.Sprintf("NewResourceEvent called with invalid resource type: %T", resource))
195201
}

pkg/lib/queueinformer/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queueinformer
22

33
import (
4+
"fmt"
45
"github.com/pkg/errors"
56
"github.com/sirupsen/logrus"
67
"k8s.io/client-go/discovery"
@@ -81,8 +82,12 @@ func (c *queueInformerConfig) validateQueue() (err error) {
8182
}
8283

8384
func defaultKeyFunc(obj interface{}) (string, bool) {
85+
if re, ok := obj.(kubestate.ResourceEvent); ok {
86+
obj = re.Resource()
87+
}
8488
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
8589
if err != nil {
90+
fmt.Printf("error getting key for object %v: %v", obj, err)
8691
return k, false
8792
}
8893

pkg/lib/queueinformer/main/main.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

pkg/lib/queueinformer/queueinformer.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package queueinformer
22

33
import (
44
"context"
5-
"fmt"
6-
75
"github.com/pkg/errors"
86
"github.com/sirupsen/logrus"
97
"k8s.io/client-go/tools/cache"
@@ -50,17 +48,17 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) {
5048
// metev1.Object or cache.DeletedFinalStateUnknown.
5149
// Add/Update events coming from the informer should have their resource
5250
// converted to a key (string) before being enqueued.
53-
if event.Type() != kubestate.ResourceDeleted {
54-
// Extract key for add and update events
55-
if key, ok := q.key(e.Resource()); ok {
56-
e = kubestate.NewResourceEvent(event.Type(), key)
57-
} else {
58-
// if the resource cannot be keyed the worker will not be able to process it
59-
// since it will not be able to retrieve the resource
60-
q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource()))
61-
return
62-
}
63-
}
51+
//if event.Type() != kubestate.ResourceDeleted {
52+
// // Extract key for add and update events
53+
// if key, ok := q.key(e.Resource()); ok {
54+
// e = kubestate.NewResourceEvent(event.Type(), cache.ExplicitKey(key))
55+
// } else {
56+
// // if the resource cannot be keyed the worker will not be able to process it
57+
// // since it will not be able to retrieve the resource
58+
// q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource()))
59+
// return
60+
// }
61+
//}
6462

6563
// Create new resource event and add to queue
6664
q.logger.WithField("event", e).Trace("enqueuing resource event")

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,6 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
284284
// We then make sure that the ResourceEvent's String() returns the key for the
285285
// encapsulated resource. Thus, independent of the resource type, the queue always
286286
// processes it by key and dedups appropriately.
287-
// Furthermore, we also enforce here that Add/Update events always contain
288-
// cache.ExplicitKey as their Resource
289287
queue := loop.queue
290288
item, quit := queue.Get()
291289

@@ -301,7 +299,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
301299
if item.Type() != kubestate.ResourceDeleted {
302300
key, keyable := loop.key(item)
303301
if !keyable {
304-
logger.WithField("item", item).Warn("could not form key")
302+
logger.WithField("item", item).Warnf("could not form key %s", item)
305303
queue.Forget(item)
306304
return true
307305
}
@@ -311,7 +309,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
311309
// Get the current cached version of the resource
312310
var exists bool
313311
var err error
314-
resource, exists, err := loop.indexer.GetByKey(string(key))
312+
resource, exists, err := loop.indexer.GetByKey(key)
315313
if err != nil {
316314
logger.WithError(err).Error("cache get failed")
317315
queue.Forget(item)

0 commit comments

Comments
 (0)