Skip to content

Commit 75d0b22

Browse files
authored
Merge pull request #7537 from sbueringer/pr-cc-per-cluster-at-most-once-lock
🌱 ClusterCacheTracker: use non-blocking per-cluster locking
2 parents 808ca1c + 1348144 commit 75d0b22

File tree

11 files changed

+320
-49
lines changed

11 files changed

+320
-49
lines changed

controllers/remote/cluster_cache_healthcheck_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
140140
})
141141

142142
// Make sure this passes for at least some seconds, to give the health check goroutine time to run.
143-
g.Consistently(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeTrue())
143+
g.Consistently(func() bool {
144+
_, ok := cct.loadAccessor(testClusterKey)
145+
return ok
146+
}, 5*time.Second, 1*time.Second).Should(BeTrue())
144147
})
145148

146149
t.Run("with an invalid path", func(t *testing.T) {
@@ -162,7 +165,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
162165
})
163166

164167
// This should succeed after N consecutive failed requests.
165-
g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
168+
g.Eventually(func() bool {
169+
_, ok := cct.loadAccessor(testClusterKey)
170+
return ok
171+
}, 5*time.Second, 1*time.Second).Should(BeFalse())
166172
})
167173

168174
t.Run("with an invalid config", func(t *testing.T) {
@@ -193,7 +199,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
193199
})
194200

195201
// This should succeed after N consecutive failed requests.
196-
g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
202+
g.Eventually(func() bool {
203+
_, ok := cct.loadAccessor(testClusterKey)
204+
return ok
205+
}, 5*time.Second, 1*time.Second).Should(BeFalse())
197206
})
198207
})
199208
}

controllers/remote/cluster_cache_tracker.go

Lines changed: 93 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,30 @@ const (
5454
healthCheckPollInterval = 10 * time.Second
5555
healthCheckRequestTimeout = 5 * time.Second
5656
healthCheckUnhealthyThreshold = 10
57+
initialCacheSyncTimeout = 5 * time.Minute
5758
clusterCacheControllerName = "cluster-cache-tracker"
5859
)
5960

61+
// ErrClusterLocked is returned in methods that require cluster-level locking
62+
// if the cluster is already locked by another concurrent call.
63+
var ErrClusterLocked = errors.New("cluster is locked already")
64+
6065
// ClusterCacheTracker manages client caches for workload clusters.
6166
type ClusterCacheTracker struct {
6267
log logr.Logger
6368
clientUncachedObjects []client.Object
6469
client client.Client
6570
scheme *runtime.Scheme
6671

67-
lock sync.RWMutex
72+
// clusterAccessorsLock is used to lock the access to the clusterAccessors map.
73+
clusterAccessorsLock sync.RWMutex
74+
// clusterAccessors is the map of clusterAccessors by cluster.
6875
clusterAccessors map[client.ObjectKey]*clusterAccessor
69-
indexes []Index
76+
// clusterLock is a per-cluster lock used whenever we're locking for a specific cluster.
77+
// E.g. for actions like creating a client or adding watches.
78+
clusterLock *keyedMutex
79+
80+
indexes []Index
7081

7182
// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
7283
// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
@@ -129,16 +140,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
129140
client: manager.GetClient(),
130141
scheme: manager.GetScheme(),
131142
clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
143+
clusterLock: newKeyedMutex(),
132144
indexes: options.Indexes,
133145
}, nil
134146
}
135147

136148
// GetClient returns a cached client for the given cluster.
137149
func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
138-
t.lock.Lock()
139-
defer t.lock.Unlock()
140-
141-
accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
150+
accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...)
142151
if err != nil {
143152
return nil, err
144153
}
@@ -148,10 +157,7 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
148157

