Skip to content

Commit ba9df23

Browse files
Merge pull request #958 from perdasilva/catperf-4.15
[release-4.15] OCPBUGS-48697: Fix excessive catalog source snapshots cause severe performance regression
2 parents b500cb3 + af8db22 commit ba9df23

File tree

19 files changed

+157
-71
lines changed

19 files changed

+157
-71
lines changed

staging/operator-lifecycle-manager/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ require (
3535
github.com/spf13/cobra v1.7.0
3636
github.com/spf13/pflag v1.0.5
3737
github.com/stretchr/testify v1.8.4
38+
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
3839
golang.org/x/net v0.34.0
3940
golang.org/x/sync v0.10.0
4041
golang.org/x/time v0.3.0
@@ -216,7 +217,6 @@ require (
216217
go.uber.org/multierr v1.6.0 // indirect
217218
go.uber.org/zap v1.24.0 // indirect
218219
golang.org/x/crypto v0.32.0 // indirect
219-
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
220220
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
221221
golang.org/x/mod v0.17.0 // indirect
222222
golang.org/x/oauth2 v0.14.0 // indirect

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ type Operator struct {
129129
bundleUnpackTimeout time.Duration
130130
clientFactory clients.Factory
131131
muInstallPlan sync.Mutex
132-
sourceInvalidator *resolver.RegistrySourceProvider
132+
resolverSourceProvider *resolver.RegistrySourceProvider
133+
operatorCacheProvider resolvercache.OperatorCacheProvider
133134
}
134135

135136
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
@@ -215,10 +216,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
215216
clientFactory: clients.NewFactory(validatingConfig),
216217
}
217218
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
218-
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
219-
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
219+
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
220+
op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger)
220221
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
221-
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
222+
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger)
222223
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)
223224

224225
// Wire OLM CR sharedIndexInformers
@@ -347,7 +348,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
347348
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
348349
subscription.WithRegistryReconcilerFactory(op.reconciler),
349350
subscription.WithGlobalCatalogNamespace(op.namespace),
350-
subscription.WithSourceProvider(op.sourceInvalidator),
351+
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
351352
)
352353
if err != nil {
353354
return nil, err
@@ -763,10 +764,11 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
763764

764765
o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
765766
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)
767+
metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace)
766768

767769
switch state.State {
768770
case connectivity.Ready:
769-
o.sourceInvalidator.Invalidate(resolvercache.SourceKey(state.Key))
771+
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
770772
if o.namespace == state.Key.Namespace {
771773
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
772774
state.Key.Name, state.Key.Namespace)
@@ -880,6 +882,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
880882
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
881883

882884
metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace())
885+
metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace())
883886
}
884887

885888
func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type syncerConfig struct {
2727
reconcilers kubestate.ReconcilerChain
2828
registryReconcilerFactory reconciler.RegistryReconcilerFactory
2929
globalCatalogNamespace string
30-
sourceProvider resolverCache.SourceProvider
30+
operatorCacheProvider resolverCache.OperatorCacheProvider
3131
}
3232

3333
// SyncerOption is a configuration option for a subscription syncer.
@@ -130,9 +130,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption {
130130
}
131131
}
132132

133-
func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption {
133+
func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption {
134134
return func(config *syncerConfig) {
135-
config.sourceProvider = provider
135+
config.operatorCacheProvider = provider
136136
}
137137
}
138138

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ type catalogHealthReconciler struct {
6161
catalogLister listers.CatalogSourceLister
6262
registryReconcilerFactory reconciler.RegistryReconcilerFactory
6363
globalCatalogNamespace string
64-
sourceProvider cache.SourceProvider
64+
operatorCacheProvider cache.OperatorCacheProvider
65+
logger logrus.StdLogger
6566
}
6667

6768
// Reconcile reconciles subscription catalog health conditions.
@@ -130,21 +131,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
130131
// updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then
131132
// returns a bool value of true if any changes to the existing subscription have occurred.
132133
func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) {
133-
if c.sourceProvider == nil {
134+
if c.operatorCacheProvider == nil {
134135
return false, nil
135136
}
136-
source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{
137+
138+
entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{
137139
Name: sub.Spec.CatalogSource,
138140
Namespace: sub.Spec.CatalogSourceNamespace,
139-
}]
140-
if !ok {
141-
return false, nil
142-
}
143-
snapshot, err := source.Snapshot(ctx)
144-
if err != nil {
145-
return false, err
146-
}
147-
if len(snapshot.Entries) == 0 {
141+
}).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel))
142+
143+
if len(entries) == 0 {
148144
return false, nil
149145
}
150146

