Skip to content

Commit 5140d19

Browse files
committed
Enable multiple namespaces sync if catsrc is updated in global ns
Currently, if CatalogSource is updated and connection is Ready, only the namespace where CatalogSource resides gets resynced. Now, a list of namespaces that contain Subscriptions that use updated CatalogSource will get resynced instead. Signed-off-by: Vu Dinh <[email protected]>
1 parent 812d184 commit 5140d19

File tree

2 files changed

+106
-32
lines changed

2 files changed

+106
-32
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,25 @@ const (
6666
type Operator struct {
6767
queueinformer.Operator
6868

69-
logger *logrus.Logger
70-
clock utilclock.Clock
71-
opClient operatorclient.ClientInterface
72-
client versioned.Interface
73-
dynamicClient dynamic.Interface
74-
lister operatorlister.OperatorLister
75-
catsrcQueueSet *queueinformer.ResourceQueueSet
76-
subQueueSet *queueinformer.ResourceQueueSet
77-
ipQueueSet *queueinformer.ResourceQueueSet
78-
nsResolveQueue workqueue.RateLimitingInterface
79-
namespace string
80-
sources *grpc.SourceStore
81-
sourcesLastUpdate sharedtime.SharedTime
82-
resolver resolver.Resolver
83-
reconciler reconciler.RegistryReconcilerFactory
84-
csvProvidedAPIsIndexer map[string]cache.Indexer
85-
clientAttenuator *scoped.ClientAttenuator
86-
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
69+
logger *logrus.Logger
70+
clock utilclock.Clock
71+
opClient operatorclient.ClientInterface
72+
client versioned.Interface
73+
dynamicClient dynamic.Interface
74+
lister operatorlister.OperatorLister
75+
catsrcQueueSet *queueinformer.ResourceQueueSet
76+
subQueueSet *queueinformer.ResourceQueueSet
77+
ipQueueSet *queueinformer.ResourceQueueSet
78+
nsResolveQueue workqueue.RateLimitingInterface
79+
namespace string
80+
sources *grpc.SourceStore
81+
sourcesLastUpdate sharedtime.SharedTime
82+
resolver resolver.Resolver
83+
reconciler reconciler.RegistryReconcilerFactory
84+
csvProvidedAPIsIndexer map[string]cache.Indexer
85+
catalogSubscriberIndexer map[string]cache.Indexer
86+
clientAttenuator *scoped.ClientAttenuator
87+
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
8788
}
8889

8990
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
@@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
124125

125126
// Allocate the new instance of an Operator.
126127
op := &Operator{
127-
Operator: queueOperator,
128-
logger: logger,
129-
clock: clock,
130-
opClient: opClient,
131-
dynamicClient: dynamicClient,
132-
client: crClient,
133-
lister: lister,
134-
namespace: operatorNamespace,
135-
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
136-
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
137-
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
138-
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
139-
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
140-
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
128+
Operator: queueOperator,
129+
logger: logger,
130+
clock: clock,
131+
opClient: opClient,
132+
dynamicClient: dynamicClient,
133+
client: crClient,
134+
lister: lister,
135+
namespace: operatorNamespace,
136+
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
137+
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
138+
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
139+
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
140+
catalogSubscriberIndexer: map[string]cache.Indexer{},
141+
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
142+
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
141143
}
142144
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
143145
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
@@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
202204
// Wire Subscriptions
203205
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
204206
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
207+
if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil {
208+
return nil, err
209+
}
210+
subIndexer := subInformer.Informer().GetIndexer()
211+
op.catalogSubscriberIndexer[namespace] = subIndexer
212+
205213
subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace))
206214
op.subQueueSet.Set(namespace, subQueue)
207215
subSyncer, err := subscription.NewSyncer(
@@ -339,6 +347,17 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
339347

340348
switch state.State {
341349
case connectivity.Ready:
350+
if o.namespace == state.Key.Namespace {
351+
subs, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
352+
state.Key.Name, state.Key.Namespace)
353+
354+
if err == nil {
355+
for _, ns := range subs {
356+
o.nsResolveQueue.Add(ns)
357+
}
358+
}
359+
}
360+
342361
o.nsResolveQueue.Add(state.Key.Namespace)
343362
default:
344363
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {

pkg/lib/index/catalog.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package indexer
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
7+
"k8s.io/client-go/tools/cache"
8+
)
9+
10+
const (
11+
// PresentCatalogIndexFuncKey is the recommended key to use for registering
12+
// the index func with an indexer.
13+
PresentCatalogIndexFuncKey string = "presentcatalogindexfunc"
14+
)
15+
16+
// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace
17+
// of the given object (Subscription)
18+
func PresentCatalogIndexFunc(obj interface{}) ([]string, error) {
19+
indicies := []string{}
20+
21+
sub, ok := obj.(*v1alpha1.Subscription)
22+
if !ok {
23+
return indicies, fmt.Errorf("invalid object of type: %T", obj)
24+
}
25+
26+
indicies = append(indicies, fmt.Sprintf("%s/%s", sub.Spec.CatalogSource,
27+
sub.Spec.CatalogSourceNamespace))
28+
29+
return indicies, nil
30+
}
31+
32+
// CatalogSubscriberNamespaces returns the list of Suscriptions' name and namespace
33+
// (name/namespace as key and namespace as value) that uses the given CatalogSource (name/namespace)
34+
func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]string, error) {
35+
nsSet := map[string]string{}
36+
index := fmt.Sprintf("%s/%s", name, namespace)
37+
38+
for _, indexer := range indexers {
39+
subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index)
40+
if err != nil {
41+
return nil, err
42+
}
43+
for _, item := range subs {
44+
s, ok := item.(*v1alpha1.Subscription)
45+
if !ok {
46+
continue
47+
}
48+
// Add to set
49+
key := fmt.Sprintf("%s/%s", s.GetName(), s.GetNamespace())
50+
nsSet[key] = s.GetNamespace()
51+
}
52+
}
53+
54+
return nsSet, nil
55+
}

0 commit comments

Comments
 (0)