@@ -8,11 +8,9 @@ import (
8
8
"sync"
9
9
"time"
10
10
11
- "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
12
11
registryclient "github.com/operator-framework/operator-registry/pkg/client"
13
12
errorwrap "github.com/pkg/errors"
14
13
"github.com/sirupsen/logrus"
15
- "google.golang.org/grpc/connectivity"
16
14
corev1 "k8s.io/api/core/v1"
17
15
rbacv1 "k8s.io/api/rbac/v1"
18
16
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@@ -350,6 +348,11 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
350
348
func () {
351
349
o .sourcesLock .Lock ()
352
350
defer o .sourcesLock .Unlock ()
351
+ if s , ok := o .sources [sourceKey ]; ok {
352
+ if err := s .Client .Close (); err != nil {
353
+ o .Log .WithError (err ).Warn ("error closing client" )
354
+ }
355
+ }
353
356
delete (o .sources , sourceKey )
354
357
}()
355
358
o .Log .WithField ("source" , sourceKey ).Info ("removed client for deleted catalogsource" )
@@ -449,6 +452,12 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
449
452
func () {
450
453
o .sourcesLock .Lock ()
451
454
defer o .sourcesLock .Unlock ()
455
+ s , ok := o .sources [sourceKey ]
456
+ if ok {
457
+ if err := s .Client .Close (); err != nil {
458
+ o .Log .WithError (err ).Debug ("error closing client connection" )
459
+ }
460
+ }
452
461
delete (o .sources , sourceKey )
453
462
}()
454
463
@@ -464,50 +473,65 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
464
473
address := catsrc .Address ()
465
474
currentSource , ok := o .sources [sourceKey ]
466
475
logger = logger .WithField ("currentSource" , sourceKey )
467
- if ! ok || currentSource .Address != address || catsrc .Status .LastSync .After (currentSource .LastConnect .Time ) {
476
+
477
+ connect := false
478
+
479
+ // this connection is out of date, close and reconnect
480
+ if ok && (currentSource .Address != address || catsrc .Status .LastSync .After (currentSource .LastConnect .Time )) {
481
+ logger .Info ("rebuilding connection to registry" )
482
+ if currentSource .Client != nil {
483
+ if err := currentSource .Client .Close (); err != nil {
484
+ logger .WithError (err ).Warn ("couldn't close outdated connection to registry" )
485
+ return
486
+ }
487
+ }
488
+ delete (o .sources , sourceKey )
489
+ connect = true
490
+ } else if ! ok {
491
+ // have never made a connection, so need to build a new one
492
+ connect = true
493
+ }
494
+
495
+ if connect {
468
496
logger .Info ("building connection to registry" )
469
- client , err := registryclient .NewClient (address )
497
+ c , err := registryclient .NewClient (address )
470
498
if err != nil {
471
499
logger .WithError (err ).Warn ("couldn't connect to registry" )
472
500
}
473
501
sourceRef := resolver.SourceRef {
474
502
Address : address ,
475
- Client : client ,
503
+ Client : c ,
476
504
LastConnect : timeNow (),
477
505
LastHealthy : metav1.Time {}, // haven't detected healthy yet
478
506
}
479
507
o .sources [sourceKey ] = sourceRef
480
508
currentSource = sourceRef
481
509
sourcesUpdated = true
482
510
}
511
+
483
512
if currentSource .LastHealthy .IsZero () {
484
513
logger .Info ("client hasn't yet become healthy, attempt a health check" )
485
- client , ok := currentSource .Client .(* registryclient.Client )
486
- if ! ok {
487
- logger .WithField ("client" , currentSource .Client ).Warn ("unexpected client" )
488
- return
489
- }
490
- res , err := client .Health .Check (context .TODO (), & grpc_health_v1.HealthCheckRequest {Service : "Registry" })
514
+ healthy , err := currentSource .Client .HealthCheck (context .TODO (), 1 * time .Second )
491
515
if err != nil {
492
- logger .WithError (err ).Debug ("error checking health" )
493
- if client .Conn .GetState () == connectivity .TransientFailure {
494
- logger .Debug ("wait for state to change" )
495
- ctx , _ := context .WithTimeout (context .TODO (), 1 * time .Second )
496
- if ! client .Conn .WaitForStateChange (ctx , connectivity .TransientFailure ) {
497
- logger .Debug ("state didn't change, trigger reconnect. this may happen when cached dns is wrong." )
498
- delete (o .sources , sourceKey )
499
- if err := o .catSrcQueueSet .Requeue (sourceKey .Name , sourceKey .Namespace ); err != nil {
500
- logger .WithError (err ).Debug ("error requeueing" )
501
- }
516
+ if registryclient .IsErrorUnrecoverable (err ) {
517
+ logger .Debug ("state didn't change, trigger reconnect. this may happen when cached dns is wrong." )
518
+ if err := currentSource .Client .Close (); err != nil {
519
+ logger .WithError (err ).Warn ("couldn't close outdated connection to registry" )
502
520
return
503
521
}
522
+ delete (o .sources , sourceKey )
523
+ if err := o .catSrcQueueSet .Requeue (sourceKey .Name , sourceKey .Namespace ); err != nil {
524
+ logger .WithError (err ).Debug ("error requeueing" )
525
+ }
504
526
}
527
+ logger .WithError (err ).Debug ("connection error" )
505
528
return
506
529
}
507
- if res . Status != grpc_health_v1 . HealthCheckResponse_SERVING {
508
- logger .WithField ( "status" , res . Status . String ()). Debug ("source not healthy" )
530
+ if ! healthy {
531
+ logger .Debug ("source not healthy" )
509
532
return
510
533
}
534
+
511
535
currentSource .LastHealthy = timeNow ()
512
536
o .sources [sourceKey ] = currentSource
513
537
sourcesUpdated = true
@@ -694,26 +718,14 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
694
718
}()
695
719
696
720
for k , s := range resolverSources {
697
- client , ok := s .(* registryclient.Client )
698
- if ! ok {
699
- logger .Warn ("unexpected client" )
700
- continue
701
- }
702
-
703
721
logger = logger .WithField ("resolverSource" , k )
704
- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("source" )
705
- if client .Conn .GetState () == connectivity .TransientFailure {
706
- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("waiting for connection" )
707
- ctx , _ := context .WithTimeout (context .TODO (), 2 * time .Second )
708
- changed := client .Conn .WaitForStateChange (ctx , connectivity .TransientFailure )
709
- if ! changed {
710
- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("source in transient failure and didn't recover" )
711
- delete (resolverSources , k )
712
- } else {
713
- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("connection re-established" )
714
- }
722
+ healthy , err := s .HealthCheck (context .TODO (), 2 * time .Second )
723
+ if err != nil || ! healthy {
724
+ logger .WithError (err ).Debug ("omitting source due to unhealthy source" )
725
+ delete (resolverSources , k )
715
726
}
716
727
}
728
+
717
729
return resolverSources
718
730
}
719
731
0 commit comments