From e41dba4235d33f8bfa42cce364a88dbb07590b1e Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Wed, 15 Jan 2025 20:23:28 -0500 Subject: [PATCH 1/2] add log and metric to instrument count of catalog source snapshots Signed-off-by: Joe Lanford --- pkg/controller/operators/catalog/operator.go | 2 ++ .../registry/resolver/source_registry.go | 4 ++++ pkg/metrics/metrics.go | 21 +++++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 250282cfb3..edfdfbe884 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -781,6 +781,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String()) metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State) + metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace) switch state.State { case connectivity.Ready: @@ -896,6 +897,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) { o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource") metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace()) + metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace()) } func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) { diff --git a/pkg/controller/registry/resolver/source_registry.go b/pkg/controller/registry/resolver/source_registry.go index fe193beae8..f20cab0eba 100644 --- a/pkg/controller/registry/resolver/source_registry.go +++ b/pkg/controller/registry/resolver/source_registry.go @@ -12,6 +12,7 @@ import ( v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" + "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/client" opregistry "github.com/operator-framework/operator-registry/pkg/registry" @@ -143,6 +144,9 @@ type registrySource struct { } func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { + s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name) + metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace) + // Fetching default channels this way makes many round trips // -- may need to either add a new API to fetch all at once, // or embed the information into Bundle. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 7512d87f72..0369a41a24 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -152,6 +152,14 @@ var ( []string{NamespaceLabel, NameLabel}, ) + catalogSourceSnapshotsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "catalog_source_snapshots_total", + Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source", + }, + []string{NamespaceLabel, NameLabel}, + ) + // exported since it's not handled by HandleMetrics CSVUpgradeCount = prometheus.NewCounter( prometheus.CounterOpts{ @@ -250,6 +258,7 @@ func RegisterCatalog() { prometheus.MustRegister(subscriptionCount) prometheus.MustRegister(catalogSourceCount) prometheus.MustRegister(catalogSourceReady) + prometheus.MustRegister(catalogSourceSnapshotsTotal) prometheus.MustRegister(SubscriptionSyncCount) prometheus.MustRegister(dependencyResolutionSummary) prometheus.MustRegister(installPlanWarningCount) @@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) { catalogSourceReady.DeleteLabelValues(namespace, name) } +func RegisterCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Add(0) +} + +func IncrementCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Inc() +} + +func DeleteCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name) +} + func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) { // Delete the old CSV metrics csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason)) From 0684f5b6c2e0ed0ba6ee78770994e3ab26ba0775 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Wed, 15 Jan 2025 17:23:33 -0500 Subject: [PATCH 2/2] use operator cache provider for deprecation updates to limit calls to GRPC server Signed-off-by: Joe Lanford --- pkg/controller/operators/catalog/operator.go | 6 +++-- .../operators/catalog/subscription/config.go | 6 ++--- .../catalog/subscription/reconciler.go | 27 +++++++------------ .../operators/catalog/subscription/syncer.go | 6 ++--- pkg/controller/registry/resolver/resolver.go | 6 ++--- .../registry/resolver/step_resolver.go | 10 +++++-- 6 files changed, 30 insertions(+), 31 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index edfdfbe884..f9eaeb5eeb 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -131,6 +131,7 @@ type Operator struct { clientFactory clients.Factory muInstallPlan sync.Mutex resolverSourceProvider *resolver.RegistrySourceProvider + operatorCacheProvider resolvercache.OperatorCacheProvider } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -217,8 +218,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger) + op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage) - res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger) + res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger) op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure) // Wire OLM CR sharedIndexInformers @@ -360,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)), subscription.WithRegistryReconcilerFactory(op.reconciler), subscription.WithGlobalCatalogNamespace(op.namespace), - subscription.WithSourceProvider(op.resolverSourceProvider), + subscription.WithOperatorCacheProvider(op.operatorCacheProvider), ) if err != nil { return nil, err diff --git a/pkg/controller/operators/catalog/subscription/config.go b/pkg/controller/operators/catalog/subscription/config.go index 01c8a21899..9b4152c9c1 100644 --- a/pkg/controller/operators/catalog/subscription/config.go +++ b/pkg/controller/operators/catalog/subscription/config.go @@ -28,7 +28,7 @@ type syncerConfig struct { reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string - sourceProvider resolverCache.SourceProvider + operatorCacheProvider resolverCache.OperatorCacheProvider } // SyncerOption is a configuration option for a subscription syncer. @@ -131,9 +131,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption { } } -func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption { +func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption { return func(config *syncerConfig) { - config.sourceProvider = provider + config.operatorCacheProvider = provider } } diff --git a/pkg/controller/operators/catalog/subscription/reconciler.go b/pkg/controller/operators/catalog/subscription/reconciler.go index 1a6741a650..fa9dd79d28 100644 --- a/pkg/controller/operators/catalog/subscription/reconciler.go +++ b/pkg/controller/operators/catalog/subscription/reconciler.go @@ -57,7 +57,8 @@ type catalogHealthReconciler struct { catalogLister listers.CatalogSourceLister registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string - sourceProvider cache.SourceProvider + operatorCacheProvider cache.OperatorCacheProvider + logger logrus.StdLogger } // Reconcile reconciles subscription catalog health conditions. @@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St // updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then // returns a bool value of true if any changes to the existing subscription have occurred. func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) { - if c.sourceProvider == nil { + if c.operatorCacheProvider == nil { return false, nil } - source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{ + + entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{ Name: sub.Spec.CatalogSource, Namespace: sub.Spec.CatalogSourceNamespace, - }] - if !ok { - return false, nil - } - snapshot, err := source.Snapshot(ctx) - if err != nil { - return false, err - } - if len(snapshot.Entries) == 0 { + }).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel)) + + if len(entries) == 0 { return false, nil } @@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su var deprecations *cache.Deprecations found := false - for _, entry := range snapshot.Entries { + for _, entry := range entries { // Find the cache entry that matches this subscription - if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package { - continue - } - if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel { + if entry.SourceInfo == nil { continue } if sub.Status.InstalledCSV != entry.Name { diff --git a/pkg/controller/operators/catalog/subscription/syncer.go b/pkg/controller/operators/catalog/subscription/syncer.go index b39adc6bc0..564930ef3c 100644 --- a/pkg/controller/operators/catalog/subscription/syncer.go +++ b/pkg/controller/operators/catalog/subscription/syncer.go @@ -16,7 +16,6 @@ import ( "github.com/operator-framework/api/pkg/operators/install" "github.com/operator-framework/api/pkg/operators/v1alpha1" listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" - resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" @@ -38,7 +37,6 @@ type subscriptionSyncer struct { installPlanLister listers.InstallPlanLister globalCatalogNamespace string notify kubestate.NotifyFunc - sourceProvider resolverCache.SourceProvider } // now returns the Syncer's current time. @@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S reconcilers: config.reconcilers, subscriptionCache: config.subscriptionInformer.GetIndexer(), installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(), - sourceProvider: config.sourceProvider, notify: func(event types.NamespacedName) { // Notify Subscriptions by enqueuing to the Subscription queue. config.subscriptionQueue.Add(event) @@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(), registryReconcilerFactory: config.registryReconcilerFactory, globalCatalogNamespace: config.globalCatalogNamespace, - sourceProvider: config.sourceProvider, + operatorCacheProvider: config.operatorCacheProvider, + logger: config.logger, }, } s.reconcilers = append(defaultReconcilers, s.reconcilers...) diff --git a/pkg/controller/registry/resolver/resolver.go b/pkg/controller/registry/resolver/resolver.go index c19aba9f26..322b581fb4 100644 --- a/pkg/controller/registry/resolver/resolver.go +++ b/pkg/controller/registry/resolver/resolver.go @@ -29,15 +29,15 @@ type constraintProvider interface { } type Resolver struct { - cache *cache.Cache + cache cache.OperatorCacheProvider log logrus.FieldLogger pc *predicateConverter systemConstraintsProvider constraintProvider } -func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver { +func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver { return &Resolver{ - cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)), + cache: cacheProvider, log: logger, pc: &predicateConverter{ celEnv: constraints.NewCelEnvironment(), diff --git a/pkg/controller/registry/resolver/step_resolver.go b/pkg/controller/registry/resolver/step_resolver.go index 5d2807bceb..5fb9ab3c0a 100644 --- a/pkg/controller/registry/resolver/step_resolver.go +++ b/pkg/controller/registry/resolver/step_resolver.go @@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int { return catsrc.Spec.Priority } -func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver { +func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider { cacheSourceProvider := &mergedSourceProvider{ sps: []cache.SourceProvider{ sourceProvider, @@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio }, }, } + catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()} + + return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider)) +} + +func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver { stepResolver := &OperatorStepResolver{ subLister: lister.OperatorsV1alpha1().SubscriptionLister(), csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), ogLister: lister.OperatorsV1().OperatorGroupLister(), client: client, globalCatalogNamespace: globalCatalogNamespace, - resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log), + resolver: NewDefaultResolver(opCacheProvider, log), log: log, }