Skip to content

Commit f9ed54e

Browse files
author
Per Goncalves da Silva
committed
queue item fix
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent b7aa493 commit f9ed54e

File tree

11 files changed

+108
-81
lines changed

11 files changed

+108
-81
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
89
"reflect"
910
"sort"
1011
"strings"
@@ -114,7 +115,7 @@ type Operator struct {
114115
subQueueSet *queueinformer.ResourceQueueSet
115116
ipQueueSet *queueinformer.ResourceQueueSet
116117
ogQueueSet *queueinformer.ResourceQueueSet
117-
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
118+
nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
118119
namespace string
119120
recorder record.EventRecorder
120121
sources *grpc.SourceStore
@@ -268,8 +269,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
268269
// Wire InstallPlans
269270
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
270271
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
271-
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
272-
workqueue.TypedRateLimitingQueueConfig[any]{
272+
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
273+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
274+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
273275
Name: "ips",
274276
})
275277
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
@@ -290,8 +292,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
290292

291293
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
292294
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
293-
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
294-
workqueue.TypedRateLimitingQueueConfig[any]{
295+
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
296+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
297+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
295298
Name: "ogs",
296299
})
297300
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
@@ -312,8 +315,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
312315
// Wire CatalogSources
313316
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
314317
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
315-
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
316-
workqueue.TypedRateLimitingQueueConfig[any]{
318+
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
319+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
320+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
317321
Name: "catsrcs",
318322
})
319323
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
@@ -341,8 +345,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
341345
subIndexer := subInformer.Informer().GetIndexer()
342346
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer
343347

344-
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
345-
workqueue.TypedRateLimitingQueueConfig[any]{
348+
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
349+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
350+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
346351
Name: "subs",
347352
})
348353
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
@@ -415,9 +420,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
415420
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
416421
logger.Info("registering labeller")
417422

418-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
419-
Name: gvr.String(),
420-
})
423+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
424+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
425+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
426+
Name: gvr.String(),
427+
},
428+
)
421429
queueInformer, err := queueinformer.NewQueueInformer(
422430
ctx,
423431
queueinformer.WithQueue(queue),
@@ -560,9 +568,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
560568
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
561569
logger.Info("registering owner reference fixer")
562570

563-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
564-
Name: gvr.String(),
565-
})
571+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
572+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
573+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
574+
Name: gvr.String(),
575+
},
576+
)
566577
queueInformer, err := queueinformer.NewQueueInformer(
567578
ctx,
568579
queueinformer.WithQueue(queue),
@@ -745,8 +756,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
745756
// Namespace sync for resolving subscriptions
746757
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
747758
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
748-
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
749-
workqueue.TypedRateLimitingQueueConfig[any]{
759+
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
760+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
761+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
750762
Name: "resolve",
751763
})
752764
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
@@ -787,12 +799,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
787799

788800
if err == nil {
789801
for ns := range namespaces {
790-
o.nsResolveQueue.Add(ns)
802+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns))
791803
}
792804
}
793805
}
794806

795-
o.nsResolveQueue.Add(state.Key.Namespace)
807+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace))
796808
}
797809
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
798810
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
@@ -1411,7 +1423,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14111423
}
14121424

14131425
logger.Info("unpacking is not complete yet, requeueing")
1414-
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
1426+
o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second)
14151427
return nil
14161428
}
14171429
}
@@ -1506,7 +1518,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
15061518
return fmt.Errorf("casting Subscription failed")
15071519
}
15081520

1509-
o.nsResolveQueue.Add(sub.GetNamespace())
1521+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace()))
15101522

15111523
return nil
15121524
}
@@ -1520,7 +1532,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
15201532
return fmt.Errorf("casting OperatorGroup failed")
15211533
}
15221534

1523-
o.nsResolveQueue.Add(og.GetNamespace())
1535+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace()))
15241536

15251537
return nil
15261538
}

