Skip to content

Commit 8d52709

Browse files
Merge pull request #1125 from dinhxuanvu/sync-improve
Bug 1779313: Enable multiple namespaces sync if catsrc is updated in global ns
2 parents b549266 + 45f0689 commit 8d52709

File tree

4 files changed

+218
-37
lines changed

4 files changed

+218
-37
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,25 @@ const (
6666
type Operator struct {
6767
queueinformer.Operator
6868

69-
logger *logrus.Logger
70-
clock utilclock.Clock
71-
opClient operatorclient.ClientInterface
72-
client versioned.Interface
73-
dynamicClient dynamic.Interface
74-
lister operatorlister.OperatorLister
75-
catsrcQueueSet *queueinformer.ResourceQueueSet
76-
subQueueSet *queueinformer.ResourceQueueSet
77-
ipQueueSet *queueinformer.ResourceQueueSet
78-
nsResolveQueue workqueue.RateLimitingInterface
79-
namespace string
80-
sources *grpc.SourceStore
81-
sourcesLastUpdate sharedtime.SharedTime
82-
resolver resolver.Resolver
83-
reconciler reconciler.RegistryReconcilerFactory
84-
csvProvidedAPIsIndexer map[string]cache.Indexer
85-
clientAttenuator *scoped.ClientAttenuator
86-
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
69+
logger *logrus.Logger
70+
clock utilclock.Clock
71+
opClient operatorclient.ClientInterface
72+
client versioned.Interface
73+
dynamicClient dynamic.Interface
74+
lister operatorlister.OperatorLister
75+
catsrcQueueSet *queueinformer.ResourceQueueSet
76+
subQueueSet *queueinformer.ResourceQueueSet
77+
ipQueueSet *queueinformer.ResourceQueueSet
78+
nsResolveQueue workqueue.RateLimitingInterface
79+
namespace string
80+
sources *grpc.SourceStore
81+
sourcesLastUpdate sharedtime.SharedTime
82+
resolver resolver.Resolver
83+
reconciler reconciler.RegistryReconcilerFactory
84+
csvProvidedAPIsIndexer map[string]cache.Indexer
85+
catalogSubscriberIndexer map[string]cache.Indexer
86+
clientAttenuator *scoped.ClientAttenuator
87+
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
8788
}
8889

8990
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
@@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
124125

125126
// Allocate the new instance of an Operator.
126127
op := &Operator{
127-
Operator: queueOperator,
128-
logger: logger,
129-
clock: clock,
130-
opClient: opClient,
131-
dynamicClient: dynamicClient,
132-
client: crClient,
133-
lister: lister,
134-
namespace: operatorNamespace,
135-
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
136-
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
137-
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
138-
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
139-
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
140-
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
128+
Operator: queueOperator,
129+
logger: logger,
130+
clock: clock,
131+
opClient: opClient,
132+
dynamicClient: dynamicClient,
133+
client: crClient,
134+
lister: lister,
135+
namespace: operatorNamespace,
136+
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
137+
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
138+
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
139+
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
140+
catalogSubscriberIndexer: map[string]cache.Indexer{},
141+
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
142+
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
141143
}
142144
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
143145
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
@@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
202204
// Wire Subscriptions
203205
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
204206
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
207+
if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil {
208+
return nil, err
209+
}
210+
subIndexer := subInformer.Informer().GetIndexer()
211+
op.catalogSubscriberIndexer[namespace] = subIndexer
212+
205213
subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace))
206214
op.subQueueSet.Set(namespace, subQueue)
207215
subSyncer, err := subscription.NewSyncer(
@@ -339,6 +347,17 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
339347

340348
switch state.State {
341349
case connectivity.Ready:
350+
if o.namespace == state.Key.Namespace {
351+
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
352+
state.Key.Name, state.Key.Namespace)
353+
354+
if err == nil {
355+
for ns := range namespaces {
356+
o.nsResolveQueue.Add(ns)
357+
}
358+
}
359+
}
360+
342361
o.nsResolveQueue.Add(state.Key.Namespace)
343362
default:
344363
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {

pkg/lib/index/catalog.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package indexer
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
7+
"k8s.io/client-go/tools/cache"
8+
)
9+
10+
const (
11+
// PresentCatalogIndexFuncKey is the recommended key to use for registering
12+
// the index func with an indexer.
13+
PresentCatalogIndexFuncKey string = "presentcatalogindexfunc"
14+
)
15+
16+
// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace
17+
// of the given object (Subscription)
18+
func PresentCatalogIndexFunc(obj interface{}) ([]string, error) {
19+
sub, ok := obj.(*v1alpha1.Subscription)
20+
if !ok {
21+
return []string{""}, fmt.Errorf("invalid object of type: %T", obj)
22+
}
23+
24+
if sub.Spec.CatalogSource != "" && sub.Spec.CatalogSourceNamespace != "" {
25+
return []string{sub.Spec.CatalogSource + "/" + sub.Spec.CatalogSourceNamespace}, nil
26+
}
27+
28+
return []string{""}, nil
29+
}
30+
31+
// CatalogSubscriberNamespaces returns the list of namespace (as a map with namespace as key)
32+
// which has Suscriptions(s) that subscribe(s) to a given CatalogSource (name/namespace)
33+
func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) {
34+
nsSet := map[string]struct{}{}
35+
index := fmt.Sprintf("%s/%s", name, namespace)
36+
37+
for _, indexer := range indexers {
38+
subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index)
39+
if err != nil {
40+
return nil, err
41+
}
42+
for _, item := range subs {
43+
s, ok := item.(*v1alpha1.Subscription)
44+
if !ok {
45+
continue
46+
}
47+
// Add to set
48+
nsSet[s.GetNamespace()] = struct{}{}
49+
}
50+
}
51+
52+
return nsSet, nil
53+
}

