From 36b33573f81858eeabe837042da61cb475ea4df5 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Thu, 19 Dec 2024 14:05:05 +0100 Subject: [PATCH 1/7] queue item fix Signed-off-by: Per Goncalves da Silva --- pkg/controller/operators/catalog/operator.go | 57 ++++++++++------- .../operators/catalog/operator_test.go | 12 ++-- .../operators/catalog/subscription/config.go | 4 +- .../operators/catalogtemplate/operator.go | 7 ++- pkg/controller/operators/olm/operator.go | 61 +++++++++++-------- pkg/controller/operators/olm/operatorgroup.go | 6 +- pkg/lib/kubestate/kubestate.go | 7 +++ pkg/lib/queueinformer/config.go | 10 +-- pkg/lib/queueinformer/queueinformer.go | 2 +- .../queueinformer/queueinformer_operator.go | 15 ++--- pkg/lib/queueinformer/resourcequeue.go | 8 +-- pkg/package-server/server/server.go | 10 +-- 12 files changed, 116 insertions(+), 83 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index ebdd6313a4..42094728e8 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper" errorwrap "github.com/pkg/errors" @@ -114,7 +116,7 @@ type Operator struct { subQueueSet *queueinformer.ResourceQueueSet ipQueueSet *queueinformer.ResourceQueueSet ogQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.TypedRateLimitingInterface[any] + nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] namespace string recorder record.EventRecorder sources *grpc.SourceStore @@ -268,8 +270,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire InstallPlans ipInformer := 
crInformerFactory.Operators().V1alpha1().InstallPlans() op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister()) - ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ips", }) op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue) @@ -290,8 +293,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups() op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ogs", }) op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue) @@ -312,8 +316,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + 
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catsrcs", }) op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue) @@ -341,8 +346,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subIndexer := subInformer.Informer().GetIndexer() op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer - subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "subs", }) op.subQueueSet.Set(metav1.NamespaceAll, subQueue) @@ -415,9 +421,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -560,9 +569,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()}) logger.Info("registering owner reference fixer") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := 
workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -745,8 +757,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolve", }) namespaceQueueInformer, err := queueinformer.NewQueueInformer( @@ -787,12 +800,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { if err == nil { for ns := range namespaces { - o.nsResolveQueue.Add(ns) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns)) } } } - o.nsResolveQueue.Add(state.Key.Namespace) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace)) } if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change") @@ -1411,7 +1424,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { } logger.Info("unpacking is not complete yet, requeueing") - o.nsResolveQueue.AddAfter(namespace, 5*time.Second) + o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second) return nil } } @@ -1506,7 +1519,7 @@ 
func (o *Operator) syncSubscriptions(obj interface{}) error { return fmt.Errorf("casting Subscription failed") } - o.nsResolveQueue.Add(sub.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace())) return nil } @@ -1520,7 +1533,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error { return fmt.Errorf("casting OperatorGroup failed") } - o.nsResolveQueue.Add(og.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace())) return nil } diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index c251cfc77b..79575e02ae 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -13,6 +13,8 @@ import ( "testing/quick" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/utils/ptr" controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" @@ -2156,13 +2158,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, client: clientFake, lister: lister, namespace: namespace, - nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.NewTypedMaxOfRateLimiter[any]( - workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second), + nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.NewTypedMaxOfRateLimiter[kubestate.ResourceEvent]( + workqueue.NewTypedItemExponentialFailureRateLimiter[kubestate.ResourceEvent](1*time.Second, 1000*time.Second), // 1 qps, 100 bucket size. 
This is only for retry speed and its only the overall factor (not per item) - &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, + &workqueue.TypedBucketRateLimiter[kubestate.ResourceEvent]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, ), - workqueue.TypedRateLimitingQueueConfig[any]{ + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }), resolver: config.resolver, diff --git a/pkg/controller/operators/catalog/subscription/config.go b/pkg/controller/operators/catalog/subscription/config.go index c4c1877b64..1e16395154 100644 --- a/pkg/controller/operators/catalog/subscription/config.go +++ b/pkg/controller/operators/catalog/subscription/config.go @@ -23,7 +23,7 @@ type syncerConfig struct { subscriptionInformer cache.SharedIndexInformer catalogInformer cache.SharedIndexInformer installPlanInformer cache.SharedIndexInformer - subscriptionQueue workqueue.TypedRateLimitingInterface[any] + subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption { } // WithSubscriptionQueue sets a syncer's subscription queue. 
-func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption { +func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption { return func(config *syncerConfig) { config.subscriptionQueue = subscriptionQueue } diff --git a/pkg/controller/operators/catalogtemplate/operator.go b/pkg/controller/operators/catalogtemplate/operator.go index ea10454506..9c4ba85b74 100644 --- a/pkg/controller/operators/catalogtemplate/operator.go +++ b/pkg/controller/operators/catalogtemplate/operator.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/distribution/reference" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/sirupsen/logrus" @@ -101,8 +103,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catalogSourceTemplate", }) op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue) diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index e6dfaaf1d0..721a23e055 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -8,6 +8,8 @@ import ( "sync" "time" + 
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins" "github.com/sirupsen/logrus" @@ -83,11 +85,11 @@ type Operator struct { copiedCSVLister metadatalister.Lister ogQueueSet *queueinformer.ResourceQueueSet csvQueueSet *queueinformer.ResourceQueueSet - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvCopyQueueSet *queueinformer.ResourceQueueSet copiedCSVGCQueueSet *queueinformer.ResourceQueueSet - nsQueueSet workqueue.TypedRateLimitingInterface[any] - apiServiceQueue workqueue.TypedRateLimitingInterface[any] + nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] + apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvIndexers map[string]cache.Indexer recorder record.EventRecorder resolver install.StrategyResolverInterface @@ -198,17 +200,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat client: config.externalClient, ogQueueSet: queueinformer.NewEmptyResourceQueueSet(), csvQueueSet: queueinformer.NewEmptyResourceQueueSet(), - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(), copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(), - apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - 
workqueue.TypedRateLimitingQueueConfig[any]{ + apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "apiservice", }), resolver: config.strategyResolver, @@ -246,9 +248,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ).Operators().V1alpha1().ClusterServiceVersions() informersByNamespace[namespace].CSVInformer = csvInformer op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) - csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv", namespace), }) op.csvQueueSet.Set(namespace, csvQueue) @@ -273,7 +275,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat op.csvIndexers[namespace] = csvIndexer // Register separate queue for copying csvs - csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace)) + csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: fmt.Sprintf("%s/csv-copy", namespace), + }) op.csvCopyQueueSet.Set(namespace, csvCopyQueue) csvCopyQueueInformer, err := queueinformer.NewQueueInformer( ctx, @@ -307,9 +313,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat informersByNamespace[namespace].CopiedCSVLister = 
op.copiedCSVLister // Register separate queue for gcing copied csvs - copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv-gc", namespace), }) op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue) @@ -333,9 +339,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups() informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/og", namespace), }) op.ogQueueSet.Set(namespace, ogQueue) @@ -522,9 +528,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + 
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -696,9 +705,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces() informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }) namespaceInformer.Informer().AddEventHandler( @@ -1665,7 +1674,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) { } if err == nil { - go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5) + go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5) } logger := a.logger.WithFields(logrus.Fields{ diff --git a/pkg/controller/operators/olm/operatorgroup.go b/pkg/controller/operators/olm/operatorgroup.go index 18c8b19008..cb382cd9b3 100644 --- a/pkg/controller/operators/olm/operatorgroup.go +++ b/pkg/controller/operators/olm/operatorgroup.go @@ -8,6 +8,8 @@ import ( "reflect" "strings" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/apimachinery/pkg/api/equality" "github.com/sirupsen/logrus" @@ -182,7 +184,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error { logger.Debug("Requeueing out of sync namespaces") for _, ns := range outOfSyncNamespaces { logger.WithField("namespace", 
ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } // CSV requeue is handled by the succeeding sync in `annotateCSVs` @@ -263,7 +265,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) { logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces") for _, ns := range op.Status.Namespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } } diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3f656069de..b249cf26ff 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} { return r.resource } +func NewUpdateEvent(resource interface{}) ResourceEvent { + return resourceEvent{ + eventType: ResourceUpdated, + resource: resource, + } +} + func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { return resourceEvent{ eventType: eventType, diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index bd69d2403b..3842aa9db4 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -14,7 +14,7 @@ import ( type queueInformerConfig struct { provider metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc @@ -105,9 +105,9 @@ func defaultKeyFunc(obj interface{}) (string, bool) { func defaultConfig() *queueInformerConfig { return &queueInformerConfig{ provider: metrics.NewMetricsNil(), - queue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + 
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "default", }), logger: logrus.New(), @@ -130,7 +130,7 @@ func WithLogger(logger *logrus.Logger) Option { } // WithQueue sets the queue used by a QueueInformer. -func WithQueue(queue workqueue.RateLimitingInterface) Option { +func WithQueue(queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) Option { return func(config *queueInformerConfig) { config.queue = queue } diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 02a66cb527..6187a8da5b 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -23,7 +23,7 @@ type QueueInformer struct { metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..3cd4a143b5 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -273,8 +273,8 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - event, ok := item.(kubestate.ResourceEvent) - if !ok || event.Type() != kubestate.ResourceDeleted { + var event = item + if item.Type() != kubestate.ResourceDeleted { // Get the key key, keyable := loop.key(item) if !keyable { @@ -287,7 +287,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) var resource interface{} if loop.indexer == nil { - resource = event.Resource() + resource = item.Resource() } else { // Get the current cached version of the resource var exists 
bool @@ -304,17 +304,12 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) return true } } - - if !ok { - event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) - } else { - event = kubestate.NewResourceEvent(event.Type(), resource) - } + event = kubestate.NewResourceEvent(item.Type(), resource) } // Sync and requeue on error (throw out failed deletion syncs) err := loop.Sync(ctx, event) - if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted { + if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && item.Type() != kubestate.ResourceDeleted { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) queue.AddRateLimited(item) diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 0e4da56cde..6881b9784b 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -14,22 +14,22 @@ import ( // ResourceQueueSet is a set of workqueues that is assumed to be keyed by namespace type ResourceQueueSet struct { - queueSet map[string]workqueue.RateLimitingInterface + queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] mutex sync.RWMutex } // NewResourceQueueSet returns a new queue set with the given queue map -func NewResourceQueueSet(queueSet map[string]workqueue.RateLimitingInterface) *ResourceQueueSet { +func NewResourceQueueSet(queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) *ResourceQueueSet { return &ResourceQueueSet{queueSet: queueSet} } // NewEmptyResourceQueueSet returns a new queue set with an empty but initialized queue map func NewEmptyResourceQueueSet() *ResourceQueueSet { - return &ResourceQueueSet{queueSet: make(map[string]workqueue.RateLimitingInterface)} + return &ResourceQueueSet{queueSet: 
make(map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent])} } // Set sets the queue at the given key -func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface) { +func (r *ResourceQueueSet) Set(key string, queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) { r.mutex.Lock() defer r.mutex.Unlock() r.queueSet[key] = queue diff --git a/pkg/package-server/server/server.go b/pkg/package-server/server/server.go index 85ad4931dd..030bf001f6 100644 --- a/pkg/package-server/server/server.go +++ b/pkg/package-server/server/server.go @@ -8,6 +8,8 @@ import ( "os" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,7 +37,7 @@ const DefaultWakeupInterval = 12 * time.Hour type Operator struct { queueinformer.Operator - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] options *PackageServerOptions } @@ -239,9 +241,9 @@ func (o *PackageServerOptions) Run(ctx context.Context) error { op := &Operator{ Operator: queueOperator, - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), options: o, From f8c273cf4132e65ab34b1e12359bcdd3d1aa9d34 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Mon, 6 Jan 2025 14:51:13 +0100 Subject: [PATCH 2/7] small fixes Signed-off-by: Per Goncalves da Silva --- pkg/controller/operators/catalog/operator.go | 18 +++++------ pkg/lib/queueinformer/queueinformer.go | 33 +++++--------------- 2 
files changed, 15 insertions(+), 36 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 42094728e8..1863e2bf98 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -886,18 +886,16 @@ func (o *Operator) handleDeletion(obj interface{}) { func (o *Operator) handleCatSrcDeletion(obj interface{}) { catsrc, ok := obj.(metav1.Object) if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) - return - } + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } - catsrc, ok = tombstone.Obj.(metav1.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj)) - return - } + catsrc, ok = tombstone.Obj.(metav1.Object) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj)) + return } } sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 6187a8da5b..39295f591a 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -44,12 +44,12 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { } resource := event.Resource() - if event.Type() == kubestate.ResourceDeleted { - // Get object from tombstone if possible - if tombstone, ok := resource.(cache.DeletedFinalStateUnknown); ok { - resource = tombstone - } - } else { + + // Delete operations should always carry either something assignable to + // metev1.Object or cache.DeletedFinalStateUnknown. 
+ // Add/Update events coming from the informer should have their resource + // converted to a key (string) before being enqueued. + if event.Type() != kubestate.ResourceDeleted { // Extract key for add and update events if key, ok := q.key(resource); ok { resource = key @@ -69,7 +69,7 @@ func (q *QueueInformer) key(obj interface{}) (string, bool) { // resourceHandlers provides the default implementation for responding to events // these simply Log the event and add the object's key to the queue for later processing. -func (q *QueueInformer) resourceHandlers(ctx context.Context) *cache.ResourceEventHandlerFuncs { +func (q *QueueInformer) resourceHandlers(_ context.Context) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, obj)) @@ -104,25 +104,6 @@ func (q *QueueInformer) metricHandlers() *cache.ResourceEventHandlerFuncs { } } -func NewQueue(ctx context.Context, options ...Option) (*QueueInformer, error) { - config := defaultConfig() - config.apply(options) - - if err := config.validateQueue(); err != nil { - return nil, err - } - - queue := &QueueInformer{ - MetricsProvider: config.provider, - logger: config.logger, - queue: config.queue, - keyFunc: config.keyFunc, - syncer: config.syncer, - } - - return queue, nil -} - // NewQueueInformer returns a new QueueInformer configured with options. 
func NewQueueInformer(ctx context.Context, options ...Option) (*QueueInformer, error) { // Get default config and apply given options From e016e734857ac8da4a3f10950509ad106a6120fe Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Mon, 6 Jan 2025 16:33:56 +0100 Subject: [PATCH 3/7] more small fixes Signed-off-by: Per Goncalves da Silva --- pkg/lib/queueinformer/queueinformer.go | 13 +++++--- .../queueinformer/queueinformer_operator.go | 33 ++++++++----------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 39295f591a..fb7a1a30ef 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -2,6 +2,7 @@ package queueinformer import ( "context" + "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -43,7 +44,7 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { return } - resource := event.Resource() + e := event // Delete operations should always carry either something assignable to // metav1.Object or cache.DeletedFinalStateUnknown. @@ -51,13 +52,17 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { // converted to a key (string) before being enqueued. 
if event.Type() != kubestate.ResourceDeleted { // Extract key for add and update events - if key, ok := q.key(resource); ok { - resource = key + if key, ok := q.key(e.Resource()); ok { + e = kubestate.NewResourceEvent(event.Type(), key) + } else { + // if the resource cannot be keyed the worker will not be able to process it + // since it will not be able to retrieve the resource + q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource())) + return } } // Create new resource event and add to queue - e := kubestate.NewResourceEvent(event.Type(), resource) q.logger.WithField("event", e).Trace("enqueuing resource event") q.queue.Add(e) } diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 3cd4a143b5..907b24a26c 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -284,25 +284,20 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) } logger = logger.WithField("cache-key", key) - - var resource interface{} - if loop.indexer == nil { - resource = item.Resource() - } else { - // Get the current cached version of the resource - var exists bool - var err error - resource, exists, err = loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true - } + + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err := loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") + queue.Forget(item) + return true + } + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + 
queue.Forget(item) + return true } event = kubestate.NewResourceEvent(item.Type(), resource) } From 31a325097fa807e275e73d06e0377b9529f4027f Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Mon, 6 Jan 2025 17:32:04 +0100 Subject: [PATCH 4/7] next little change Signed-off-by: Per Goncalves da Silva --- pkg/lib/queueinformer/queueinformer_operator.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 907b24a26c..c9a76c1cce 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -276,15 +276,19 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) var event = item if item.Type() != kubestate.ResourceDeleted { // Get the key - key, keyable := loop.key(item) - if !keyable { - logger.WithField("item", item).Warn("could not form key") - queue.Forget(item) - return true + //key, keyable := loop.key(item) + //if !keyable { + // logger.WithField("item", item).Warn("could not form key") + // queue.Forget(item) + // return true + //} + key, ok := item.Resource().(string) + if !ok { + panic(fmt.Sprintf("unexpected item resource type: %T", item.Resource())) } logger = logger.WithField("cache-key", key) - + // Get the current cached version of the resource var exists bool var err error From 8db7f9f25c583ef6cf48f79830ecbc088de2d072 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Tue, 7 Jan 2025 12:13:09 +0100 Subject: [PATCH 5/7] next little fix Signed-off-by: Per Goncalves da Silva --- pkg/controller/operators/catalog/subscription/syncer.go | 2 +- pkg/controller/operators/olm/operator.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/operators/catalog/subscription/syncer.go b/pkg/controller/operators/catalog/subscription/syncer.go index 04e8edeb5a..5101a2d404 100644 --- 
a/pkg/controller/operators/catalog/subscription/syncer.go +++ b/pkg/controller/operators/catalog/subscription/syncer.go @@ -179,7 +179,7 @@ func (s *subscriptionSyncer) notifyOnInstallPlan(ctx context.Context, obj interf for _, owner := range owners { subKey := fmt.Sprintf("%s/%s", plan.GetNamespace(), owner.Name) logger.Tracef("notifying subscription %s", subKey) - s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, cache.ExplicitKey(subKey))) + s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, subKey)) } } diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index 721a23e055..87d120afe2 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -1674,7 +1674,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) { } if err == nil { - go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5) + go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig.GetName()), time.Second*5) } logger := a.logger.WithFields(logrus.Fields{ From 87ba582ea5cc36dee24f03bb26541beb747bacd5 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Tue, 7 Jan 2025 15:50:35 +0100 Subject: [PATCH 6/7] enforce ResourceEvent types Signed-off-by: Per Goncalves da Silva --- pkg/lib/kubestate/kubestate.go | 31 +++++++++++-- pkg/lib/queueinformer/config.go | 13 ------ pkg/lib/queueinformer/main/main.go | 12 ++++++ .../queueinformer/queueinformer_operator.go | 43 +++++++++++++------ 4 files changed, 70 insertions(+), 29 deletions(-) create mode 100644 pkg/lib/queueinformer/main/main.go diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index b249cf26ff..3bc40f6fc0 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -2,6 +2,9 @@ package kubestate import ( "context" + "fmt" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" ) type State interface { @@ 
-148,6 +151,7 @@ const ( type ResourceEvent interface { Type() ResourceEventType Resource() interface{} + String() string } type resourceEvent struct { @@ -163,14 +167,33 @@ func (r resourceEvent) Resource() interface{} { return r.resource } -func NewUpdateEvent(resource interface{}) ResourceEvent { - return resourceEvent{ - eventType: ResourceUpdated, - resource: resource, +func (r resourceEvent) String() string { + key, err := cache.MetaNamespaceKeyFunc(r.resource) + + // should not happen as resources must be either cache.ExplicitKey + // or client.Object + if err != nil { + panic("could not get resource key: " + err.Error()) } + return key +} + +func NewUpdateEvent(resource interface{}) ResourceEvent { + return NewResourceEvent(ResourceUpdated, resource) } func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { + // assert resource type + // only accept cache.ExplicitKey or client.Objects + switch r := resource.(type) { + case string: + resource = cache.ExplicitKey(r) + case cache.ExplicitKey: + case client.Object: + default: + panic(fmt.Sprintf("NewResourceEvent called with invalid resource type: %T", resource)) + } + return resourceEvent{ eventType: eventType, resource: resource, diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index 3842aa9db4..db809adba2 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -81,19 +81,6 @@ func (c *queueInformerConfig) validateQueue() (err error) { } func defaultKeyFunc(obj interface{}) (string, bool) { - // Get keys nested in resource events up to depth 2 - keyable := false - for d := 0; d < 2 && !keyable; d++ { - switch v := obj.(type) { - case string: - return v, true - case kubestate.ResourceEvent: - obj = v.Resource() - default: - keyable = true - } - } - k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { return k, false diff --git a/pkg/lib/queueinformer/main/main.go 
b/pkg/lib/queueinformer/main/main.go new file mode 100644 index 0000000000..883347e5a1 --- /dev/null +++ b/pkg/lib/queueinformer/main/main.go @@ -0,0 +1,12 @@ +package main + +import ( + "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/client-go/tools/cache" +) + +func main() { + k, ok := cache.MetaNamespaceKeyFunc(kubestate.NewUpdateEvent("bob")) + fmt.Printf("key: %s (%t)\n", k, ok) +} diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index c9a76c1cce..7d1faf74ad 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -262,6 +262,30 @@ func (o *operator) worker(ctx context.Context, loop *QueueInformer) { } func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) bool { + // **************************** WARNING **************************** + // The QueueInformer listens to resource events raised by its + // (client-go) informer. For Add/Update event, it extracts the key + // for the resource and adds to its queue (that we Get() from below) + // a ResourceEvent carrying the key. + // **Except** if it is a deletion event. In that case, + // ResourceEvent carries the resource object (or tombstone). + // The sync'er expects a ResourceEvent carrying the resource. + // So, in the case of an Add/Update event coming from the queue, + // the resource is acquired from the index (through the key), and then + // a ResourceEvent carrying the resource is handed to the syncer. + // It should also be noted that throughout the code, items are added to + // queueinformers out of band of informer notifications. + // The fact that the queueinformers queue processes ResourceEvents, which + // themselves encapsulate an interface{} "Resource" make it tricky for the + // queue to dedup. 
Previous to the writing of this comment, the queue was + // processing strings and ResourceEvents, which led to concurrent processing + // of the same resource. To address this, we enforce (with panic) that the resource + // in the ResourceEvent must either be a cache.ExplicitKey or a client.Object. + // We then make sure that the ResourceEvent's String() returns the key for the + // encapsulated resource. Thus, independent of the resource type, the queue always + // processes it by key and dedups appropriately. + // Furthermore, we also enforce here that Add/Update events always contain + // cache.ExplicitKey as their Resource queue := loop.queue item, quit := queue.Get() @@ -271,20 +295,15 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) defer queue.Done(item) logger := o.logger.WithField("item", item) - logger.WithField("queue-length", queue.Len()).Trace("popped queue") + logger.WithField("queue-length", queue.Len()).Info("popped queue") var event = item if item.Type() != kubestate.ResourceDeleted { - // Get the key - //key, keyable := loop.key(item) - //if !keyable { - // logger.WithField("item", item).Warn("could not form key") - // queue.Forget(item) - // return true - //} - key, ok := item.Resource().(string) - if !ok { - panic(fmt.Sprintf("unexpected item resource type: %T", item.Resource())) + key, keyable := loop.key(item) + if !keyable { + logger.WithField("item", item).Warn("could not form key") + queue.Forget(item) + return true } logger = logger.WithField("cache-key", key) @@ -292,7 +311,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) // Get the current cached version of the resource var exists bool var err error - resource, exists, err := loop.indexer.GetByKey(key) + resource, exists, err := loop.indexer.GetByKey(string(key)) if err != nil { logger.WithError(err).Error("cache get failed") queue.Forget(item) From a683bfefc346cf2f2487c03eb52b67548859bbe3 Mon Sep 17 00:00:00 2001 
From: Per Goncalves da Silva Date: Wed, 8 Jan 2025 10:50:55 +0100 Subject: [PATCH 7/7] try again Signed-off-by: Per Goncalves da Silva --- pkg/lib/kubestate/kubestate.go | 16 +++++++++---- pkg/lib/queueinformer/config.go | 5 ++++ pkg/lib/queueinformer/main/main.go | 12 ---------- pkg/lib/queueinformer/queueinformer.go | 24 +++++++++---------- .../queueinformer/queueinformer_operator.go | 6 ++--- 5 files changed, 29 insertions(+), 34 deletions(-) delete mode 100644 pkg/lib/queueinformer/main/main.go diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3bc40f6fc0..4d9af8e77c 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -168,20 +168,25 @@ func (r resourceEvent) Resource() interface{} { } func (r resourceEvent) String() string { - key, err := cache.MetaNamespaceKeyFunc(r.resource) - - // should not happen as resources must be either cache.ExplicitKey - // or client.Object + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(r.resource) + // should not happen as resources must be either cache.ExplicitKey or client.Object + // and this should be enforced in NewResourceEvent if err != nil { panic("could not get resource key: " + err.Error()) } - return key + return fmt.Sprintf("{%s %s}", string(r.eventType), key) } func NewUpdateEvent(resource interface{}) ResourceEvent { return NewResourceEvent(ResourceUpdated, resource) } +// NewResourceEvent creates a new resource event. The resource parameter must either be +// a client.Object, a string, a cache.DeletedFinalStateUnknown, or a cache.ExplicitKey. In case it is a string, it will be +// coerced to cache.ExplicitKey. This ensures that whether a reference (string/cache.ExplicitKey) +// or a resource, workqueue will treat the items in the same way and dedup appropriately. +// This behavior is guaranteed by the String() method, which will also ignore the type of event. +// I.e. 
Add/Update/Delete events for the same resource object or reference will be deduplicated. func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { // assert resource type // only accept cache.ExplicitKey or client.Objects @@ -190,6 +195,7 @@ func NewResourceEvent(eventType ResourceEventType, resource interface{}) Resourc resource = cache.ExplicitKey(r) case cache.ExplicitKey: case client.Object: + case cache.DeletedFinalStateUnknown: default: panic(fmt.Sprintf("NewResourceEvent called with invalid resource type: %T", resource)) } diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index db809adba2..a94a9f2e70 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -1,6 +1,7 @@ package queueinformer import ( + "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/discovery" @@ -81,8 +82,12 @@ func (c *queueInformerConfig) validateQueue() (err error) { } func defaultKeyFunc(obj interface{}) (string, bool) { + if re, ok := obj.(kubestate.ResourceEvent); ok { + obj = re.Resource() + } k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { + fmt.Printf("error getting key for object %v: %v", obj, err) return k, false } diff --git a/pkg/lib/queueinformer/main/main.go b/pkg/lib/queueinformer/main/main.go deleted file mode 100644 index 883347e5a1..0000000000 --- a/pkg/lib/queueinformer/main/main.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -import ( - "fmt" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" - "k8s.io/client-go/tools/cache" -) - -func main() { - k, ok := cache.MetaNamespaceKeyFunc(kubestate.NewUpdateEvent("bob")) - fmt.Printf("key: %s (%t)\n", k, ok) -} diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index fb7a1a30ef..ff7a7a6aa7 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -2,8 +2,6 @@ package queueinformer 
import ( "context" - "fmt" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/tools/cache" @@ -50,17 +48,17 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { // metav1.Object or cache.DeletedFinalStateUnknown. // Add/Update events coming from the informer should have their resource // converted to a key (string) before being enqueued. - if event.Type() != kubestate.ResourceDeleted { - // Extract key for add and update events - if key, ok := q.key(e.Resource()); ok { - e = kubestate.NewResourceEvent(event.Type(), key) - } else { - // if the resource cannot be keyed the worker will not be able to process it - // since it will not be able to retrieve the resource - q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource())) - return - } - } + //if event.Type() != kubestate.ResourceDeleted { + // // Extract key for add and update events + // if key, ok := q.key(e.Resource()); ok { + // e = kubestate.NewResourceEvent(event.Type(), cache.ExplicitKey(key)) + // } else { + // // if the resource cannot be keyed the worker will not be able to process it + // // since it will not be able to retrieve the resource + // q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource())) + // return + // } + //} // Create new resource event and add to queue q.logger.WithField("event", e).Trace("enqueuing resource event") diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 7d1faf74ad..e4589be1a5 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -284,8 +284,6 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) // We then make sure that the ResourceEvent's String() returns the key for the // encapsulated resource. 
Thus, independent of the resource type, the queue always // processes it by key and dedups appropriately. - // Furthermore, we also enforce here that Add/Update events always contain - // cache.ExplicitKey as their Resource queue := loop.queue item, quit := queue.Get() @@ -301,7 +299,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) if item.Type() != kubestate.ResourceDeleted { key, keyable := loop.key(item) if !keyable { - logger.WithField("item", item).Warn("could not form key") + logger.WithField("item", item).Warnf("could not form key %s", item) queue.Forget(item) return true } @@ -311,7 +309,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) // Get the current cached version of the resource var exists bool var err error - resource, exists, err := loop.indexer.GetByKey(string(key)) + resource, exists, err := loop.indexer.GetByKey(key) if err != nil { logger.WithError(err).Error("cache get failed") queue.Forget(item)