pkg/controller/operators/catalog/subscription/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type syncerConfig struct {
2323
subscriptionInformer cache.SharedIndexInformer
2424
catalogInformer cache.SharedIndexInformer
2525
installPlanInformer cache.SharedIndexInformer
26-
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
26+
subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
2727
reconcilers kubestate.ReconcilerChain
2828
registryReconcilerFactory reconciler.RegistryReconcilerFactory
2929
globalCatalogNamespace string
@@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
9797
}
9898

9999
// WithSubscriptionQueue sets a syncer's subscription queue.
100-
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
100+
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption {
101101
return func(config *syncerConfig) {
102102
config.subscriptionQueue = subscriptionQueue
103103
}

pkg/controller/operators/catalogtemplate/operator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package catalogtemplate
33
import (
44
"context"
55
"fmt"
6+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
67
"strings"
78
"time"
89

@@ -101,8 +102,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg
101102
// Wire CatalogSources
102103
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
103104
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
104-
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
105-
workqueue.TypedRateLimitingQueueConfig[any]{
105+
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
106+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
107+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
106108
Name: "catalogSourceTemplate",
107109
})
108110
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)

pkg/controller/operators/olm/operator.go

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
78
"strings"
89
"sync"
910
"time"
@@ -83,11 +84,11 @@ type Operator struct {
8384
copiedCSVLister metadatalister.Lister
8485
ogQueueSet *queueinformer.ResourceQueueSet
8586
csvQueueSet *queueinformer.ResourceQueueSet
86-
olmConfigQueue workqueue.TypedRateLimitingInterface[any]
87+
olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
8788
csvCopyQueueSet *queueinformer.ResourceQueueSet
8889
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
89-
nsQueueSet workqueue.TypedRateLimitingInterface[any]
90-
apiServiceQueue workqueue.TypedRateLimitingInterface[any]
90+
nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
91+
apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
9192
csvIndexers map[string]cache.Indexer
9293
recorder record.EventRecorder
9394
resolver install.StrategyResolverInterface
@@ -198,17 +199,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
198199
client: config.externalClient,
199200
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
200201
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
201-
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
202-
workqueue.DefaultTypedControllerRateLimiter[any](),
203-
workqueue.TypedRateLimitingQueueConfig[any]{
202+
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
203+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
204+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
204205
Name: "olmConfig",
205206
}),
206207

207208
csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
208209
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
209-
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
210-
workqueue.DefaultTypedControllerRateLimiter[any](),
211-
workqueue.TypedRateLimitingQueueConfig[any]{
210+
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
211+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
212+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
212213
Name: "apiservice",
213214
}),
214215
resolver: config.strategyResolver,
@@ -246,9 +247,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
246247
).Operators().V1alpha1().ClusterServiceVersions()
247248
informersByNamespace[namespace].CSVInformer = csvInformer
248249
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
249-
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
250-
workqueue.DefaultTypedControllerRateLimiter[any](),
251-
workqueue.TypedRateLimitingQueueConfig[any]{
250+
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
251+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
252+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
252253
Name: fmt.Sprintf("%s/csv", namespace),
253254
})
254255
op.csvQueueSet.Set(namespace, csvQueue)
@@ -273,7 +274,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
273274
op.csvIndexers[namespace] = csvIndexer
274275

275276
// Register separate queue for copying csvs
276-
csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace))
277+
csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
278+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
279+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
280+
Name: fmt.Sprintf("%s/csv-copy", namespace),
281+
})
277282
op.csvCopyQueueSet.Set(namespace, csvCopyQueue)
278283
csvCopyQueueInformer, err := queueinformer.NewQueueInformer(
279284
ctx,
@@ -307,9 +312,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
307312
informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister
308313

309314
// Register separate queue for gcing copied csvs
310-
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
311-
workqueue.DefaultTypedControllerRateLimiter[any](),
312-
workqueue.TypedRateLimitingQueueConfig[any]{
315+
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
316+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
317+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
313318
Name: fmt.Sprintf("%s/csv-gc", namespace),
314319
})
315320
op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue)
@@ -333,9 +338,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
333338
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
334339
informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer
335340
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
336-
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
337-
workqueue.DefaultTypedControllerRateLimiter[any](),
338-
workqueue.TypedRateLimitingQueueConfig[any]{
341+
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
342+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
343+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
339344
Name: fmt.Sprintf("%s/og", namespace),
340345
})
341346
op.ogQueueSet.Set(namespace, ogQueue)
@@ -522,9 +527,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
522527
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
523528
logger.Info("registering labeller")
524529

