diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index ebdd6313a4..1863e2bf98 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper" errorwrap "github.com/pkg/errors" @@ -114,7 +116,7 @@ type Operator struct { subQueueSet *queueinformer.ResourceQueueSet ipQueueSet *queueinformer.ResourceQueueSet ogQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.TypedRateLimitingInterface[any] + nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] namespace string recorder record.EventRecorder sources *grpc.SourceStore @@ -268,8 +270,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire InstallPlans ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans() op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister()) - ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ips", }) op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue) @@ -290,8 +293,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups() op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "ogs", }) op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue) @@ -312,8 +316,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catsrcs", }) op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue) @@ -341,8 +346,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subIndexer := subInformer.Informer().GetIndexer() op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer - subQueue := 
workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "subs", }) op.subQueueSet.Set(metav1.NamespaceAll, subQueue) @@ -415,9 +421,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -560,9 +569,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()}) logger.Info("registering owner reference fixer") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -745,8 +757,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolve", }) namespaceQueueInformer, err := queueinformer.NewQueueInformer( @@ -787,12 +800,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { if err == nil { for ns := range namespaces { - o.nsResolveQueue.Add(ns) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns)) } } } - o.nsResolveQueue.Add(state.Key.Namespace) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace)) } if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change") @@ -873,18 +886,16 @@ func (o *Operator) handleDeletion(obj interface{}) { func (o *Operator) handleCatSrcDeletion(obj interface{}) { catsrc, ok := obj.(metav1.Object) if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - tombstone, ok := 
obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) - return - } + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } - catsrc, ok = tombstone.Obj.(metav1.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj)) - return - } + catsrc, ok = tombstone.Obj.(metav1.Object) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj)) + return } } sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} @@ -1411,7 +1422,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { } logger.Info("unpacking is not complete yet, requeueing") - o.nsResolveQueue.AddAfter(namespace, 5*time.Second) + o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second) return nil } } @@ -1506,7 +1517,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error { return fmt.Errorf("casting Subscription failed") } - o.nsResolveQueue.Add(sub.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace())) return nil } @@ -1520,7 +1531,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error { return fmt.Errorf("casting OperatorGroup failed") } - o.nsResolveQueue.Add(og.GetNamespace()) + o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace())) return nil } diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index c251cfc77b..79575e02ae 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -13,6 +13,8 @@ import ( "testing/quick" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/utils/ptr" controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" @@ -2156,13 +2158,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, client: clientFake, lister: lister, namespace: namespace, - nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.NewTypedMaxOfRateLimiter[any]( - workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second), + nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.NewTypedMaxOfRateLimiter[kubestate.ResourceEvent]( + workqueue.NewTypedItemExponentialFailureRateLimiter[kubestate.ResourceEvent](1*time.Second, 1000*time.Second), // 1 qps, 100 bucket size. 
This is only for retry speed and its only the overall factor (not per item) - &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, + &workqueue.TypedBucketRateLimiter[kubestate.ResourceEvent]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, ), - workqueue.TypedRateLimitingQueueConfig[any]{ + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }), resolver: config.resolver, diff --git a/pkg/controller/operators/catalog/subscription/config.go b/pkg/controller/operators/catalog/subscription/config.go index c4c1877b64..1e16395154 100644 --- a/pkg/controller/operators/catalog/subscription/config.go +++ b/pkg/controller/operators/catalog/subscription/config.go @@ -23,7 +23,7 @@ type syncerConfig struct { subscriptionInformer cache.SharedIndexInformer catalogInformer cache.SharedIndexInformer installPlanInformer cache.SharedIndexInformer - subscriptionQueue workqueue.TypedRateLimitingInterface[any] + subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption { } // WithSubscriptionQueue sets a syncer's subscription queue. -func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption { +func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption { return func(config *syncerConfig) { config.subscriptionQueue = subscriptionQueue } diff --git a/pkg/controller/operators/catalog/subscription/syncer.go b/pkg/controller/operators/catalog/subscription/syncer.go index 04e8edeb5a..5101a2d404 100644 --- a/pkg/controller/operators/catalog/subscription/syncer.go +++ b/pkg/controller/operators/catalog/subscription/syncer.go @@ -179,7 +179,7 @@ func (s *subscriptionSyncer) notifyOnInstallPlan(ctx context.Context, obj interf for _, owner := range owners { subKey := fmt.Sprintf("%s/%s", plan.GetNamespace(), owner.Name) logger.Tracef("notifying subscription %s", subKey) - s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, cache.ExplicitKey(subKey))) + s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, subKey)) } } diff --git a/pkg/controller/operators/catalogtemplate/operator.go b/pkg/controller/operators/catalogtemplate/operator.go index ea10454506..9c4ba85b74 100644 --- a/pkg/controller/operators/catalogtemplate/operator.go +++ b/pkg/controller/operators/catalogtemplate/operator.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/distribution/reference" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/sirupsen/logrus" @@ -101,8 +103,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + 
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "catalogSourceTemplate", }) op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue) diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index e6dfaaf1d0..87d120afe2 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins" "github.com/sirupsen/logrus" @@ -83,11 +85,11 @@ type Operator struct { copiedCSVLister metadatalister.Lister ogQueueSet *queueinformer.ResourceQueueSet csvQueueSet *queueinformer.ResourceQueueSet - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvCopyQueueSet *queueinformer.ResourceQueueSet copiedCSVGCQueueSet *queueinformer.ResourceQueueSet - nsQueueSet workqueue.TypedRateLimitingInterface[any] - apiServiceQueue workqueue.TypedRateLimitingInterface[any] + nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] + apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] csvIndexers map[string]cache.Indexer recorder record.EventRecorder resolver install.StrategyResolverInterface @@ -198,17 +200,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat client: config.externalClient, ogQueueSet: queueinformer.NewEmptyResourceQueueSet(), csvQueueSet: queueinformer.NewEmptyResourceQueueSet(), - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(), copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(), - apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "apiservice", }), resolver: config.strategyResolver, @@ -246,9 +248,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ).Operators().V1alpha1().ClusterServiceVersions() informersByNamespace[namespace].CSVInformer = csvInformer op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) - csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv", namespace), }) 
op.csvQueueSet.Set(namespace, csvQueue) @@ -273,7 +275,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat op.csvIndexers[namespace] = csvIndexer // Register separate queue for copying csvs - csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace)) + csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: fmt.Sprintf("%s/csv-copy", namespace), + }) op.csvCopyQueueSet.Set(namespace, csvCopyQueue) csvCopyQueueInformer, err := queueinformer.NewQueueInformer( ctx, @@ -307,9 +313,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister // Register separate queue for gcing copied csvs - copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/csv-gc", namespace), }) op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue) @@ -333,9 +339,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups() informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: fmt.Sprintf("%s/og", namespace), }) op.ogQueueSet.Set(namespace, ogQueue) @@ -522,9 +528,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: gvr.String(), - }) + queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ + Name: gvr.String(), + }, + ) queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithQueue(queue), @@ -696,9 +705,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces() informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any]( - 
workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "resolver", }) namespaceInformer.Informer().AddEventHandler( @@ -1665,7 +1674,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) { } if err == nil { - go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5) + go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig.GetName()), time.Second*5) } logger := a.logger.WithFields(logrus.Fields{ diff --git a/pkg/controller/operators/olm/operatorgroup.go b/pkg/controller/operators/olm/operatorgroup.go index 18c8b19008..cb382cd9b3 100644 --- a/pkg/controller/operators/olm/operatorgroup.go +++ b/pkg/controller/operators/olm/operatorgroup.go @@ -8,6 +8,8 @@ import ( "reflect" "strings" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + "k8s.io/apimachinery/pkg/api/equality" "github.com/sirupsen/logrus" @@ -182,7 +184,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error { logger.Debug("Requeueing out of sync namespaces") for _, ns := range outOfSyncNamespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } // CSV requeue is handled by the succeeding sync in `annotateCSVs` @@ -263,7 +265,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) { logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces") for _, ns := range op.Status.Namespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns)) } } diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3f656069de..4d9af8e77c 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -2,6 +2,9 @@ package kubestate import ( "context" + "fmt" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" ) type State interface { @@ -148,6 +151,7 @@ const ( type ResourceEvent interface { Type() ResourceEventType Resource() interface{} + String() string } type resourceEvent struct { @@ -163,7 +167,39 @@ func (r resourceEvent) Resource() interface{} { return r.resource } +func (r resourceEvent) String() string { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(r.resource) + // should not happen as resources must be either cache.ExplicitKey or client.Object + // and this should be enforced in NewResourceEvent + if err != nil { + panic("could not get resource key: " + err.Error()) + } + return fmt.Sprintf("{%s %s}", string(r.eventType), key) +} + +func NewUpdateEvent(resource interface{}) ResourceEvent { + return NewResourceEvent(ResourceUpdated, resource) +} + +// NewResourceEvent creates a new resource event. The resource parameter must either be +// a client.Object, a string, a cache.DeletedFinalStateUnknown, or a cache.ExplicitKey. In case it is a string, it will be +// coerced to cache.ExplicitKey. This ensures that whether a reference (string/cache.ExplicitKey) +// or a resource, workqueue will treat the items in the same way and dedup appropriately. +// This behavior is guaranteed by the String() method, which will also ignore the type of event. +// I.e. 
Add/Update/Delete events for the same resource object or reference will be deduplicated. func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent { + // assert resource type + // only accept strings/cache.ExplicitKey, client.Object, or cache.DeletedFinalStateUnknown + switch r := resource.(type) { + case string: + resource = cache.ExplicitKey(r) + case cache.ExplicitKey: + case client.Object: + case cache.DeletedFinalStateUnknown: + default: + panic(fmt.Sprintf("NewResourceEvent called with invalid resource type: %T", resource)) + } + return resourceEvent{ eventType: eventType, resource: resource, diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index bd69d2403b..a94a9f2e70 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -1,6 +1,7 @@ package queueinformer import ( + "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/discovery" @@ -14,7 +15,7 @@ type queueInformerConfig struct { provider metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc @@ -81,21 +82,12 @@ func (c *queueInformerConfig) validateQueue() (err error) { } func defaultKeyFunc(obj interface{}) (string, bool) { - // Get keys nested in resource events up to depth 2 - keyable := false - for d := 0; d < 2 && !keyable; d++ { - switch v := obj.(type) { - case string: - return v, true - case kubestate.ResourceEvent: - obj = v.Resource() - default: - keyable = true - } + if re, ok := obj.(kubestate.ResourceEvent); ok { + obj = re.Resource() } - k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { + fmt.Printf("error getting key for object %v: %v\n", obj, err) return k, false } @@ -105,9 +97,9 @@ func defaultConfig() *queueInformerConfig { return &queueInformerConfig{ provider: metrics.NewMetricsNil(), - queue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "default", }), logger: logrus.New(), @@ -130,7 +122,7 @@ func WithLogger(logger *logrus.Logger) Option { } // WithQueue sets the queue used by a QueueInformer. 
-func WithQueue(queue workqueue.RateLimitingInterface) Option { +func WithQueue(queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) Option { return func(config *queueInformerConfig) { config.queue = queue } diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 02a66cb527..ff7a7a6aa7 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -2,7 +2,6 @@ package queueinformer import ( "context" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/tools/cache" @@ -23,7 +22,7 @@ type QueueInformer struct { metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] informer cache.SharedIndexInformer indexer cache.Indexer keyFunc KeyFunc @@ -43,21 +42,25 @@ func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { return } - resource := event.Resource() - if event.Type() == kubestate.ResourceDeleted { - // Get object from tombstone if possible - if tombstone, ok := resource.(cache.DeletedFinalStateUnknown); ok { - resource = tombstone - } - } else { - // Extract key for add and update events - if key, ok := q.key(resource); ok { - resource = key - } - } + e := event + + // Delete operations should always carry either something assignable to + // metav1.Object or cache.DeletedFinalStateUnknown. + // Add/Update events coming from the informer should have their resource + // converted to a key (string) before being enqueued. + //if event.Type() != kubestate.ResourceDeleted { + // // Extract key for add and update events + // if key, ok := q.key(e.Resource()); ok { + // e = kubestate.NewResourceEvent(event.Type(), cache.ExplicitKey(key)) + // } else { + // // if the resource cannot be keyed the worker will not be able to process it + // // since it will not be able to retrieve the resource + // q.logger.WithField("event", e).Warn(fmt.Sprintf("resource of type %T is not keyable - skipping enqueue", e.Resource())) + // return + // } + //} // Create new resource event and add to queue - e := kubestate.NewResourceEvent(event.Type(), resource) q.logger.WithField("event", e).Trace("enqueuing resource event") q.queue.Add(e) } @@ -69,7 +72,7 @@ func (q *QueueInformer) key(obj interface{}) (string, bool) { // resourceHandlers provides the default implementation for responding to events // these simply Log the event and add the object's key to the queue for later processing. -func (q *QueueInformer) resourceHandlers(ctx context.Context) *cache.ResourceEventHandlerFuncs { +func (q *QueueInformer) resourceHandlers(_ context.Context) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, obj)) @@ -104,25 +107,6 @@ func (q *QueueInformer) metricHandlers() *cache.ResourceEventHandlerFuncs { } } -func NewQueue(ctx context.Context, options ...Option) (*QueueInformer, error) { - config := defaultConfig() - config.apply(options) - - if err := config.validateQueue(); err != nil { - return nil, err - } - - queue := &QueueInformer{ - MetricsProvider: config.provider, - logger: config.logger, - queue: config.queue, - keyFunc: config.keyFunc, - syncer: config.syncer, - } - - return queue, nil -} - // NewQueueInformer returns a new QueueInformer configured with options. 
func NewQueueInformer(ctx context.Context, options ...Option) (*QueueInformer, error) { // Get default config and apply given options diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..e4589be1a5 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -262,6 +262,28 @@ func (o *operator) worker(ctx context.Context, loop *QueueInformer) { } func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) bool { + // **************************** WARNING **************************** + // The QueueInformer listens to resource events raised by its + // (client-go) informer. For Add/Update events, it extracts the key + // for the resource and adds to its queue (the queue we Get() from below) + // a ResourceEvent carrying the key. + // The exception is deletion events: in that case, the + // ResourceEvent carries the resource object (or a tombstone). + // The syncer expects a ResourceEvent carrying the resource. + // So, for an Add/Update event coming off the queue, + // the resource is retrieved from the index (through the key), and then + // a ResourceEvent carrying the resource is handed to the syncer. + // Note also that, throughout the code, items are added to + // queueinformers out of band of informer notifications. + // The fact that the queueinformer's queue processes ResourceEvents, which + // themselves encapsulate an interface{} "Resource", makes it tricky for the + // queue to dedup. Before this change, the queue was + // processing both strings and ResourceEvents, which led to concurrent processing + // of the same resource. To address this, we enforce (with a panic) that the resource + // in a ResourceEvent must be either a cache.ExplicitKey or a client.Object. + // We then make sure that the ResourceEvent's String() returns the key of the + // encapsulated resource. Thus, independent of the resource type, the queue always + // processes it by key and dedups appropriately. 
queue := loop.queue item, quit := queue.Get() @@ -271,50 +293,39 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) defer queue.Done(item) logger := o.logger.WithField("item", item) - logger.WithField("queue-length", queue.Len()).Trace("popped queue") + logger.WithField("queue-length", queue.Len()).Info("popped queue") - event, ok := item.(kubestate.ResourceEvent) - if !ok || event.Type() != kubestate.ResourceDeleted { - // Get the key + var event = item + if item.Type() != kubestate.ResourceDeleted { key, keyable := loop.key(item) if !keyable { - logger.WithField("item", item).Warn("could not form key") + logger.WithField("item", item).Warnf("could not form key %s", item) queue.Forget(item) return true } logger = logger.WithField("cache-key", key) - var resource interface{} - if loop.indexer == nil { - resource = event.Resource() - } else { - // Get the current cached version of the resource - var exists bool - var err error - resource, exists, err = loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true - } + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err := loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") + queue.Forget(item) + return true } - - if !ok { - event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) - } else { - event = kubestate.NewResourceEvent(event.Type(), resource) + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + queue.Forget(item) + return true } + event = kubestate.NewResourceEvent(item.Type(), resource) } // Sync and requeue on error (throw out failed deletion syncs) err := loop.Sync(ctx, event) - if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted { + if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && item.Type() != kubestate.ResourceDeleted { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) queue.AddRateLimited(item) diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 0e4da56cde..6881b9784b 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -14,22 +14,22 @@ import ( // ResourceQueueSet is a set of workqueues that is assumed to be keyed by namespace type ResourceQueueSet struct { - queueSet map[string]workqueue.RateLimitingInterface + queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] mutex sync.RWMutex } // NewResourceQueueSet returns a new queue set with the given queue map -func NewResourceQueueSet(queueSet map[string]workqueue.RateLimitingInterface) *ResourceQueueSet { +func NewResourceQueueSet(queueSet map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) *ResourceQueueSet { return &ResourceQueueSet{queueSet: queueSet} } // NewEmptyResourceQueueSet returns a new queue set with an empty but initialized queue map func NewEmptyResourceQueueSet() *ResourceQueueSet { - return &ResourceQueueSet{queueSet: make(map[string]workqueue.RateLimitingInterface)} + return 
&ResourceQueueSet{queueSet: make(map[string]workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent])} } // Set sets the queue at the given key -func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface) { +func (r *ResourceQueueSet) Set(key string, queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) { r.mutex.Lock() defer r.mutex.Unlock() r.queueSet[key] = queue diff --git a/pkg/package-server/server/server.go b/pkg/package-server/server/server.go index 85ad4931dd..030bf001f6 100644 --- a/pkg/package-server/server/server.go +++ b/pkg/package-server/server/server.go @@ -8,6 +8,8 @@ import ( "os" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,7 +37,7 @@ const DefaultWakeupInterval = 12 * time.Hour type Operator struct { queueinformer.Operator - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent] options *PackageServerOptions } @@ -239,9 +241,9 @@ func (o *PackageServerOptions) Run(ctx context.Context) error { op := &Operator{ Operator: queueOperator, - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent]( + workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](), + workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{ Name: "olmConfig", }), options: o,
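
For reference, a minimal, self-contained sketch (not part of the patch) of the dedup behavior the NewResourceEvent change relies on: once a plain namespace key is coerced to cache.ExplicitKey, two events built from the same key are equal values, so the typed workqueue collapses them into a single entry. The event type and newUpdateEvent helper below are illustrative stand-ins for kubestate's unexported resourceEvent and kubestate.NewUpdateEvent; only the client-go workqueue and cache packages are used.

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// event is an illustrative stand-in for kubestate's resourceEvent: an event
// type plus a comparable resource reference.
type event struct {
	eventType string
	resource  cache.ExplicitKey
}

// newUpdateEvent mirrors the idea behind kubestate.NewUpdateEvent: coerce the
// plain string key to cache.ExplicitKey so events for equal keys compare equal.
func newUpdateEvent(key string) event {
	return event{eventType: "updated", resource: cache.ExplicitKey(key)}
}

func main() {
	q := workqueue.NewTypedRateLimitingQueueWithConfig[event](
		workqueue.DefaultTypedControllerRateLimiter[event](),
		workqueue.TypedRateLimitingQueueConfig[event]{Name: "example"},
	)

	// Two adds for the same namespace collapse into one queue entry, because
	// the workqueue dedups pending items by value equality.
	q.Add(newUpdateEvent("my-namespace"))
	q.Add(newUpdateEvent("my-namespace"))
	q.Add(newUpdateEvent("other-namespace"))

	fmt.Println("queue length:", q.Len()) // 2: one entry per distinct namespace
}

This is the property the call sites above depend on when they enqueue kubestate.NewUpdateEvent(ns) instead of a bare namespace string: every producer hands the queue the same comparable event value for a given namespace, so repeated notifications do not fan out into concurrent syncs of the same resource.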