Skip to content

Commit 723ee40

Browse files
committed
fix(catalogsources): omit unhealthy catalogs from resolution
and retrigger catalog sync if we attempt to resolve with an unhealthy catalog this should fix issues where subscriptions fail when created immedately after a catalog is updated
1 parent da67b54 commit 723ee40

File tree

7 files changed

+227
-62
lines changed

7 files changed

+227
-62
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99
"time"
1010

11+
"github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
1112
registryclient "github.com/operator-framework/operator-registry/pkg/client"
1213
errorwrap "github.com/pkg/errors"
1314
"github.com/sirupsen/logrus"
@@ -302,6 +303,10 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
302303
delete(o.sources, sourceKey)
303304
}()
304305
o.Log.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
306+
307+
if err := o.catSrcQueueSet.Remove(sourceKey.Name, sourceKey.Namespace); err != nil {
308+
o.Log.WithError(err)
309+
}
305310
}
306311

307312
func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
@@ -314,18 +319,21 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
314319
logger := o.Log.WithFields(logrus.Fields{
315320
"source": catsrc.GetName(),
316321
})
317-
322+
logger.Debug("syncing catsrc")
318323
out := catsrc.DeepCopy()
319324
sourceKey := resolver.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
320325

321326
if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
327+
logger.Debug("checking catsrc configmap state")
328+
322329
// Get the catalog source's config map
323330
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap)
324331
if err != nil {
325332
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err)
326333
}
327334

328335
if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() {
336+
logger.Debug("updating catsrc configmap state")
329337
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
330338
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
331339
Name: configMap.GetName(),
@@ -351,26 +359,33 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
351359
return fmt.Errorf("no reconciler for source type %s", catsrc.Spec.SourceType)
352360
}
353361

362+
logger.Debug("catsrc configmap state good, checking registry pod")
363+
354364
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
355365
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) {
366+
logger.Debug("registry pod scheduled for recheck")
367+
356368
if err := reconciler.EnsureRegistryServer(out); err != nil {
357369
logger.WithError(err).Warn("couldn't ensure registry server")
358370
return err
359371
}
372+
logger.Debug("ensured registry pod")
360373

361-
if !catsrc.Status.LastSync.Before(&out.Status.LastSync) {
362-
return nil
363-
}
374+
out.Status.RegistryServiceStatus.CreatedAt = timeNow()
375+
out.Status.LastSync = timeNow()
364376

377+
logger.Debug("updating catsrc status")
365378
// update status
366379
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
367380
return err
368381
}
369382

370383
o.sourcesLastUpdate = timeNow()
384+
logger.Debug("registry pod recreated")
371385

372386
return nil
373387
}
388+
logger.Debug("registry pod state good")
374389

375390
// update operator's view of sources
376391
sourcesUpdated := false
@@ -379,7 +394,9 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
379394
defer o.sourcesLock.Unlock()
380395
address := catsrc.Status.RegistryServiceStatus.Address()
381396
currentSource, ok := o.sources[sourceKey]
382-
if !ok || currentSource.Address != address {
397+
logger = logger.WithField("currentSource", sourceKey)
398+
if !ok || currentSource.Address != address || catsrc.Status.LastSync.After(currentSource.LastConnect.Time) {
399+
logger.Info("building connection to registry")
383400
client, err := registryclient.NewClient(address)
384401
if err != nil {
385402
logger.WithError(err).Warn("couldn't connect to registry")
@@ -388,8 +405,42 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
388405
Address: address,
389406
Client: client,
390407
LastConnect: timeNow(),
408+
LastHealthy: metav1.Time{}, // haven't detected healthy yet
391409
}
392410
o.sources[sourceKey] = sourceRef
411+
currentSource = sourceRef
412+
sourcesUpdated = true
413+
}
414+
if currentSource.LastHealthy.IsZero() {
415+
logger.Info("client hasn't yet become healthy, attempt a health check")
416+
client, ok := currentSource.Client.(*registryclient.Client)
417+
if !ok {
418+
logger.WithField("client", currentSource.Client).Warn("unexpected client")
419+
return
420+
}
421+
res, err := client.Health.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{Service: "Registry"})
422+
if err != nil {
423+
logger.WithError(err).Debug("error checking health")
424+
if client.Conn.GetState() == connectivity.TransientFailure {
425+
logger.Debug("wait for state to change")
426+
ctx, _ := context.WithTimeout(context.TODO(), 1*time.Second)
427+
if !client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure) {
428+
logger.Debug("state didn't change, trigger reconnect. this may happen when cached dns is wrong.")
429+
delete(o.sources, sourceKey)
430+
if err := o.catSrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil {
431+
logger.WithError(err).Debug("error requeueing")
432+
}
433+
return
434+
}
435+
}
436+
return
437+
}
438+
if res.Status != grpc_health_v1.HealthCheckResponse_SERVING {
439+
logger.WithField("status", res.Status.String()).Debug("source not healthy")
440+
return
441+
}
442+
currentSource.LastHealthy = timeNow()
443+
o.sources[sourceKey] = currentSource
393444
sourcesUpdated = true
394445
}
395446
}()
@@ -484,10 +535,12 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
484535