@@ -153,12 +149,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su
153149
var deprecations *cache.Deprecations
154150

155151
found := false
156-
for _, entry := range snapshot.Entries {
152+
for _, entry := range entries {
157153
// Find the cache entry that matches this subscription
158-
if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package {
159-
continue
160-
}
161-
if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel {
154+
if entry.SourceInfo == nil {
162155
continue
163156
}
164157
if sub.Status.InstalledCSV != entry.Name {

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/operator-framework/api/pkg/operators/install"
1414
"github.com/operator-framework/api/pkg/operators/v1alpha1"
1515
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
16-
resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
1716
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
1817
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
1918
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
@@ -35,7 +34,6 @@ type subscriptionSyncer struct {
3534
installPlanLister listers.InstallPlanLister
3635
globalCatalogNamespace string
3736
notify kubestate.NotifyFunc
38-
sourceProvider resolverCache.SourceProvider
3937
}
4038

4139
// now returns the Syncer's current time.
@@ -216,7 +214,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
216214
reconcilers: config.reconcilers,
217215
subscriptionCache: config.subscriptionInformer.GetIndexer(),
218216
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
219-
sourceProvider: config.sourceProvider,
220217
notify: func(event kubestate.ResourceEvent) {
221218
// Notify Subscriptions by enqueuing to the Subscription queue.
222219
config.subscriptionQueue.Add(event)
@@ -237,7 +234,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
237234
catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(),
238235
registryReconcilerFactory: config.registryReconcilerFactory,
239236
globalCatalogNamespace: config.globalCatalogNamespace,
240-
sourceProvider: config.sourceProvider,
237+
operatorCacheProvider: config.operatorCacheProvider,
238+
logger: config.logger,
241239
},
242240
}
243241
s.reconcilers = append(defaultReconcilers, s.reconcilers...)

staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
1818
"github.com/operator-framework/operator-registry/pkg/api"
1919
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
20+
21+
"golang.org/x/exp/slices"
2022
)
2123

2224
// constraintProvider knows how to provide solver constraints for a given cache entry.
@@ -29,15 +31,15 @@ type constraintProvider interface {
2931
}
3032

3133
type Resolver struct {
32-
cache *cache.Cache
34+
cache cache.OperatorCacheProvider
3335
log logrus.FieldLogger
3436
pc *predicateConverter
3537
systemConstraintsProvider constraintProvider
3638
}
3739

38-
func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver {
40+
func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver {
3941
return &Resolver{
40-
cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)),
42+
cache: cacheProvider,
4143
log: logger,
4244
pc: &predicateConverter{
4345
celEnv: constraints.NewCelEnvironment(),
@@ -513,11 +515,13 @@ func (r *Resolver) addInvariants(namespacedCache cache.MultiCatalogOperatorFinde
513515
}
514516

515517
for gvk, is := range gvkConflictToVariable {
518+
slices.Sort(is)
516519
s := NewSingleAPIProviderVariable(gvk.Group, gvk.Version, gvk.Kind, is)
517520
variables[s.Identifier()] = s
518521
}
519522

520523
for pkg, is := range packageConflictToVariable {
524+
slices.Sort(is)
521525
s := NewSinglePackageInstanceVariable(pkg, is)
522526
variables[s.Identifier()] = s
523527
}

staging/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"github.com/go-air/gini/inter"
88
"github.com/go-air/gini/logic"
99
"github.com/go-air/gini/z"
10+
11+
"golang.org/x/exp/slices"
1012
)
1113

1214
type DuplicateIdentifier Identifier
@@ -203,5 +205,17 @@ func (d *litMapping) Conflicts(g inter.Assumable) []AppliedConstraint {
203205
as = append(as, a)
204206
}
205207
}
208+
slices.SortFunc(as, func(a, b AppliedConstraint) int {
209+
return strCmp(a.String(), b.String())
210+
})
206211
return as
207212
}
213+
214+
func strCmp(str1, str2 string) int {
215+
if str1 < str2 {
216+
return -1
217+
} else if str1 > str2 {
218+
return 1
219+
}
220+
return 0
221+
}

staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1313
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1414
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
1516
"github.com/operator-framework/operator-registry/pkg/api"
1617
"github.com/operator-framework/operator-registry/pkg/client"
1718
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
@@ -143,6 +144,9 @@ type registrySource struct {
143144
}
144145

145146
func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
147+
s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name)
148+
metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace)
149+
146150
// Fetching default channels this way makes many round trips
147151
// -- may need to either add a new API to fetch all at once,
148152
// or embed the information into Bundle.

staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int {
5656
return catsrc.Spec.Priority
5757
}
5858

59-
func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver {
59+
func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider {
6060
cacheSourceProvider := &mergedSourceProvider{
6161
sps: []cache.SourceProvider{
6262
sourceProvider,
@@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
7070
},
7171
},
7272
}
73+
catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}
74+
75+
return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider))
76+
}
77+
78+
func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver {
7379
stepResolver := &OperatorStepResolver{
7480
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
7581
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
7682
ogLister: lister.OperatorsV1().OperatorGroupLister(),
7783
client: client,
7884
globalCatalogNamespace: globalCatalogNamespace,
79-
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
85+
resolver: NewDefaultResolver(opCacheProvider, log),
8086
log: log,
8187
}
8288

staging/operator-lifecycle-manager/pkg/metrics/metrics.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ var (
152152
[]string{NamespaceLabel, NameLabel},
153153
)
154154

155+
catalogSourceSnapshotsTotal = prometheus.NewCounterVec(
156+
prometheus.CounterOpts{
157+
Name: "catalog_source_snapshots_total",
158+
Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source",
159+
},
160+
[]string{NamespaceLabel, NameLabel},
161+
)
162+
155163
// exported since it's not handled by HandleMetrics
156164
CSVUpgradeCount = prometheus.NewCounter(
157165
prometheus.CounterOpts{
@@ -250,6 +258,7 @@ func RegisterCatalog() {
250258
prometheus.MustRegister(subscriptionCount)
251259
prometheus.MustRegister(catalogSourceCount)
252260
prometheus.MustRegister(catalogSourceReady)
261+
prometheus.MustRegister(catalogSourceSnapshotsTotal)
253262
prometheus.MustRegister(SubscriptionSyncCount)
254263
prometheus.MustRegister(dependencyResolutionSummary)
255264
prometheus.MustRegister(installPlanWarningCount)
@@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) {
272281
catalogSourceReady.DeleteLabelValues(namespace, name)
273282
}
274283

284+
func RegisterCatalogSourceSnapshotsTotal(name, namespace string) {
285+
catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Add(0)
286+
}
287+
288+
func IncrementCatalogSourceSnapshotsTotal(name, namespace string) {
289+
catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Inc()
290+
}
291+
292+
func DeleteCatalogSourceSnapshotsTotal(name, namespace string) {
293+
catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name)
294+
}
295+
275296
func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) {
276297
// Delete the old CSV metrics
277298
csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason))

0 commit comments

Comments
 (0)