diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..77795480fc 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -285,6 +285,16 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger = logger.WithField("cache-key", key) + if !ok { + // we should really only (sync) resource event items + // otherwise we run the risk of processing the same object concurrently across the number of workers. + // As is, at least, the case for the namespace resolution informer, + // the namespace name gets added to the queue (e.g. nsResolveQueue.Add(obj.GetNamespace())), + // but the QueueInformer's Enqueue method always adds ResourceEvents. + queue.Add(kubestate.NewResourceEvent(kubestate.ResourceUpdated, key)) + return true + } + var resource interface{} if loop.indexer == nil { resource = event.Resource() @@ -305,11 +315,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) } } - if !ok { - event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) - } else { - event = kubestate.NewResourceEvent(event.Type(), resource) - } + event = kubestate.NewResourceEvent(event.Type(), resource) } // Sync and requeue on error (throw out failed deletion syncs)