Skip to content

Commit bdee5d9

Browse files
Merge pull request openshift#937 from openshift-bot/synchronize-upstream
NO-ISSUE: Synchronize From Upstream Repositories
2 parents 728cbe2 + a6ec135 commit bdee5d9

File tree

20 files changed

+214
-126
lines changed

20 files changed

+214
-126
lines changed

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type Operator struct {
131131
clientFactory clients.Factory
132132
muInstallPlan sync.Mutex
133133
resolverSourceProvider *resolver.RegistrySourceProvider
134+
operatorCacheProvider resolvercache.OperatorCacheProvider
134135
}
135136

136137
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
@@ -217,8 +218,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
217218
}
218219
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
219220
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
221+
op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger)
220222
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
221-
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
223+
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger)
222224
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)
223225

224226
// Wire OLM CR sharedIndexInformers
@@ -360,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
360362
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
361363
subscription.WithRegistryReconcilerFactory(op.reconciler),
362364
subscription.WithGlobalCatalogNamespace(op.namespace),
363-
subscription.WithSourceProvider(op.resolverSourceProvider),
365+
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
364366
)
365367
if err != nil {
366368
return nil, err
@@ -781,6 +783,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
781783

782784
o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
783785
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)
786+
metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace)
784787

785788
switch state.State {
786789
case connectivity.Ready:
@@ -896,6 +899,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
896899
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
897900

898901
metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace())
902+
metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace())
899903
}
900904

901905
func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {
@@ -914,6 +918,7 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *
914918
err = fmt.Errorf("unknown sourcetype: %s", sourceType)
915919
}
916920
if err != nil {
921+
logger.WithError(err).Error("error validating catalog source type")
917922
out.SetError(v1alpha1.CatalogSourceSpecInvalidError, err)
918923
return
919924
}
@@ -925,7 +930,6 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *
925930
}
926931
}
927932
continueSync = true
928-
929933
return
930934
}
931935

@@ -938,27 +942,22 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
938942

939943
out = in.DeepCopy()
940944

941-
logger = logger.WithFields(logrus.Fields{
942-
"configmap.namespace": in.Namespace,
943-
"configmap.name": in.Spec.ConfigMap,
944-
})
945-
logger.Info("checking catsrc configmap state")
946-
947945
var updateLabel bool
948946
// Get the catalog source's config map
949947
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(in.GetNamespace()).Get(in.Spec.ConfigMap)
950948
// Attempt to look up the CM via api call if there is a cache miss
951949
if apierrors.IsNotFound(err) {
950+
// TODO: Don't reach out via live client if its not found in the cache (https://github.com/operator-framework/operator-lifecycle-manager/issues/3415)
952951
configMap, err = o.opClient.KubernetesInterface().CoreV1().ConfigMaps(in.GetNamespace()).Get(context.TODO(), in.Spec.ConfigMap, metav1.GetOptions{})
953952
// Found cm in the cluster, add managed label to configmap
954953
if err == nil {
955-
labels := configMap.GetLabels()
956-
if labels == nil {
957-
labels = make(map[string]string)
954+
cmLabels := configMap.GetLabels()
955+
if cmLabels == nil {
956+
cmLabels = make(map[string]string)
958957
}
959958

960-
labels[install.OLMManagedLabelKey] = "false"
961-
configMap.SetLabels(labels)
959+
cmLabels[install.OLMManagedLabelKey] = "false"
960+
configMap.SetLabels(cmLabels)
962961
updateLabel = true
963962
}
964963
}
@@ -975,12 +974,9 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
975974
out.SetError(v1alpha1.CatalogSourceConfigMapError, syncError)
976975
return
977976
}
978-
979-
logger.Info("adopted configmap")
980977
}
981978

982979
if in.Status.ConfigMapResource == nil || !in.Status.ConfigMapResource.IsAMatch(&configMap.ObjectMeta) {
983-
logger.Info("updating catsrc configmap state")
984980
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
985981
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
986982
Name: configMap.GetName(),
@@ -1000,7 +996,6 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
1000996
func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) {
1001997
out = in.DeepCopy()
1002998

1003-
logger.Info("synchronizing registry server")
1004999
sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()}
10051000
srcReconciler := o.reconciler.ReconcilerForSource(in)
10061001
if srcReconciler == nil {
@@ -1017,21 +1012,15 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
10171012
return
10181013
}
10191014

1020-
logger.WithField("health", healthy).Infof("checked registry server health")
1021-
10221015
if healthy && in.Status.RegistryServiceStatus != nil {
1023-
logger.Info("registry state good")
10241016
continueSync = true
10251017
// return here if catalog does not have polling enabled
10261018
if !out.Poll() {
1027-
logger.Info("polling not enabled, nothing more to do")
10281019
return
10291020
}
10301021
}
10311022

