@@ -14,6 +14,7 @@ import (
14
14
"google.golang.org/grpc/connectivity"
15
15
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
16
"k8s.io/apimachinery/pkg/labels"
17
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17
18
"k8s.io/client-go/tools/cache"
18
19
"k8s.io/client-go/util/workqueue"
19
20
@@ -100,18 +101,17 @@ func (p *RegistryProvider) setClient(client registryClient, key sourceKey) {
100
101
p .clients [key ] = client
101
102
}
102
103
103
- func (p * RegistryProvider ) removeClient (key sourceKey ) bool {
104
+ func (p * RegistryProvider ) removeClient (key sourceKey ) ( registryClient , bool ) {
104
105
p .mu .Lock ()
105
106
defer p .mu .Unlock ()
106
107
107
108
client , ok := p .clients [key ]
108
109
if ! ok {
109
- return false
110
+ return registryClient {}, false
110
111
}
111
112
112
- client .conn .Close ()
113
113
delete (p .clients , key )
114
- return true
114
+ return client , true
115
115
}
116
116
117
117
func (p * RegistryProvider ) syncCatalogSource (obj interface {}) (syncError error ) {
@@ -144,6 +144,7 @@ func (p *RegistryProvider) syncCatalogSource(obj interface{}) (syncError error)
144
144
if ! changed {
145
145
logger .Debugf ("grpc connection reset timeout" )
146
146
syncError = fmt .Errorf ("grpc connection reset timeout" )
147
+ return
147
148
}
148
149
149
150
logger .Info ("grpc connection reset" )
@@ -165,10 +166,21 @@ func (p *RegistryProvider) syncCatalogSource(obj interface{}) (syncError error)
165
166
}
166
167
167
168
func (p * RegistryProvider ) catalogSourceDeleted (obj interface {}) {
168
- catsrc , ok := obj .(* operatorsv1alpha1. CatalogSource )
169
+ catsrc , ok := obj .(metav1. Object )
169
170
if ! ok {
170
- logrus .Errorf ("catalogsource type assertion failed: wrong type: %#v" , obj )
171
- return
171
+ if ! ok {
172
+ tombstone , ok := obj .(cache.DeletedFinalStateUnknown )
173
+ if ! ok {
174
+ utilruntime .HandleError (fmt .Errorf ("couldn't get object from tombstone %#v" , obj ))
175
+ return
176
+ }
177
+
178
+ catsrc , ok = tombstone .Obj .(metav1.Object )
179
+ if ! ok {
180
+ utilruntime .HandleError (fmt .Errorf ("tombstone contained object that is not a Namespace %#v" , obj ))
181
+ return
182
+ }
183
+ }
172
184
}
173
185
174
186
logger := logrus .WithFields (logrus.Fields {
@@ -179,8 +191,14 @@ func (p *RegistryProvider) catalogSourceDeleted(obj interface{}) {
179
191
logger .Debugf ("attempting to remove grpc connection" )
180
192
181
193
key := sourceKey {catsrc .GetName (), catsrc .GetNamespace ()}
182
- removed := p .removeClient (key )
194
+ client , removed := p .removeClient (key )
183
195
if removed {
196
+ err := client .conn .Close ()
197
+ if err != nil {
198
+ logger .WithField ("err" , err .Error ()).Error ("error closing connection" )
199
+ utilruntime .HandleError (fmt .Errorf ("error closing connection %s" , err .Error ()))
200
+ return
201
+ }
184
202
logger .Debug ("grpc connection removed" )
185
203
return
186
204
}
0 commit comments