@@ -142,27 +142,30 @@ func (s *SourceStore) watch(ctx context.Context, key resolver.CatalogKey, source
142
142
case <- ctx .Done ():
143
143
return
144
144
default :
145
- timer , _ := context .WithTimeout (ctx , s .stateTimeout (state ))
146
- if source .Conn .WaitForStateChange (timer , state ) {
147
- newState := source .Conn .GetState ()
148
- state = newState
149
-
150
- // update connection state
151
- src := s .Get (key )
152
- if src == nil {
153
- // source was removed, cleanup this goroutine
154
- return
145
+ func () {
146
+ timer , cancel := context .WithTimeout (ctx , s .stateTimeout (state ))
147
+ defer cancel ()
148
+ if source .Conn .WaitForStateChange (timer , state ) {
149
+ newState := source .Conn .GetState ()
150
+ state = newState
151
+
152
+ // update connection state
153
+ src := s .Get (key )
154
+ if src == nil {
155
+ // source was removed, cleanup this goroutine
156
+ return
157
+ }
158
+
159
+ src .LastConnect = metav1 .Now ()
160
+ src .ConnectionState = newState
161
+ s .sourcesLock .Lock ()
162
+ s .sources [key ] = * src
163
+ s .sourcesLock .Unlock ()
164
+
165
+ // notify subscriber
166
+ s .notify <- SourceState {Key : key , State : newState }
155
167
}
156
-
157
- src .LastConnect = metav1 .Now ()
158
- src .ConnectionState = newState
159
- s .sourcesLock .Lock ()
160
- s .sources [key ] = * src
161
- s .sourcesLock .Unlock ()
162
-
163
- // notify subscriber
164
- s .notify <- SourceState {Key : key , State : newState }
165
- }
168
+ }()
166
169
}
167
170
}
168
171
}
@@ -190,6 +193,7 @@ func (s *SourceStore) Remove(key resolver.CatalogKey) error {
190
193
func (s * SourceStore ) AsClients (globalNamespace , localNamespace string ) map [resolver.CatalogKey ]client.Interface {
191
194
refs := map [resolver.CatalogKey ]client.Interface {}
192
195
s .sourcesLock .RLock ()
196
+ defer s .sourcesLock .RUnlock ()
193
197
for key , source := range s .sources {
194
198
if ! (key .Namespace == globalNamespace || key .Namespace == localNamespace ) {
195
199
continue
@@ -199,7 +203,6 @@ func (s *SourceStore) AsClients(globalNamespace, localNamespace string) map[reso
199
203
}
200
204
refs [key ] = client .NewClientFromConn (source .Conn )
201
205
}
202
- s .sourcesLock .RUnlock ()
203
206
204
207
// TODO : remove unhealthy
205
208
return refs
0 commit comments