Skip to content

Commit 97534b8

Browse files
committed
Fix ClusterCacheTracker memory leak
Decouple the code that creates uncached client from the code that creates the cached client. This way, we don't start the cache until the cached client is created. This avoids scenarios where the `ClusterCacheTracker` cache is started when only an uncached client is needed. There is an existing code path, described in the original issue, where the `ClusterCacheTracker` cache is started when only an uncached client is needed. The cache is re-created later, and the initial cache is still running continuously in the background. Signed-off-by: Ionut Balutoiu <[email protected]>
1 parent 0f55580 commit 97534b8

File tree

1 file changed

+59
-27
lines changed

1 file changed

+59
-27
lines changed

controllers/remote/cluster_cache_tracker.go

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/pkg/errors"
3030
corev1 "k8s.io/api/core/v1"
3131
apierrors "k8s.io/apimachinery/pkg/api/errors"
32+
"k8s.io/apimachinery/pkg/api/meta"
3233
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -298,8 +299,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
298299
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
299300
}
300301

301-
// Create a client and a cache for the cluster.
302-
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
302+
// Create a http client and a mapper for the cluster.
303+
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
304+
if err != nil {
305+
return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
306+
}
307+
308+
// Create an uncached client for the cluster.
309+
uncachedClient, err := t.createUncachedClient(config, cluster, httpClient, mapper)
303310
if err != nil {
304311
return nil, err
305312
}
@@ -324,16 +331,23 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
324331
config.CAFile = inClusterConfig.CAFile
325332
config.Host = inClusterConfig.Host
326333

327-
// Create a new client and overwrite the previously created client.
328-
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
334+
// Update the http client and the mapper to use in-cluster config.
335+
httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
329336
if err != nil {
330-
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
337+
return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
331338
}
339+
332340
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
333341
} else {
334342
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
335343
}
336344

345+
// Create a client and a cache for the cluster.
346+
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper, indexes)
347+
if err != nil {
348+
return nil, err
349+
}
350+
337351
// Generating a new private key to be used for generating temporary certificates to connect to
338352
// etcd on the target cluster.
339353
// NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor.
@@ -343,9 +357,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
343357
}
344358

345359
return &clusterAccessor{
346-
cache: cache,
360+
cache: cachedClient.Cache,
347361
config: config,
348-
client: c,
362+
client: cachedClient.Client,
349363
watches: sets.Set[string]{},
350364
etcdClientCertificateKey: etcdKey,
351365
}, nil
@@ -377,28 +391,53 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
377391
return t.controllerPodMetadata.UID == pod.UID, nil
378392
}
379393

380-
// createClient creates a cached client, and uncached client and a mapper based on a rest.Config.
381-
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
394+
// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
395+
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
382396
// Create a http client for the cluster.
383397
httpClient, err := rest.HTTPClientFor(config)
384398
if err != nil {
385-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
399+
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
386400
}
387401

388402
// Create a mapper for it
389403
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
390404
if err != nil {
391-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
405+
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
392406
}
393407

394408
// Verify if we can get a rest mapping from the workload cluster apiserver.
395409
// Note: This also checks if the apiserver is up in general. We do this already here
396410
// to avoid further effort creating a cache and a client and to produce a clearer error message.
397411
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
398412
if err != nil {
399-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
413+
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
414+
}
415+
416+
return httpClient, mapper, nil
417+
}
418+
419+
// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
420+
func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, error) {
421+
// Create the uncached client for the remote cluster
422+
uncachedClient, err := client.New(config, client.Options{
423+
Scheme: t.scheme,
424+
Mapper: mapper,
425+
HTTPClient: httpClient,
426+
})
427+
if err != nil {
428+
return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
400429
}
401430

431+
return uncachedClient, nil
432+
}
433+
434+
type cachedClientOutput struct {
435+
Client client.Client
436+
Cache *stoppableCache
437+
}
438+
439+
// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
440+
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper, indexes []Index) (*cachedClientOutput, error) {
402441
// Create the cache for the remote cluster
403442
cacheOptions := cache.Options{
404443
HTTPClient: httpClient,
@@ -407,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
407446
}
408447
remoteCache, err := cache.New(config, cacheOptions)
409448
if err != nil {
410-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
449+
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error creating cache", cluster.String())
411450
}
412451

413452
cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
@@ -420,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
420459

421460
for _, index := range indexes {
422461
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
423-
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
462+
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error adding index for field %q to cache", cluster.String(), index.Field)
424463
}
425464
}
426465

@@ -436,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
436475
},
437476
})
438477
if err != nil {
439-
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
478+
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String())
440479
}
441480

442-
// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
443-
// pods in the client.
444-
uncachedClient, err := client.New(config, client.Options{
445-
Scheme: t.scheme,
446-
Mapper: mapper,
447-
HTTPClient: httpClient,
448-
})
449-
if err != nil {
450-
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
451-
}
452481
// Start the cache!!!
453482
go cache.Start(cacheCtx) //nolint:errcheck
454483

@@ -457,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
457486
defer cacheSyncCtxCancel()
458487
if !cache.WaitForCacheSync(cacheSyncCtx) {
459488
cache.Stop()
460-
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
489+
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
461490
}
462491

463492
// Wrap the cached client with a client that sets timeouts on all Get and List calls
@@ -474,7 +503,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
474503
httpClient: httpClient,
475504
})
476505

477-
return cachedClient, uncachedClient, cache, nil
506+
return &cachedClientOutput{
507+
Client: cachedClient,
508+
Cache: cache,
509+
}, nil
478510
}
479511

480512
// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.

0 commit comments

Comments
 (0)