149158
// GetRESTConfig returns a cached REST config for the given cluster.
150159
func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
151-
t.lock.Lock()
152-
defer t.lock.Unlock()
153-
154-
accessor, err := t.getClusterAccessorLH(ctc, cluster, t.indexes...)
160+
accessor, err := t.getClusterAccessor(ctc, cluster, t.indexes...)
155161
if err != nil {
156162
return nil, err
157163
}
@@ -169,29 +175,68 @@ type clusterAccessor struct {
169175

170176
// clusterAccessorExists returns true if a clusterAccessor exists for cluster.
171177
func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool {
172-
t.lock.RLock()
173-
defer t.lock.RUnlock()
178+
t.clusterAccessorsLock.RLock()
179+
defer t.clusterAccessorsLock.RUnlock()
174180

175181
_, exists := t.clusterAccessors[cluster]
176182
return exists
177183
}
178184

179-
// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a
180-
// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held).
181-
func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
182-
a := t.clusterAccessors[cluster]
183-
if a != nil {
184-
return a, nil
185+
// loadAccessor loads a clusterAccessor.
186+
func (t *ClusterCacheTracker) loadAccessor(cluster client.ObjectKey) (*clusterAccessor, bool) {
187+
t.clusterAccessorsLock.RLock()
188+
defer t.clusterAccessorsLock.RUnlock()
189+
190+
accessor, ok := t.clusterAccessors[cluster]
191+
return accessor, ok
192+
}
193+
194+
// storeAccessor stores a clusterAccessor.
195+
func (t *ClusterCacheTracker) storeAccessor(cluster client.ObjectKey, accessor *clusterAccessor) {
196+
t.clusterAccessorsLock.Lock()
197+
defer t.clusterAccessorsLock.Unlock()
198+
199+
t.clusterAccessors[cluster] = accessor
200+
}
201+
202+
// getClusterAccessor returns a clusterAccessor for cluster.
203+
// It first tries to return an already-created clusterAccessor.
204+
// It then falls back to create a new clusterAccessor if needed.
205+
// If there is already another go routine trying to create a clusterAccessor
206+
// for the same cluster, an error is returned.
207+
func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
208+
log := ctrl.LoggerFrom(ctx, "cluster", klog.KRef(cluster.Namespace, cluster.Name))
209+
210+
// If the clusterAccessor already exists, return early.
211+
if accessor, ok := t.loadAccessor(cluster); ok {
212+
return accessor, nil
185213
}
186214

187-
a, err := t.newClusterAccessor(ctx, cluster, indexes...)
188-
if err != nil {
189-
return nil, errors.Wrap(err, "error creating client and cache for remote cluster")
215+
// clusterAccessor doesn't exist yet, we might have to initialize one.
216+
// Lock on the cluster to ensure only one clusterAccessor is initialized
217+
// for the cluster at the same time.
218+
// Return an error if another go routine already tries to create a clusterAccessor.
219+
if ok := t.clusterLock.TryLock(cluster); !ok {
220+
return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
190221
}
222+
defer t.clusterLock.Unlock(cluster)
191223

192-
t.clusterAccessors[cluster] = a
224+
// Until we got the cluster lock a different goroutine might have initialized the clusterAccessor
225+
// for this cluster successfully already. If this is the case we return it.
226+
if accessor, ok := t.loadAccessor(cluster); ok {
227+
return accessor, nil
228+
}
229+
230+
// We are the go routine who has to initialize the clusterAccessor.
231+
log.V(4).Info("Creating new cluster accessor")
232+
accessor, err := t.newClusterAccessor(ctx, cluster, indexes...)
233+
if err != nil {
234+
return nil, errors.Wrap(err, "failed to create cluster accessor")
235+
}
193236

194-
return a, nil
237+
log.V(4).Info("Storing new cluster accessor")
238+
t.storeAccessor(cluster, accessor)
239+
return accessor, nil
195240
}
196241

197242
// newClusterAccessor creates a new clusterAccessor.
@@ -265,7 +310,12 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
265310

266311
// Start the cache!!!
267312
go cache.Start(cacheCtx) //nolint:errcheck
268-
if !cache.WaitForCacheSync(cacheCtx) {
313+
314+
// Wait until the cache is initially synced
315+
cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
316+
defer cacheSyncCtxCancel()
317+
if !cache.WaitForCacheSync(cacheSyncCtx) {
318+
cache.Stop()
269319
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
270320
}
271321

@@ -337,8 +387,8 @@ func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.O
337387

338388
// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
339389
func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.ObjectKey) {
340-
t.lock.Lock()
341-
defer t.lock.Unlock()
390+
t.clusterAccessorsLock.Lock()
391+
defer t.clusterAccessorsLock.Unlock()
342392

343393
a, exists := t.clusterAccessors[cluster]
344394
if !exists {
@@ -387,25 +437,30 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
387437
return errors.New("input.Name is required")
388438
}
389439

390-
t.lock.Lock()
391-
defer t.lock.Unlock()
392-
393-
a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...)
440+
accessor, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
394441
if err != nil {
395-
return err
442+
return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
443+
}
444+
445+
// We have to lock the cluster, so that the watch is not created multiple times in parallel.
446+
ok := t.clusterLock.TryLock(input.Cluster)
447+
if !ok {
448+
return errors.Wrapf(ErrClusterLocked, "failed to add %s watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
396449
}
450+
defer t.clusterLock.Unlock(input.Cluster)
397451

398-
if a.watches.Has(input.Name) {
399-
t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
452+
if accessor.watches.Has(input.Name) {
453+
log := ctrl.LoggerFrom(ctx)
454+
log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
400455
return nil
401456
}
402457

403458
// Need to create the watch
404-
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, a.cache), input.EventHandler, input.Predicates...); err != nil {
405-
return errors.Wrap(err, "error creating watch")
459+
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, accessor.cache), input.EventHandler, input.Predicates...); err != nil {
460+
return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
406461
}
407462

408-
a.watches.Insert(input.Name)
463+
accessor.watches.Insert(input.Name)
409464

410465
return nil
411466
}
@@ -472,7 +527,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
472527
return false, nil
473528
}
474529