test/e2e/catalog_e2e_test.go

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,115 @@ func TestCatalogLoadingBetweenRestarts(t *testing.T) {
8181
t.Logf("Catalog source sucessfully loaded after rescale")
8282
}
8383

84+
func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) {
85+
defer cleaner.NotifyTestComplete(t, true)
86+
87+
globalNS := operatorNamespace
88+
c := newKubeClient(t)
89+
crc := newCRClient(t)
90+
91+
// Determine which namespace is global. Should be `openshift-marketplace` for OCP 4.2+.
92+
// Locally it is `olm`
93+
namespaces, _ := c.KubernetesInterface().CoreV1().Namespaces().List(metav1.ListOptions{})
94+
for _, ns := range namespaces.Items {
95+
if ns.GetName() == "openshift-marketplace" {
96+
globalNS = "openshift-marketplace"
97+
}
98+
}
99+
100+
mainPackageName := genName("nginx-")
101+
102+
mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName)
103+
mainPackageReplacement := fmt.Sprintf("%s-replacement", mainPackageStable)
104+
105+
stableChannel := "stable"
106+
107+
mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil)
108+
109+
crdPlural := genName("ins-")
110+
111+
mainCRD := newCRD(crdPlural)
112+
mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy)
113+
replacementCSV := newCSV(mainPackageReplacement, testNamespace, mainPackageStable, semver.MustParse("0.2.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy)
114+
115+
mainCatalogName := genName("mock-ocs-main-")
116+
117+
// Create separate manifests for each CatalogSource
118+
mainManifests := []registry.PackageManifest{
119+
{
120+
PackageName: mainPackageName,
121+
Channels: []registry.PackageChannel{
122+
{Name: stableChannel, CurrentCSVName: mainPackageStable},
123+
},
124+
DefaultChannelName: stableChannel,
125+
},
126+
}
127+
128+
// Create the initial catalogsource
129+
createInternalCatalogSource(t, c, crc, mainCatalogName, globalNS, mainManifests, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV})
130+
131+
// Attempt to get the catalog source before creating install plan
132+
_, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced)
133+
require.NoError(t, err)
134+
135+
subscriptionSpec := &v1alpha1.SubscriptionSpec{
136+
CatalogSource: mainCatalogName,
137+
CatalogSourceNamespace: globalNS,
138+
Package: mainPackageName,
139+
Channel: stableChannel,
140+
StartingCSV: mainCSV.GetName(),
141+
InstallPlanApproval: v1alpha1.ApprovalManual,
142+
}
143+
144+
// Create Subscription
145+
subscriptionName := genName("sub-")
146+
createSubscriptionForCatalogWithSpec(t, crc, testNamespace, subscriptionName, subscriptionSpec)
147+
148+
subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker)
149+
require.NoError(t, err)
150+
require.NotNil(t, subscription)
151+
152+
installPlanName := subscription.Status.Install.Name
153+
requiresApprovalChecker := buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseRequiresApproval)
154+
fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, requiresApprovalChecker)
155+
require.NoError(t, err)
156+
157+
fetchedInstallPlan.Spec.Approved = true
158+
_, err = crc.OperatorsV1alpha1().InstallPlans(testNamespace).Update(fetchedInstallPlan)
159+
require.NoError(t, err)
160+
161+
_, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvSucceededChecker)
162+
require.NoError(t, err)
163+
164+
// Update manifest
165+
mainManifests = []registry.PackageManifest{
166+
{
167+
PackageName: mainPackageName,
168+
Channels: []registry.PackageChannel{
169+
{Name: stableChannel, CurrentCSVName: replacementCSV.GetName()},
170+
},
171+
DefaultChannelName: stableChannel,
172+
},
173+
}
174+
175+
// Update catalog configmap
176+
updateInternalCatalog(t, c, crc, mainCatalogName, globalNS, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, replacementCSV}, mainManifests)
177+
178+
// Get updated catalogsource
179+
fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced)
180+
require.NoError(t, err)
181+
182+
subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradePendingChecker)
183+
require.NoError(t, err)
184+
require.NotNil(t, subscription)
185+
186+
// Ensure the timing
187+
catalogConnState := fetchedUpdatedCatalog.Status.GRPCConnectionState
188+
subUpdatedTime := subscription.Status.LastUpdated
189+
timeLapse := subUpdatedTime.Sub(catalogConnState.LastConnectTime.Time).Seconds()
190+
require.True(t, timeLapse < 60)
191+
}
192+
84193
func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) {
85194
defer cleaner.NotifyTestComplete(t, true)
86195

@@ -153,8 +262,8 @@ func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) {
153262
fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool {
154263
before := fetchedInitialCatalog.Status.ConfigMapResource
155264
after := catalog.Status.ConfigMapResource
156-
if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) &&
157-
after.ResourceVersion != before.ResourceVersion {
265+
if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) &&
266+
after.ResourceVersion != before.ResourceVersion {
158267
fmt.Println("catalog updated")
159268
return true
160269
}

test/e2e/subscription_e2e_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,7 +1491,7 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v
14911491
require.NoError(t, err)
14921492

14931493
// Get initial configmap
1494-
configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{})
1494+
configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{})
14951495
require.NoError(t, err)
14961496

14971497
// Update package to point to new csv
@@ -1515,11 +1515,11 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v
15151515
configMap.Data[registry.ConfigMapCSVName] = string(csvsRaw)
15161516

15171517
// Update configmap
1518-
_, err = c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Update(configMap)
1518+
_, err = c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Update(configMap)
15191519
require.NoError(t, err)
15201520

15211521
// wait for catalog to update
1522-
_, err = fetchCatalogSource(t, crc, catalogSourceName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool {
1522+
_, err = fetchCatalogSource(t, crc, catalogSourceName, namespace, func(catalog *v1alpha1.CatalogSource) bool {
15231523
before := fetchedInitialCatalog.Status.ConfigMapResource
15241524
after := catalog.Status.ConfigMapResource
15251525
if after != nil && after.LastUpdateTime.After(before.LastUpdateTime.Time) && after.ResourceVersion != before.ResourceVersion {

0 commit comments

Comments
 (0)