Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 43 additions & 32 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
Expand Down Expand Up @@ -114,7 +116,7 @@ type Operator struct {
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
ogQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
namespace string
recorder record.EventRecorder
sources *grpc.SourceStore
Expand Down Expand Up @@ -268,8 +270,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire InstallPlans
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "ips",
})
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
Expand All @@ -290,8 +293,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "ogs",
})
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
Expand All @@ -312,8 +316,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "catsrcs",
})
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
Expand Down Expand Up @@ -341,8 +346,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer

subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "subs",
})
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
Expand Down Expand Up @@ -415,9 +421,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
logger.Info("registering labeller")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -560,9 +569,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
logger.Info("registering owner reference fixer")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: gvr.String(),
})
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: gvr.String(),
},
)
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithQueue(queue),
Expand Down Expand Up @@ -745,8 +757,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolve",
})
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
Expand Down Expand Up @@ -787,12 +800,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns))
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace))
}
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
Expand Down Expand Up @@ -873,18 +886,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
catsrc, ok := obj.(metav1.Object)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}

catsrc, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
return
}
catsrc, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
return
}
}
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
Expand Down Expand Up @@ -1411,7 +1422,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
}

logger.Info("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second)
return nil
}
}
Expand Down Expand Up @@ -1506,7 +1517,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return fmt.Errorf("casting Subscription failed")
}

o.nsResolveQueue.Add(sub.GetNamespace())
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace()))

return nil
}
Expand All @@ -1520,7 +1531,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace()))

return nil
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"testing/quick"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"k8s.io/utils/ptr"

controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
Expand Down Expand Up @@ -2156,13 +2158,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
client: clientFake,
lister: lister,
namespace: namespace,
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.NewTypedMaxOfRateLimiter[any](
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second),
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.NewTypedMaxOfRateLimiter[kubestate.ResourceEvent](
workqueue.NewTypedItemExponentialFailureRateLimiter[kubestate.ResourceEvent](1*time.Second, 1000*time.Second),
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
&workqueue.TypedBucketRateLimiter[kubestate.ResourceEvent]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
),
workqueue.TypedRateLimitingQueueConfig[any]{
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "resolver",
}),
resolver: config.resolver,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/catalog/subscription/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type syncerConfig struct {
subscriptionInformer cache.SharedIndexInformer
catalogInformer cache.SharedIndexInformer
installPlanInformer cache.SharedIndexInformer
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
Expand Down Expand Up @@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
}

// WithSubscriptionQueue sets a syncer's subscription queue.
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption {
return func(config *syncerConfig) {
config.subscriptionQueue = subscriptionQueue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/subscription/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *subscriptionSyncer) notifyOnInstallPlan(ctx context.Context, obj interf
for _, owner := range owners {
subKey := fmt.Sprintf("%s/%s", plan.GetNamespace(), owner.Name)
logger.Tracef("notifying subscription %s", subKey)
s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, cache.ExplicitKey(subKey)))
s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, subKey))
}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/operators/catalogtemplate/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"

"github.com/distribution/reference"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -101,8 +103,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
Name: "catalogSourceTemplate",
})
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)
Expand Down
Loading
Loading