Skip to content

Commit 1bef042

Browse files
authored
Merge pull request kubernetes-sigs#9543 from ionutbalutoiu/fix/cluster-cache-tracker-memory-leak
🐛 Fix ClusterCacheTracker memory leak
2 parents fe3d762 + 97534b8 commit 1bef042

File tree

1 file changed

+59
-27
lines changed

1 file changed

+59
-27
lines changed

controllers/remote/cluster_cache_tracker.go

Lines changed: 59 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/pkg/errors"
3030
corev1 "k8s.io/api/core/v1"
3131
apierrors "k8s.io/apimachinery/pkg/api/errors"
32+
"k8s.io/apimachinery/pkg/api/meta"
3233
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -298,8 +299,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
298299
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
299300
}
300301

301-
// Create a client and a cache for the cluster.
302-
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
302+
// Create a http client and a mapper for the cluster.
303+
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
304+
if err != nil {
305+
return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
306+
}
307+
308+
// Create an uncached client for the cluster.
309+
uncachedClient, err := t.createUncachedClient(config, cluster, httpClient, mapper)
303310
if err != nil {
304311
return nil, err
305312
}
@@ -324,16 +331,23 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
324331
config.CAFile = inClusterConfig.CAFile
325332
config.Host = inClusterConfig.Host
326333

327-
// Create a new client and overwrite the previously created client.
328-
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
334+
// Update the http client and the mapper to use in-cluster config.
335+
httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
329336
if err != nil {
330-
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
337+
return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
331338
}
339+
332340
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
333341
} else {
334342
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
335343
}
336344

345+
// Create a client and a cache for the cluster.
346+
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper, indexes)
347+
if err != nil {
348+
return nil, err
349+
}
350+
337351
// Generating a new private key to be used for generating temporary certificates to connect to
338352
// etcd on the target cluster.
339353
// NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor.
@@ -343,9 +357,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
343357
}
344358

345359
return &clusterAccessor{
346-
cache: cache,
360+
cache: cachedClient.Cache,
347361
config: config,
348-
client: c,
362+
client: cachedClient.Client,
349363
watches: sets.Set[string]{},
350364
etcdClientCertificateKey: etcdKey,
351365
}, nil
@@ -377,28 +391,53 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
377391
return t.controllerPodMetadata.UID == pod.UID, nil
378392
}
379393

380-
// createClient creates a cached client, and uncached client and a mapper based on a rest.Config.
381-
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
394+
// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
395+
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
382396
// Create a http client for the cluster.
383397
httpClient, err := rest.HTTPClientFor(config)
384398
if err != nil {
385-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
399+
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
386400
}
387401

388402
// Create a mapper for it
389403
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
390404
if err != nil {
391-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
405+
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
392406
}
393407

394408
// Verify if we can get a rest mapping from the workload cluster apiserver.
395409
// Note: This also checks if the apiserver is up in general. We do this already here
396410
// to avoid further effort creating a cache and a client and to produce a clearer error message.
397411
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
398412
if err != nil {
399-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
413+
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
414+
}
415+
416+
return httpClient, mapper, nil
417+
}
418+
419+
// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
420+
func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, error) {
421+
// Create the uncached client for the remote cluster
422+
uncachedClient, err := client.New(config, client.Options{
423+
Scheme: t.scheme,
424+
Mapper: mapper,
425+
HTTPClient: httpClient,
426+
})
427+
if err != nil {
428+
return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
400429
}
401430

431+
return uncachedClient, nil
432+
}
433+
434+
type cachedClientOutput struct {
435+
Client client.Client
436+
Cache *stoppableCache
437+
}
438+
439+
// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
440+
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper, indexes []Index) (*cachedClientOutput, error) {
402441
// Create the cache for the remote cluster
403442
cacheOptions := cache.Options{
404443
HTTPClient: httpClient,
@@ -407,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
407446
}
408447
remoteCache, err := cache.New(config, cacheOptions)
409448
if err != nil {
410-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
449+
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error creating cache", cluster.String())
411450
}
412451

413452
cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
@@ -420,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
420459

421460
for _, index := range indexes {
422461
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
423-
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
462+
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error adding index for field %q to cache", cluster.String(), index.Field)
424463
}
425464
}
426465

@@ -436,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
436475
},
437476
})
438477
if err != nil {
439-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
478+
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String())
440479
}
441480

442-
// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
443-
// pods in the client.
444-
uncachedClient, err := client.New(config, client.Options{
445-
Scheme: t.scheme,
446-
Mapper: mapper,
447-
HTTPClient: httpClient,
448-
})
449-
if err != nil {
450-
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
451-
}
452481
// Start the cache!!!
453482
go cache.Start(cacheCtx) //nolint:errcheck
454483

@@ -457,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
457486
defer cacheSyncCtxCancel()
458487
if !cache.WaitForCacheSync(cacheSyncCtx) {
459488
cache.Stop()
460-
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
489+
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
461490
}
462491

463492
// Wrap the cached client with a client that sets timeouts on all Get and List calls
@@ -474,7 +503,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
474503
httpClient: httpClient,
475504
})
476505

477-
return cachedClient, uncachedClient, cache, nil
506+
return &cachedClientOutput{
507+
Client: cachedClient,
508+
Cache: cache,
509+
}, nil
478510
}
479511

480512
// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.

0 commit comments

Comments
 (0)