475-
if !t.clusterAccessorExists(in.cluster) {
530+
if _, ok := t.loadAccessor(in.cluster); !ok {
476531
// Cache for this cluster has already been cleaned up.
477532
// Nothing to do, so return true.
478533
return true, nil
@@ -505,7 +560,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
505560
// An error returned implies the health check has failed a sufficient number of
506561
// times for the cluster to be considered unhealthy
507562
// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
508-
// happens when the cache is explicitly stopped.F
563+
// happens when the cache is explicitly stopped.
509564
if err != nil && err != wait.ErrWaitTimeout {
510565
t.log.Error(err, "Error health checking cluster", "Cluster", klog.KRef(in.cluster.Namespace, in.cluster.Name))
511566
t.deleteAccessor(ctx, in.cluster)

controllers/remote/keyedmutex.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package remote
18+
19+
import (
20+
"sync"
21+
22+
"sigs.k8s.io/controller-runtime/pkg/client"
23+
)
24+
25+
// keyedMutex is a mutex that locks on a per-key basis: only one caller can
// hold the lock for a specific key at a time, while different keys can be
// held concurrently. TryLock returns false (instead of blocking) when the
// key is already held. In the ClusterCacheTracker the key is the ObjectKey
// of a cluster.
type keyedMutex struct {
	// locksMtx guards all access to the locks map below.
	locksMtx sync.Mutex
	// locks is the set of currently-held keys; presence in the map means
	// the key is locked. Entries are added by TryLock and removed by Unlock.
	locks map[client.ObjectKey]struct{}
}
32+
33+
// newKeyedMutex creates a new keyed mutex ready for use.
34+
func newKeyedMutex() *keyedMutex {
35+
return &keyedMutex{
36+
locks: make(map[client.ObjectKey]struct{}),
37+
}
38+
}
39+
40+
// TryLock locks the passed in key if it's not already locked.
41+
// A second Lock call if the lock is already held for a key returns false.
42+
// In the ClusterCacheTracker case the key is the ObjectKey for a cluster.
43+
func (k *keyedMutex) TryLock(key client.ObjectKey) bool {
44+
k.locksMtx.Lock()
45+
defer k.locksMtx.Unlock()
46+
47+
// Check if there is already a lock for this key (e.g. Cluster).
48+
if _, ok := k.locks[key]; ok {
49+
// There is already a lock, return false.
50+
return false
51+
}
52+
53+
// Lock doesn't exist yet, create the lock.
54+
k.locks[key] = struct{}{}
55+
56+
return true
57+
}
58+
59+
// Unlock unlocks the key.
60+
func (k *keyedMutex) Unlock(key client.ObjectKey) {
61+
k.locksMtx.Lock()
62+
defer k.locksMtx.Unlock()
63+
64+
// Remove the lock if it exists.
65+
delete(k.locks, key)
66+
}

0 commit comments

Comments
 (0)