@@ -30,6 +30,7 @@ import (
3030 "k8s.io/apimachinery/pkg/runtime"
3131 "k8s.io/apimachinery/pkg/runtime/schema"
3232 "k8s.io/apimachinery/pkg/selection"
33+ "k8s.io/apimachinery/pkg/types"
3334 utilerrors "k8s.io/apimachinery/pkg/util/errors"
3435 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3536 "k8s.io/apimachinery/pkg/util/sets"
@@ -114,7 +115,7 @@ type Operator struct {
114115 subQueueSet * queueinformer.ResourceQueueSet
115116 ipQueueSet * queueinformer.ResourceQueueSet
116117 ogQueueSet * queueinformer.ResourceQueueSet
117- nsResolveQueue workqueue.TypedRateLimitingInterface [any ]
118+ nsResolveQueue workqueue.TypedRateLimitingInterface [types. NamespacedName ]
118119 namespace string
119120 recorder record.EventRecorder
120121 sources * grpc.SourceStore
@@ -268,8 +269,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
268269 // Wire InstallPlans
269270 ipInformer := crInformerFactory .Operators ().V1alpha1 ().InstallPlans ()
270271 op .lister .OperatorsV1alpha1 ().RegisterInstallPlanLister (metav1 .NamespaceAll , ipInformer .Lister ())
271- ipQueue := workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
272- workqueue.TypedRateLimitingQueueConfig [any ]{
272+ ipQueue := workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](),
273+ workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
273274 Name : "ips" ,
274275 })
275276 op .ipQueueSet .Set (metav1 .NamespaceAll , ipQueue )
@@ -290,8 +291,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
290291
291292 operatorGroupInformer := crInformerFactory .Operators ().V1 ().OperatorGroups ()
292293 op .lister .OperatorsV1 ().RegisterOperatorGroupLister (metav1 .NamespaceAll , operatorGroupInformer .Lister ())
293- ogQueue := workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
294- workqueue.TypedRateLimitingQueueConfig [any ]{
294+ ogQueue := workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](),
295+ workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
295296 Name : "ogs" ,
296297 })
297298 op .ogQueueSet .Set (metav1 .NamespaceAll , ogQueue )
@@ -312,8 +313,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
312313 // Wire CatalogSources
313314 catsrcInformer := crInformerFactory .Operators ().V1alpha1 ().CatalogSources ()
314315 op .lister .OperatorsV1alpha1 ().RegisterCatalogSourceLister (metav1 .NamespaceAll , catsrcInformer .Lister ())
315- catsrcQueue := workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
316- workqueue.TypedRateLimitingQueueConfig [any ]{
316+ catsrcQueue := workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](),
317+ workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
317318 Name : "catsrcs" ,
318319 })
319320 op .catsrcQueueSet .Set (metav1 .NamespaceAll , catsrcQueue )
@@ -323,7 +324,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
323324 queueinformer .WithLogger (op .logger ),
324325 queueinformer .WithQueue (catsrcQueue ),
325326 queueinformer .WithInformer (catsrcInformer .Informer ()),
326- queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncCatalogSources ).ToSyncerWithDelete (op .handleCatSrcDeletion )),
327+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncCatalogSources ).ToSyncer ()),
328+ queueinformer .WithDeletionHandler (op .handleCatSrcDeletion ),
327329 )
328330 if err != nil {
329331 return nil , err
@@ -341,8 +343,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
341343 subIndexer := subInformer .Informer ().GetIndexer ()
342344 op .catalogSubscriberIndexer [metav1 .NamespaceAll ] = subIndexer
343345
344- subQueue := workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
345- workqueue.TypedRateLimitingQueueConfig [any ]{
346+ subQueue := workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](),
347+ workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
346348 Name : "subs" ,
347349 })
348350 op .subQueueSet .Set (metav1 .NamespaceAll , subQueue )
@@ -355,7 +357,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
355357 subscription .WithCatalogInformer (catsrcInformer .Informer ()),
356358 subscription .WithInstallPlanInformer (ipInformer .Informer ()),
357359 subscription .WithSubscriptionQueue (subQueue ),
358- subscription .WithAppendedReconcilers (subscription .ReconcilerFromLegacySyncHandler (op .syncSubscriptions , nil )),
360+ subscription .WithAppendedReconcilers (subscription .ReconcilerFromLegacySyncHandler (op .syncSubscriptions )),
359361 subscription .WithRegistryReconcilerFactory (op .reconciler ),
360362 subscription .WithGlobalCatalogNamespace (op .namespace ),
361363 subscription .WithSourceProvider (op .resolverSourceProvider ),
@@ -415,7 +417,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
415417 logger := op .logger .WithFields (logrus.Fields {"gvr" : gvr .String (), "index" : idx })
416418 logger .Info ("registering labeller" )
417419
418- queue := workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](), workqueue.TypedRateLimitingQueueConfig [any ]{
420+ queue := workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](), workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
419421 Name : gvr .String (),
420422 })
421423 queueInformer , err := queueinformer .NewQueueInformer (
@@ -560,7 +562,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
560562 logger := op .logger .WithFields (logrus.Fields {"gvr" : gvr .String ()})
561563 logger .Info ("registering owner reference fixer" )
562564
563- queue := workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](), workqueue.TypedRateLimitingQueueConfig [any ]{
565+ queue := workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](), workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
564566 Name : gvr .String (),
565567 })
566568 queueInformer , err := queueinformer .NewQueueInformer (
@@ -670,13 +672,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
670672 }
671673
672674 // Generate and register QueueInformers for k8s resources
673- k8sSyncer := queueinformer .LegacySyncHandler (op .syncObject ).ToSyncerWithDelete ( op . handleDeletion )
675+ k8sSyncer := queueinformer .LegacySyncHandler (op .syncObject ).ToSyncer ( )
674676 for _ , informer := range sharedIndexInformers {
675677 queueInformer , err := queueinformer .NewQueueInformer (
676678 ctx ,
677679 queueinformer .WithLogger (op .logger ),
678680 queueinformer .WithInformer (informer ),
679681 queueinformer .WithSyncer (k8sSyncer ),
682+ queueinformer .WithDeletionHandler (op .handleDeletion ),
680683 )
681684 if err != nil {
682685 return nil , err
@@ -724,7 +727,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
724727 ctx ,
725728 queueinformer .WithLogger (op .logger ),
726729 queueinformer .WithInformer (crdInformer ),
727- queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncObject ).ToSyncerWithDelete (op .handleDeletion )),
730+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncObject ).ToSyncer ()),
731+ queueinformer .WithDeletionHandler (op .handleDeletion ),
728732 )
729733 if err != nil {
730734 return nil , err
@@ -745,8 +749,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
745749 // Namespace sync for resolving subscriptions
746750 namespaceInformer := informers .NewSharedInformerFactory (op .opClient .KubernetesInterface (), resyncPeriod ()).Core ().V1 ().Namespaces ()
747751 op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
748- op .nsResolveQueue = workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
749- workqueue.TypedRateLimitingQueueConfig [any ]{
752+ op .nsResolveQueue = workqueue .NewTypedRateLimitingQueueWithConfig [types. NamespacedName ](workqueue .DefaultTypedControllerRateLimiter [types. NamespacedName ](),
753+ workqueue.TypedRateLimitingQueueConfig [types. NamespacedName ]{
750754 Name : "resolve" ,
751755 })
752756 namespaceQueueInformer , err := queueinformer .NewQueueInformer (
@@ -787,12 +791,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
787791
788792 if err == nil {
789793 for ns := range namespaces {
790- o .nsResolveQueue .Add (ns )
794+ o .nsResolveQueue .Add (types. NamespacedName { Name : ns } )
791795 }
792796 }
793797 }
794798
795- o .nsResolveQueue .Add (state .Key .Namespace )
799+ o .nsResolveQueue .Add (types. NamespacedName { Name : state .Key .Namespace } )
796800 }
797801 if err := o .catsrcQueueSet .Requeue (state .Key .Namespace , state .Key .Name ); err != nil {
798802 o .logger .WithError (err ).Info ("couldn't requeue catalogsource from catalog status change" )
@@ -873,18 +877,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
873877func (o * Operator ) handleCatSrcDeletion (obj interface {}) {
874878 catsrc , ok := obj .(metav1.Object )
875879 if ! ok {
880+ tombstone , ok := obj .(cache.DeletedFinalStateUnknown )
876881 if ! ok {
877- tombstone , ok := obj .(cache.DeletedFinalStateUnknown )
878- if ! ok {
879- utilruntime .HandleError (fmt .Errorf ("couldn't get object from tombstone %#v" , obj ))
880- return
881- }
882+ utilruntime .HandleError (fmt .Errorf ("couldn't get object from tombstone %#v" , obj ))
883+ return
884+ }
882885
883- catsrc , ok = tombstone .Obj .(metav1.Object )
884- if ! ok {
885- utilruntime .HandleError (fmt .Errorf ("tombstone contained object that is not a Namespace %#v" , obj ))
886- return
887- }
886+ catsrc , ok = tombstone .Obj .(metav1.Object )
887+ if ! ok {
888+ utilruntime .HandleError (fmt .Errorf ("tombstone contained object that is not a Namespace %#v" , obj ))
889+ return
888890 }
889891 }
890892 sourceKey := registry.CatalogKey {Name : catsrc .GetName (), Namespace : catsrc .GetNamespace ()}
@@ -1411,7 +1413,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14111413 }
14121414
14131415 logger .Info ("unpacking is not complete yet, requeueing" )
1414- o .nsResolveQueue .AddAfter (namespace , 5 * time .Second )
1416+ o .nsResolveQueue .AddAfter (types. NamespacedName { Name : namespace } , 5 * time .Second )
14151417 return nil
14161418 }
14171419 }
@@ -1506,7 +1508,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
15061508 return fmt .Errorf ("casting Subscription failed" )
15071509 }
15081510
1509- o .nsResolveQueue .Add (sub .GetNamespace ())
1511+ o .nsResolveQueue .Add (types. NamespacedName { Name : sub .GetNamespace ()} )
15101512
15111513 return nil
15121514}
@@ -1520,7 +1522,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
15201522 return fmt .Errorf ("casting OperatorGroup failed" )
15211523 }
15221524
1523- o .nsResolveQueue .Add (og .GetNamespace ())
1525+ o .nsResolveQueue .Add (types. NamespacedName { Name : og .GetNamespace ()} )
15241526
15251527 return nil
15261528}
0 commit comments