Skip to content

Commit f59c37a

Browse files
ecordelltkashem
authored andcommitted
feat(grpc): adds grpc source manager which notifies main controller
when grpc events occur. this removes a reliance on requeueing and should mean that catalog behavior becomes more stable and never needs to wait for a full resync
1 parent 586e941 commit f59c37a

File tree

17 files changed

+335
-171
lines changed

17 files changed

+335
-171
lines changed

Makefile

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ build-linux: build_cmd=build
6363
build-linux: arch_flags=GOOS=linux GOARCH=386
6464
build-linux: clean $(CMDS)
6565

66+
build-wait: clean bin/wait
67+
68+
bin/wait:
69+
CGO_ENABLED=0 GOOS=linux GOARCH=386 go build -o $@ $(PKG)/test/e2e/wait
70+
6671
$(CMDS): version_flags=-ldflags "-X $(PKG)/pkg/version.GitCommit=$(GIT_COMMIT) -X $(PKG)/pkg/version.OLMVersion=`cat OLM_VERSION`"
6772
$(CMDS):
6873
CGO_ENABLED=0 $(arch_flags) go $(build_cmd) $(MOD_FLAGS) $(version_flags) -o bin/$(shell basename $@) $@
@@ -94,9 +99,9 @@ setup-bare: clean e2e.namespace
9499
. ./scripts/install_bare.sh $(shell cat ./e2e.namespace) test/e2e/resources
95100

96101
e2e:
97-
go test -v $(MOD_FLAGS) -failfast -timeout 70m ./test/e2e/... -namespace=openshift-operators -kubeconfig=${KUBECONFIG} -olmNamespace=openshift-operator-lifecycle-manager
102+
go test -v $(MOD_FLAGS) -failfast -timeout 70m ./test/e2e/... -namespace=openshift-operators -kubeconfig=${KUBECONFIG} -olmNamespace=openshift-operator-lifecycle-manager -dummyImage=bitnami/nginx:latest
98103

99-
e2e-local: build-linux
104+
e2e-local: build-linux build-wait
100105
. ./scripts/build_local.sh
101106
. ./scripts/run_e2e_local.sh $(TEST)
102107

pkg/controller/operators/catalog/operator.go

Lines changed: 80 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ import (
66
"errors"
77
"fmt"
88
"reflect"
9-
"sync"
109
"time"
1110

12-
registryclient "github.com/operator-framework/operator-registry/pkg/client"
1311
errorwrap "github.com/pkg/errors"
1412
"github.com/sirupsen/logrus"
13+
"google.golang.org/grpc/connectivity"
1514
corev1 "k8s.io/api/core/v1"
1615
rbacv1 "k8s.io/api/rbac/v1"
1716
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
@@ -30,6 +29,8 @@ import (
3029
"k8s.io/client-go/tools/clientcmd"
3130
"k8s.io/client-go/util/workqueue"
3231

32+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
33+
3334
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/reference"
3435
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
3536
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
@@ -75,8 +76,7 @@ type Operator struct {
7576
ipQueueSet *queueinformer.ResourceQueueSet
7677
nsResolveQueue workqueue.RateLimitingInterface
7778
namespace string
78-
sources map[resolver.CatalogKey]resolver.SourceRef
79-
sourcesLock sync.RWMutex
79+
sources *grpc.SourceStore
8080
sourcesLastUpdate metav1.Time
8181
resolver resolver.Resolver
8282
reconciler reconciler.RegistryReconcilerFactory
@@ -129,14 +129,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
129129
client: crClient,
130130
lister: lister,
131131
namespace: operatorNamespace,
132-
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
133132
resolver: resolver.NewOperatorsV1alpha1Resolver(lister),
134133
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
135134
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
136135
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
137136
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
138137
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
139138
}
139+
op.sources = grpc.NewSourceStore(logger, op.syncSourceState)
140140
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
141141

142142
// Set up syncing for namespace-scoped resources
@@ -168,7 +168,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
168168
if err != nil {
169169
return nil, err
170170
}
171-
op.RegisterQueueInformer(ipQueueInformer)
171+
if err := op.RegisterQueueInformer(ipQueueInformer); err != nil {
172+
return nil, err
173+
}
172174

173175
// Wire CatalogSources
174176
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
@@ -186,7 +188,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
186188
if err != nil {
187189
return nil, err
188190
}
189-
op.RegisterQueueInformer(catsrcQueueInformer)
191+
if err := op.RegisterQueueInformer(catsrcQueueInformer); err != nil {
192+
return nil, err
193+
}
190194

191195
// Wire Subscriptions
192196
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
@@ -220,7 +224,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
220224
if err != nil {
221225
return nil, err
222226
}
223-
op.RegisterQueueInformer(subQueueInformer)
227+
if err := op.RegisterQueueInformer(subQueueInformer); err != nil {
228+
return nil, err
229+
}
224230

225231
// Wire k8s informers
226232
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod, informers.WithNamespace(namespace))
@@ -304,7 +310,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
304310
if err != nil {
305311
return nil, err
306312
}
307-
op.RegisterQueueInformer(namespaceQueueInformer)
313+
if err := op.RegisterQueueInformer(namespaceQueueInformer); err != nil {
314+
return nil, err
315+
}
316+
317+
op.sources.Start(context.Background())
308318

309319
return op, nil
310320
}
@@ -313,6 +323,19 @@ func (o *Operator) now() metav1.Time {
313323
return metav1.NewTime(o.clock.Now().UTC())
314324
}
315325