10321023
// Registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
1033-
logger.Info("ensuring registry server")
1034-
10351024
err = srcReconciler.EnsureRegistryServer(logger, out)
10361025
if err != nil {
10371026
if _, ok := err.(reconciler.UpdateNotReadyErr); ok {
@@ -1044,8 +1033,6 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
10441033
return
10451034
}
10461035

1047-
logger.Info("ensured registry server")
1048-
10491036
// requeue the catalog sync based on the polling interval, for accurate syncs of catalogs with polling enabled
10501037
if out.Spec.UpdateStrategy != nil && out.Spec.UpdateStrategy.RegistryPoll != nil {
10511038
if out.Spec.UpdateStrategy.Interval == nil {
@@ -1054,16 +1041,17 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
10541041
return
10551042
}
10561043
if out.Spec.UpdateStrategy.RegistryPoll.ParsingError != "" && out.Status.Reason != v1alpha1.CatalogSourceIntervalInvalidError {
1057-
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError))
1044+
err := errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError)
1045+
logger.WithError(err).Error("registry server sync error: failed to parse registry poll interval")
1046+
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, err)
10581047
}
1059-
logger.Infof("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String())
10601048
resyncPeriod := reconciler.SyncRegistryUpdateInterval(out, time.Now())
10611049
o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), queueinformer.ResyncWithJitter(resyncPeriod, 0.1)())
10621050
return
10631051
}
10641052

10651053
if err := o.sources.Remove(sourceKey); err != nil {
1066-
o.logger.WithError(err).Debug("error closing client connection")
1054+
o.logger.WithError(err).Error("registry server sync error: error closing client connection")
10671055
}
10681056

10691057
return
@@ -1154,7 +1142,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
11541142
"catalogsource.name": catsrc.Name,
11551143
"id": queueinformer.NewLoopID(),
11561144
})
1157-
logger.Info("syncing catalog source")
11581145

11591146
syncFunc := func(in *v1alpha1.CatalogSource, chain []CatalogSourceSyncFunc) (out *v1alpha1.CatalogSource, syncErr error) {
11601147
out = in

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type syncerConfig struct {
2828
reconcilers kubestate.ReconcilerChain
2929
registryReconcilerFactory reconciler.RegistryReconcilerFactory
3030
globalCatalogNamespace string
31-
sourceProvider resolverCache.SourceProvider
31+
operatorCacheProvider resolverCache.OperatorCacheProvider
3232
}
3333

3434
// SyncerOption is a configuration option for a subscription syncer.
@@ -131,9 +131,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption {
131131
}
132132
}
133133

134-
func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption {
134+
func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption {
135135
return func(config *syncerConfig) {
136-
config.sourceProvider = provider
136+
config.operatorCacheProvider = provider
137137
}
138138
}
139139

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ type catalogHealthReconciler struct {
5757
catalogLister listers.CatalogSourceLister
5858
registryReconcilerFactory reconciler.RegistryReconcilerFactory
5959
globalCatalogNamespace string
60-
sourceProvider cache.SourceProvider
60+
operatorCacheProvider cache.OperatorCacheProvider
61+
logger logrus.StdLogger
6162
}
6263

6364
// Reconcile reconciles subscription catalog health conditions.
@@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
126127
// updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then
127128
// returns a bool value of true if any changes to the existing subscription have occurred.
128129
func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) {
129-
if c.sourceProvider == nil {
130+
if c.operatorCacheProvider == nil {
130131
return false, nil
131132
}
132-
source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{
133+
134+
entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{
133135
Name: sub.Spec.CatalogSource,
134136
Namespace: sub.Spec.CatalogSourceNamespace,
135-
}]
136-
if !ok {
137-
return false, nil
138-
}
139-
snapshot, err := source.Snapshot(ctx)
140-
if err != nil {
141-
return false, err
142-
}
143-
if len(snapshot.Entries) == 0 {
137+
}).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel))
138+
139+
if len(entries) == 0 {
144140
return false, nil
145141
}
146142

@@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su
149145
var deprecations *cache.Deprecations
150146

151147
found := false
152-
for _, entry := range snapshot.Entries {
148+
for _, entry := range entries {
153149
// Find the cache entry that matches this subscription
154-
if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package {
155-
continue
156-
}
157-
if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel {
150+
if entry.SourceInfo == nil {
158151
continue
159152
}
160153
if sub.Status.InstalledCSV != entry.Name {

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/operator-framework/api/pkg/operators/install"
1717
"github.com/operator-framework/api/pkg/operators/v1alpha1"
1818
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
19-
resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
2019
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
2120
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
2221
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
@@ -38,7 +37,6 @@ type subscriptionSyncer struct {
3837
installPlanLister listers.InstallPlanLister
3938
globalCatalogNamespace string
4039
notify kubestate.NotifyFunc
41-
sourceProvider resolverCache.SourceProvider
4240
}
4341

4442
// now returns the Syncer's current time.
@@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
218216
reconcilers: config.reconcilers,
219217
subscriptionCache: config.subscriptionInformer.GetIndexer(),
220218
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
221-
sourceProvider: config.sourceProvider,
222219
notify: func(event types.NamespacedName) {
223220
// Notify Subscriptions by enqueuing to the Subscription queue.
224221
config.subscriptionQueue.Add(event)
@@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
256253
catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(),
257254
registryReconcilerFactory: config.registryReconcilerFactory,
258255
globalCatalogNamespace: config.globalCatalogNamespace,
259-
sourceProvider: config.sourceProvider,
256+
operatorCacheProvider: config.operatorCacheProvider,
257+
logger: config.logger,
260258
},
261259
}
262260
s.reconcilers = append(defaultReconcilers, s.reconcilers...)

0 commit comments

Comments
 (0)