485536
installplanReference, err := o.createInstallPlan(namespace, subs, installPlanApproval, steps)
486537
if err != nil {
538+
logger.WithError(err).Debug("error creating installplan")
487539
return err
488540
}
489541

490542
if err := o.ensureSubscriptionInstallPlanState(namespace, subs, installplanReference); err != nil {
543+
logger.WithError(err).Debug("error ensuring subscription installplan state")
491544
return err
492545
}
493546
return nil
@@ -500,6 +553,14 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
500553
o.sourcesLock.RLock()
501554
defer o.sourcesLock.RUnlock()
502555
for k, ref := range o.sources {
556+
if ref.LastHealthy.IsZero() {
557+
logger = logger.WithField("source", k)
558+
logger.Debug("omitting source, hasn't yet become healthy")
559+
if err := o.catSrcQueueSet.Requeue(k.Name, k.Namespace); err != nil {
560+
logger.Warn("error requeueing")
561+
}
562+
continue
563+
}
503564
// only resolve in namespace local + global catalogs
504565
if k.Namespace == namespace || k.Namespace == o.namespace {
505566
resolverSources[k] = ref.Client
@@ -513,6 +574,7 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
513574
logger.Warn("unexpected client")
514575
continue
515576
}
577+
516578
logger = logger.WithField("resolverSource", k)
517579
logger.WithField("clientState", client.Conn.GetState()).Debug("source")
518580
if client.Conn.GetState() == connectivity.TransientFailure {
@@ -837,7 +899,7 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
837899

838900
// Attempt to create the Subscription
839901
sub.SetNamespace(namespace)
840-
_, err = o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Create(&sub)
902+
created, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Create(&sub)
841903
if k8serrors.IsAlreadyExists(err) {
842904
// If it already existed, mark the step as Present.
843905
plan.Status.Plan[i].Status = v1alpha1.StepStatusPresent
@@ -846,6 +908,15 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
846908
} else {
847909
// If no error occurred, mark the step as Created.
848910
plan.Status.Plan[i].Status = v1alpha1.StepStatusCreated
911+
created.Status.Install = &v1alpha1.InstallPlanReference{
912+
UID: plan.GetUID(),
913+
Name: plan.GetName(),
914+
APIVersion: v1alpha1.SchemeGroupVersion.String(),
915+
Kind: v1alpha1.InstallPlanKind,
916+
}
917+
if _, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(created); err != nil {
918+
o.Log.WithError(err).Warn("couldn't set installplan reference on created subscription")
919+
}
849920
}
850921
case secretKind:
851922
// TODO: this will confuse bundle users that include secrets in their bundles - this only handles pull secrets

pkg/controller/registry/reconciler/configmap.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (c *ConfigMapRegistryReconciler) currentService(source configMapCatalogSour
189189
serviceName := source.Service().GetName()
190190
service, err := c.Lister.CoreV1().ServiceLister().Services(source.GetNamespace()).Get(serviceName)
191191
if err != nil {
192-
logrus.WithField("service", serviceName).Warn("couldn't find service in cache")
192+
logrus.WithField("service", serviceName).Debug("couldn't find service in cache")
193193
return nil
194194
}
195195
return service
@@ -199,7 +199,7 @@ func (c *ConfigMapRegistryReconciler) currentServiceAccount(source configMapCata
199199
serviceAccountName := source.ServiceAccount().GetName()
200200
serviceAccount, err := c.Lister.CoreV1().ServiceAccountLister().ServiceAccounts(source.GetNamespace()).Get(serviceAccountName)
201201
if err != nil {
202-
logrus.WithField("serviceAccouint", serviceAccountName).WithError(err).Warn("couldn't find service account in cache")
202+
logrus.WithField("serviceAccouint", serviceAccountName).WithError(err).Debug("couldn't find service account in cache")
203203
return nil
204204
}
205205
return serviceAccount
@@ -209,7 +209,7 @@ func (c *ConfigMapRegistryReconciler) currentRole(source configMapCatalogSourceD
209209
roleName := source.Role().GetName()
210210
role, err := c.Lister.RbacV1().RoleLister().Roles(source.GetNamespace()).Get(roleName)
211211
if err != nil {
212-
logrus.WithField("role", roleName).WithError(err).Warn("couldn't find role in cache")
212+
logrus.WithField("role", roleName).WithError(err).Debug("couldn't find role in cache")
213213
return nil
214214
}
215215
return role
@@ -219,7 +219,7 @@ func (c *ConfigMapRegistryReconciler) currentRoleBinding(source configMapCatalog
219219
roleBindingName := source.RoleBinding().GetName()
220220
roleBinding, err := c.Lister.RbacV1().RoleBindingLister().RoleBindings(source.GetNamespace()).Get(roleBindingName)
221221
if err != nil {
222-
logrus.WithField("roleBinding", roleBindingName).WithError(err).Warn("couldn't find role binding in cache")
222+
logrus.WithField("roleBinding", roleBindingName).WithError(err).Debug("couldn't find role binding in cache")
223223
return nil
224224
}
225225
return roleBinding
@@ -229,11 +229,11 @@ func (c *ConfigMapRegistryReconciler) currentPods(source configMapCatalogSourceD
229229
podName := source.Pod(image).GetName()
230230
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromSet(source.Labels()))
231231
if err != nil {
232-
logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache")
232+
logrus.WithField("pod", podName).WithError(err).Debug("couldn't find pod in cache")
233233
return nil
234234
}
235235
if len(pods) > 1 {
236-
logrus.WithField("selector", source.Selector()).Warn("multiple pods found for selector")
236+
logrus.WithField("selector", source.Selector()).Debug("multiple pods found for selector")
237237
}
238238
return pods
239239
}
@@ -242,11 +242,11 @@ func (c *ConfigMapRegistryReconciler) currentPodsWithCorrectResourceVersion(sour
242242
podName := source.Pod(image).GetName()
243243
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels()))
244244
if err != nil {
245-
logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache")
245+
logrus.WithField("pod", podName).WithError(err).Debug("couldn't find pod in cache")
246246
return nil
247247
}
248248
if len(pods) > 1 {
249-
logrus.WithField("selector", source.Selector()).Warn("multiple pods found for selector")
249+
logrus.WithField("selector", source.Selector()).Debug("multiple pods found for selector")
250250
}
251251
return pods
252252
}

pkg/controller/registry/reconciler/reconciler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ type RegistryReconcilerFactory struct {
2222
}
2323

2424
func (r *RegistryReconcilerFactory) ReconcilerForSourceType(sourceType v1alpha1.SourceType) RegistryReconciler {
25-
if sourceType == v1alpha1.SourceTypeInternal || sourceType == v1alpha1.SourceTypeConfigmap {
25+
switch sourceType {
26+
case v1alpha1.SourceTypeInternal, v1alpha1.SourceTypeConfigmap:
2627
return &ConfigMapRegistryReconciler{
2728
Lister: r.Lister,
2829
OpClient: r.OpClient,
2930
Image: r.ConfigMapServerImage,
3031
}
31-
}
32-
if sourceType == v1alpha1.SourceTypeGrpc {
32+
case v1alpha1.SourceTypeGrpc:
3333
return &GrpcRegistryReconciler{
3434
Lister: r.Lister,
3535
OpClient: r.OpClient,

pkg/controller/registry/resolver/querier.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type SourceRef struct {
1515
Address string
1616
Client client.Interface
1717
LastConnect metav1.Time
18+
LastHealthy metav1.Time
1819
}
1920

2021
type SourceQuerier interface {

pkg/lib/queueinformer/resourcequeue.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,20 @@ func (r ResourceQueueSet) Requeue(name, namespace string) error {
2525

2626
return fmt.Errorf("couldn't find queue for resource")
2727
}
28+
29+
func (r ResourceQueueSet) Remove(name, namespace string) error {
30+
// We can build the key directly, will need to change if queue uses different key scheme
31+
key := fmt.Sprintf("%s/%s", namespace, name)
32+
33+
if queue, ok := r[metav1.NamespaceAll]; len(r) == 1 && ok {
34+
queue.Forget(key)
35+
return nil
36+
}
37+
38+
if queue, ok := r[namespace]; ok {
39+
queue.Forget(key)
40+
return nil
41+
}
42+
43+
return fmt.Errorf("couldn't find queue for resource")
44+
}

0 commit comments

Comments
 (0)