525-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
526-
Name: gvr.String(),
527-
})
530+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
531+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
532+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
533+
Name: gvr.String(),
534+
},
535+
)
528536
queueInformer, err := queueinformer.NewQueueInformer(
529537
ctx,
530538
queueinformer.WithQueue(queue),
@@ -696,9 +704,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
696704
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces()
697705
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
698706
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
699-
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any](
700-
workqueue.DefaultTypedControllerRateLimiter[any](),
701-
workqueue.TypedRateLimitingQueueConfig[any]{
707+
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
708+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
709+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
702710
Name: "resolver",
703711
})
704712
namespaceInformer.Informer().AddEventHandler(
@@ -1665,7 +1673,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
16651673
}
16661674

16671675
if err == nil {
1668-
go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5)
1676+
go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5)
16691677
}
16701678

16711679
logger := a.logger.WithFields(logrus.Fields{

pkg/controller/operators/olm/operatorgroup.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
78
"math/big"
89
"reflect"
910
"strings"
@@ -182,7 +183,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error {
182183
logger.Debug("Requeueing out of sync namespaces")
183184
for _, ns := range outOfSyncNamespaces {
184185
logger.WithField("namespace", ns).Debug("requeueing")
185-
a.nsQueueSet.Add(ns)
186+
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
186187
}
187188

188189
// CSV requeue is handled by the succeeding sync in `annotateCSVs`
@@ -263,7 +264,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) {
263264
logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces")
264265
for _, ns := range op.Status.Namespaces {
265266
logger.WithField("namespace", ns).Debug("requeueing")
266-
a.nsQueueSet.Add(ns)
267+
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
267268
}
268269
}
269270

pkg/lib/kubestate/kubestate.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} {
163163
return r.resource
164164
}
165165

166+
func NewUpdateEvent(resource interface{}) ResourceEvent {
167+
return resourceEvent{
168+
eventType: ResourceUpdated,
169+
resource: resource,
170+
}
171+
}
172+
166173
func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
167174
return resourceEvent{
168175
eventType: eventType,

pkg/lib/queueinformer/config.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
type queueInformerConfig struct {
1515
provider metrics.MetricsProvider
1616
logger *logrus.Logger
17-
queue workqueue.RateLimitingInterface
17+
queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
1818
informer cache.SharedIndexInformer
1919
indexer cache.Indexer
2020
keyFunc KeyFunc
@@ -105,9 +105,9 @@ func defaultKeyFunc(obj interface{}) (string, bool) {
105105
func defaultConfig() *queueInformerConfig {
106106
return &queueInformerConfig{
107107
provider: metrics.NewMetricsNil(),
108-
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
109-
workqueue.DefaultTypedControllerRateLimiter[any](),
110-
workqueue.TypedRateLimitingQueueConfig[any]{
108+
queue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
109+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
110+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
111111
Name: "default",
112112
}),
113113
logger: logrus.New(),
@@ -130,7 +130,7 @@ func WithLogger(logger *logrus.Logger) Option {
130130
}
131131

132132
// WithQueue sets the queue used by a QueueInformer.
133-
func WithQueue(queue workqueue.RateLimitingInterface) Option {
133+
func WithQueue(queue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) Option {
134134
return func(config *queueInformerConfig) {
135135
config.queue = queue
136136
}

0 commit comments

Comments
 (0)