326+
func (o *Operator) syncSourceState(state grpc.SourceState) {
327+
o.sourcesLastUpdate = o.now()
328+
329+
switch state.State {
330+
case connectivity.Ready:
331+
o.resolveNamespace(state.Key.Namespace)
332+
default:
333+
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
334+
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
335+
}
336+
}
337+
}
338+
316339
func (o *Operator) requeueOwners(obj metav1.Object) {
317340
namespace := obj.GetNamespace()
318341
logger := o.logger.WithFields(logrus.Fields{
@@ -402,16 +425,9 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
402425
}
403426
}
404427
sourceKey := resolver.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
405-
func() {
406-
o.sourcesLock.Lock()
407-
defer o.sourcesLock.Unlock()
408-
if s, ok := o.sources[sourceKey]; ok {
409-
if err := s.Client.Close(); err != nil {
410-
o.logger.WithError(err).Warn("error closing client")
411-
}
412-
}
413-
delete(o.sources, sourceKey)
414-
}()
428+
if err := o.sources.Remove(sourceKey); err != nil {
429+
o.logger.WithError(err).Warn("error closing client")
430+
}
415431
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
416432
}
417433

@@ -480,124 +496,66 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
480496

481497
// If registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
482498
if !healthy || catsrc.Status.RegistryServiceStatus == nil {
483-
return func() error {
484-
o.sourcesLock.Lock()
485-
defer o.sourcesLock.Unlock()
499+
logger.Debug("ensuring registry server")
500+
if err := srcReconciler.EnsureRegistryServer(out); err != nil {
501+
logger.WithError(err).Warn("couldn't ensure registry server")
502+
return err
503+
}
504+
logger.Debug("ensured registry server")
486505

487-
logger.Debug("ensuring registry server")
488-
if err := srcReconciler.EnsureRegistryServer(out); err != nil {
489-
logger.WithError(err).Warn("couldn't ensure registry server")
490-
return err
491-
}
492-
logger.Debug("ensured registry server")
506+
// update status
507+
logger.Debug("updating catsrc status")
508+
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
509+
return err
510+
}
493511

494-
if s, ok := o.sources[sourceKey]; ok {
495-
if err := s.Client.Close(); err != nil {
496-
logger.WithError(err).Debug("error closing client connection")
497-
}
498-
}
499-
delete(o.sources, sourceKey)
500-
o.sourcesLastUpdate = out.Status.LastSync
512+
logger.Debug("updating catsrc status")
513+
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
514+
return err
515+
}
516+
logger.Debug("registry server recreated")
501517

