Skip to content

Commit ea72079

Browse files
Merge pull request #861 from ecordell/grpc-close
fix(catalog): close grpc connections before deleting them
2 parents e402bfd + 775881a commit ea72079

File tree

14 files changed

+445
-331
lines changed

14 files changed

+445
-331
lines changed

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ require (
3333
github.com/onsi/ginkgo v1.8.0 // indirect
3434
github.com/openshift/api v3.9.1-0.20190424152011-77b8897ec79a+incompatible
3535
github.com/openshift/client-go v0.0.0-20190401163519-84c2b942258a
36-
github.com/operator-framework/operator-registry v1.1.0
36+
github.com/operator-framework/operator-registry v1.1.1
3737
github.com/pkg/errors v0.8.1
3838
github.com/prometheus/client_golang v0.9.2
3939
github.com/sirupsen/logrus v1.4.1
4040
github.com/spf13/cobra v0.0.3
4141
github.com/stretchr/testify v1.2.2
42+
go.uber.org/atomic v1.4.0 // indirect
4243
go.uber.org/zap v1.10.0 // indirect
4344
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
4445
golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db // indirect
@@ -49,7 +50,7 @@ require (
4950
k8s.io/api v0.0.0-20190118113203-912cbe2bfef3
5051
k8s.io/apiextensions-apiserver v0.0.0-20190221101132-cda7b6cfba78
5152
k8s.io/apimachinery v0.0.0-20190221084156-01f179d85dbc
52-
k8s.io/apiserver v0.0.0-20190402012035-5e1c1f41ee34
53+
k8s.io/apiserver v0.0.0-20190518070817-682b37ff211e
5354
k8s.io/cli-runtime v0.0.0-20190221101700-11047e25a94a // indirect
5455
k8s.io/client-go v11.0.0+incompatible
5556
k8s.io/code-generator v0.0.0-20181203235156-f8cba74510f3

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,8 @@ github.com/operator-framework/operator-marketplace v0.0.0-20190216021216-57300a3
208208
github.com/operator-framework/operator-registry v1.0.1 h1:Z2155w77HzIkTrdp2qoY0QMkywxhJpuABUSGcgogXuc=
209209
github.com/operator-framework/operator-registry v1.0.1/go.mod h1:1xEdZjjUg2hPEd52LG3YQ0jtwiwEGdm98S1TH5P4RAA=
210210
github.com/operator-framework/operator-registry v1.0.4/go.mod h1:hve6YwcjM2nGVlscLtNsp9sIIBkNZo6jlJgzWw7vP9s=
211-
github.com/operator-framework/operator-registry v1.1.0 h1:yk+ikEQe+UVLFCvnfhqJW7DLMj3BHNY/X/tbgaaR1OM=
212-
github.com/operator-framework/operator-registry v1.1.0/go.mod h1:7D4WEwL+EKti5npUh4/u64DQhawCBRugp8Ql20duUb4=
211+
github.com/operator-framework/operator-registry v1.1.1 h1:oDIevJvKXFsp7BEb7iJHuLvuhPZYBtIx5oZQ7iSISAs=
212+
github.com/operator-framework/operator-registry v1.1.1/go.mod h1:7D4WEwL+EKti5npUh4/u64DQhawCBRugp8Ql20duUb4=
213213
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
214214
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
215215
github.com/petar/GoLLRB v0.0.0-20130427215148-53be0d36a84c/go.mod h1:HUpKUBZnpzkdx0kD/+Yfuft+uD3zHGtXF/XJB14TUr4=
@@ -264,6 +264,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5
264264
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
265265
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
266266
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
267+
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
268+
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
267269
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
268270
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
269271
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=

pkg/controller/operators/catalog/operator.go

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@ import (
88
"sync"
99
"time"
1010

11-
"github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
1211
registryclient "github.com/operator-framework/operator-registry/pkg/client"
1312
errorwrap "github.com/pkg/errors"
1413
"github.com/sirupsen/logrus"
15-
"google.golang.org/grpc/connectivity"
1614
corev1 "k8s.io/api/core/v1"
1715
rbacv1 "k8s.io/api/rbac/v1"
1816
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@@ -350,6 +348,11 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
350348
func() {
351349
o.sourcesLock.Lock()
352350
defer o.sourcesLock.Unlock()
351+
if s, ok := o.sources[sourceKey]; ok {
352+
if err := s.Client.Close(); err != nil {
353+
o.Log.WithError(err).Warn("error closing client")
354+
}
355+
}
353356
delete(o.sources, sourceKey)
354357
}()
355358
o.Log.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
@@ -449,6 +452,12 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
449452
func() {
450453
o.sourcesLock.Lock()
451454
defer o.sourcesLock.Unlock()
455+
s, ok := o.sources[sourceKey]
456+
if ok {
457+
if err := s.Client.Close(); err != nil {
458+
o.Log.WithError(err).Debug("error closing client connection")
459+
}
460+
}
452461
delete(o.sources, sourceKey)
453462
}()
454463

@@ -464,50 +473,65 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
464473
address := catsrc.Address()
465474
currentSource, ok := o.sources[sourceKey]
466475
logger = logger.WithField("currentSource", sourceKey)
467-
if !ok || currentSource.Address != address || catsrc.Status.LastSync.After(currentSource.LastConnect.Time) {
476+
477+
connect := false
478+
479+
// this connection is out of date, close and reconnect
480+
if ok && (currentSource.Address != address || catsrc.Status.LastSync.After(currentSource.LastConnect.Time)) {
481+
logger.Info("rebuilding connection to registry")
482+
if currentSource.Client != nil {
483+
if err := currentSource.Client.Close(); err != nil {
484+
logger.WithError(err).Warn("couldn't close outdated connection to registry")
485+
return
486+
}
487+
}
488+
delete(o.sources, sourceKey)
489+
connect = true
490+
} else if !ok {
491+
// have never made a connection, so need to build a new one
492+
connect = true
493+
}
494+
495+
if connect {
468496
logger.Info("building connection to registry")
469-
client, err := registryclient.NewClient(address)
497+
c, err := registryclient.NewClient(address)
470498
if err != nil {
471499
logger.WithError(err).Warn("couldn't connect to registry")
472500
}
473501
sourceRef := resolver.SourceRef{
474502
Address: address,
475-
Client: client,
503+
Client: c,
476504
LastConnect: timeNow(),
477505
LastHealthy: metav1.Time{}, // haven't detected healthy yet
478506
}
479507
o.sources[sourceKey] = sourceRef
480508
currentSource = sourceRef
481509
sourcesUpdated = true
482510
}
511+
483512
if currentSource.LastHealthy.IsZero() {
484513
logger.Info("client hasn't yet become healthy, attempt a health check")
485-
client, ok := currentSource.Client.(*registryclient.Client)
486-
if !ok {
487-
logger.WithField("client", currentSource.Client).Warn("unexpected client")
488-
return
489-
}
490-
res, err := client.Health.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{Service: "Registry"})
514+
healthy, err := currentSource.Client.HealthCheck(context.TODO(), 1*time.Second)
491515
if err != nil {
492-
logger.WithError(err).Debug("error checking health")
493-
if client.Conn.GetState() == connectivity.TransientFailure {
494-
logger.Debug("wait for state to change")
495-
ctx, _ := context.WithTimeout(context.TODO(), 1*time.Second)
496-
if !client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure) {
497-
logger.Debug("state didn't change, trigger reconnect. this may happen when cached dns is wrong.")
498-
delete(o.sources, sourceKey)
499-
if err := o.catSrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil {
500-
logger.WithError(err).Debug("error requeueing")
501-
}
516+
if registryclient.IsErrorUnrecoverable(err) {
517+
logger.Debug("state didn't change, trigger reconnect. this may happen when cached dns is wrong.")
518+
if err := currentSource.Client.Close(); err != nil {
519+
logger.WithError(err).Warn("couldn't close outdated connection to registry")
502520
return
503521
}
522+
delete(o.sources, sourceKey)
523+
if err := o.catSrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil {
524+
logger.WithError(err).Debug("error requeueing")
525+
}
504526
}
527+
logger.WithError(err).Debug("connection error")
505528
return
506529
}
507-
if res.Status != grpc_health_v1.HealthCheckResponse_SERVING {
508-
logger.WithField("status", res.Status.String()).Debug("source not healthy")
530+
if !healthy {
531+
logger.Debug("source not healthy")
509532
return
510533
}
534+
511535
currentSource.LastHealthy = timeNow()
512536
o.sources[sourceKey] = currentSource
513537
sourcesUpdated = true
@@ -694,26 +718,14 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
694718
}()
695719

696720
for k, s := range resolverSources {
697-
client, ok := s.(*registryclient.Client)
698-
if !ok {
699-
logger.Warn("unexpected client")
700-
continue
701-
}
702-
703721
logger = logger.WithField("resolverSource", k)
704-
logger.WithField("clientState", client.Conn.GetState()).Debug("source")
705-
if client.Conn.GetState() == connectivity.TransientFailure {
706-
logger.WithField("clientState", client.Conn.GetState()).Debug("waiting for connection")
707-
ctx, _ := context.WithTimeout(context.TODO(), 2*time.Second)
708-
changed := client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure)
709-
if !changed {
710-
logger.WithField("clientState", client.Conn.GetState()).Debug("source in transient failure and didn't recover")
711-
delete(resolverSources, k)
712-
} else {
713-
logger.WithField("clientState", client.Conn.GetState()).Debug("connection re-established")
714-
}
722+
healthy, err := s.HealthCheck(context.TODO(), 2*time.Second)
723+
if err != nil || !healthy {
724+
logger.WithError(err).Debug("omitting source due to unhealthy source")
725+
delete(resolverSources, k)
715726
}
716727
}
728+
717729
return resolverSources
718730
}
719731

pkg/controller/registry/resolver/fakes/fake_registry_client.go

Lines changed: 149 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)