502-
logger.Debug("updating catsrc status")
503-
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
504-
return err
505-
}
506-
logger.Debug("registry server recreated")
518+
if err := o.sources.Remove(sourceKey); err != nil {
519+
o.logger.WithError(err).Debug("error closing client connection")
520+
}
507521

508-
return nil
509-
}()
522+
return nil
510523
}
511524
logger.Debug("registry state good")
512525

513526
// update operator's view of sources
514-
sourcesUpdated := false
515-
func() {
516-
o.sourcesLock.Lock()
517-
defer o.sourcesLock.Unlock()
518-
address := catsrc.Address()
519-
currentSource, ok := o.sources[sourceKey]
520-
logger = logger.WithField("currentSource", sourceKey)
521-
522-
connect := false
523-
524-
// this connection is out of date, close and reconnect
525-
if ok && (currentSource.Address != address || catsrc.Status.LastSync.After(currentSource.LastConnect.Time)) {
526-
logger.Info("rebuilding connection to registry")
527-
if currentSource.Client != nil {
528-
if err := currentSource.Client.Close(); err != nil {
529-
logger.WithError(err).Warn("couldn't close outdated connection to registry")
530-
return
531-
}
532-
}
533-
delete(o.sources, sourceKey)
534-
o.sourcesLastUpdate = o.now()
527+
now := o.now()
535528

536-
connect = true
537-
} else if !ok {
538-
// have never made a connection, so need to build a new one
539-
connect = true
540-
}
529+
if currentSource := o.sources.GetMeta(sourceKey); currentSource != nil {
530+
address := catsrc.Address()
541531

542-
logger := logger.WithField("address", address)
543-
if connect {
544-
logger.Info("building connection to registry")
545-
c, err := registryclient.NewClient(address)
532+
logger = logger.WithField("address", address).WithField("currentSource", sourceKey)
533+
if currentSource.Address != address {
534+
source, err := o.sources.Add(sourceKey, address)
546535
if err != nil {
547536
logger.WithError(err).Warn("couldn't connect to registry")
548537
}
549-
sourceRef := resolver.SourceRef{
550-
Address: address,
551-
Client: c,
552-
LastConnect: o.now(),
553-
LastHealthy: metav1.Time{}, // haven't detected healthy yet
538+
currentSource = &source.SourceMeta
539+
540+
out.Status.LastSync = now
541+
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
542+
return err
554543
}
555-
o.sources[sourceKey] = sourceRef
556-
currentSource = sourceRef
557-
sourcesUpdated = true
558-
o.sourcesLastUpdate = sourceRef.LastConnect
559544
}
560545

561-
if currentSource.LastHealthy.IsZero() {
562-
logger.Info("client hasn't yet become healthy, attempt a health check")
563-
healthy, err := currentSource.Client.HealthCheck(context.TODO(), 2*time.Second)
564-
if err != nil || !healthy {
565-
if registryclient.IsErrorUnrecoverable(err) {
566-
logger.Debug("state didn't change, trigger reconnect. this may happen when cached dns is wrong.")
567-
if err := currentSource.Client.Close(); err != nil {
568-
logger.WithError(err).Warn("couldn't close outdated connection to registry")
569-
return
570-
}
571-
delete(o.sources, sourceKey)
572-
o.sourcesLastUpdate = o.now()
573-
}
574-
if err := o.catsrcQueueSet.Requeue(sourceKey.Namespace, sourceKey.Name); err != nil {
575-
logger.WithError(err).Debug("error requeuing")
576-
}
577-
return
546+
// connection is already good, but we need to update the sync time
547+
if out.Status.LastSync.Before(&o.sourcesLastUpdate) {
548+
out.Status.LastSync = now
549+
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
550+
return err
578551
}
579-
580-
logger.Debug("client has become healthy!")
581-
currentSource.LastHealthy = currentSource.LastConnect
582-
o.sourcesLastUpdate = currentSource.LastHealthy
583-
o.sources[sourceKey] = currentSource
584-
sourcesUpdated = true
585552
}
586-
}()
587-
588-
if !sourcesUpdated {
589-
return nil
590-
}
591-
592-
// record that we've done work here onto the status
593-
out.Status.LastSync = o.now()
594-
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
595-
return err
553+
} else {
554+
if _, err := o.sources.Add(sourceKey, catsrc.Address()); err != nil {
555+
return err
556+
}
596557
}
597558

598-
// Trigger a resolve, will pick up any subscriptions that depend on the catalog
599-
o.nsResolveQueue.Add(out.GetNamespace())
600-
601559
return nil
602560
}
603561

@@ -615,9 +573,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
615573
})
616574

617575
// get the set of sources that should be used for resolution and best-effort get their connections working
618-
resolverSources := o.ensureResolverSources(logger, namespace)
619-
logger.Debugf("resolved sources: %#v", resolverSources)
620-
querier := resolver.NewNamespaceSourceQuerier(resolverSources)
576+
logger.Debug("resolving sources")
577+
578+
querier := resolver.NewNamespaceSourceQuerier(o.sources.AsClients(o.namespace, namespace))
621579

622580
logger.Debug("checking if subscriptions need update")
623581

@@ -714,46 +672,11 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
714672
return nil
715673
}
716674

717-
func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string) map[resolver.CatalogKey]registryclient.Interface {
718-
// TODO: record connection status onto an object
719-
resolverSources := map[resolver.CatalogKey]registryclient.Interface{}
720-
func() {
721-
o.sourcesLock.RLock()
722-
defer o.sourcesLock.RUnlock()
723-
for k, ref := range o.sources {
724-
if ref.LastHealthy.IsZero() {
725-
logger = logger.WithField("source", k)
726-
logger.Debug("omitting source, hasn't yet become healthy")
727-
if err := o.catsrcQueueSet.Requeue(k.Namespace, k.Name); err != nil {
728-
logger.Warn("error requeueing")
729-
}
730-
continue
731-
}
732-
// only resolve in namespace local + global catalogs
733-
if k.Namespace == namespace || k.Namespace == o.namespace {
734-
resolverSources[k] = ref.Client
735-
}
736-
}
737-
}()
738-
739-
for k, s := range resolverSources {
740-
logger = logger.WithField("resolverSource", k)
741-
if healthy, err := s.HealthCheck(context.TODO(), 2*time.Second); err != nil || !healthy {
742-
logger.WithError(err).Debug("omitting unhealthy source")
743-
if err := o.catsrcQueueSet.Requeue(k.Namespace, k.Name); err != nil {
744-
logger.Warn("error requeueing")
745-
}
746-
delete(resolverSources, k)
747-
}
748-
}
749-
750-
return resolverSources
675+
func (o *Operator) resolveNamespace(namespace string) {
676+
o.nsResolveQueue.Add(namespace)
751677
}
752678

753679
func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool {
754-
o.sourcesLock.RLock()
755-
defer o.sourcesLock.RUnlock()
756-
757680
// Only sync if catalog has been updated since last sync time
758681
if o.sourcesLastUpdate.Before(&sub.Status.LastUpdated) && sub.Status.State != v1alpha1.SubscriptionStateNone && sub.Status.State != v1alpha1.SubscriptionStateUpgradeAvailable {
759682
logger.Debugf("skipping update: no new updates to catalog since last sync at %s", sub.Status.LastUpdated.String())

0 commit comments

Comments
 (0)