
Commit 9b363c7

Adjust ClusterCacheTracker

Signed-off-by: Stefan Büringer [email protected]

1 parent 7b3f213 commit 9b363c7

File tree

7 files changed: +95 -79 lines changed

controllers/external/tracker.go

Lines changed: 3 additions & 1 deletion

@@ -23,6 +23,7 @@ import (
 	"github.com/pkg/errors"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -36,6 +37,7 @@ type ObjectTracker struct {
 	m sync.Map

 	Controller controller.Controller
+	Cache      cache.Cache
 }

 // Watch uses the controller to issue a Watch only if the object hasn't been seen before.
@@ -56,7 +58,7 @@ func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handl

 	log.Info("Adding watcher on external object", "groupVersionKind", gvk.String())
 	err := o.Controller.Watch(
-		&source.Kind{Type: u},
+		source.Kind(o.Cache, u),
 		handler,
 		append(p, predicates.ResourceNotPaused(log))...,
 	)
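Note: the tracker now carries a Cache because, in current controller-runtime releases (a v0.15-style API is assumed below), source.Kind is a constructor that takes the cache to read from instead of a struct with an injected cache. A minimal sketch of the new call shape; the Pod watch and helper name are illustrative, not part of this commit:

package main

import (
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// addPodWatch shows the v0.15-style source.Kind: the cache is passed
// explicitly instead of being injected into a &source.Kind{} struct.
func addPodWatch(mgr ctrl.Manager, c controller.Controller) error {
	return c.Watch(
		source.Kind(mgr.GetCache(), &corev1.Pod{}),
		&handler.EnqueueRequestForObject{},
	)
}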

controllers/remote/cluster_cache_healthcheck_test.go

Lines changed: 14 additions & 0 deletions

@@ -130,9 +130,13 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()

+		httpClient, err := rest.HTTPClientFor(env.Config)
+		g.Expect(err).ToNot(HaveOccurred())
+
 		go cct.healthCheckCluster(ctx, &healthCheckInput{
 			cluster:            testClusterKey,
 			cfg:                env.Config,
+			httpClient:         httpClient,
 			interval:           testPollInterval,
 			requestTimeout:     testPollTimeout,
 			unhealthyThreshold: testUnhealthyThreshold,
@@ -157,9 +161,13 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		cct.deleteAccessor(ctx, testClusterKey)
 		g.Expect(cct.clusterLock.TryLock(testClusterKey)).To(BeTrue())
 		startHealthCheck := time.Now()
+
+		httpClient, err := rest.HTTPClientFor(env.Config)
+		g.Expect(err).ToNot(HaveOccurred())
 		cct.healthCheckCluster(ctx, &healthCheckInput{
 			cluster:            testClusterKey,
 			cfg:                env.Config,
+			httpClient:         httpClient,
 			interval:           testPollInterval,
 			requestTimeout:     testPollTimeout,
 			unhealthyThreshold: testUnhealthyThreshold,
@@ -180,10 +188,13 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()

+		httpClient, err := rest.HTTPClientFor(env.Config)
+		g.Expect(err).ToNot(HaveOccurred())
 		go cct.healthCheckCluster(ctx,
 			&healthCheckInput{
 				cluster:            testClusterKey,
 				cfg:                env.Config,
+				httpClient:         httpClient,
 				interval:           testPollInterval,
 				requestTimeout:     testPollTimeout,
 				unhealthyThreshold: testUnhealthyThreshold,
@@ -215,9 +226,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		config := rest.CopyConfig(env.Config)
 		config.Host = fmt.Sprintf("http://127.0.0.1:%d", l.Addr().(*net.TCPAddr).Port)

+		httpClient, err := rest.HTTPClientFor(env.Config)
+		g.Expect(err).ToNot(HaveOccurred())
 		go cct.healthCheckCluster(ctx, &healthCheckInput{
 			cluster:            testClusterKey,
 			cfg:                config,
+			httpClient:         httpClient,
 			interval:           testPollInterval,
 			requestTimeout:     testPollTimeout,
 			unhealthyThreshold: testUnhealthyThreshold,
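Note: the tests build the *http.Client up front because healthCheckInput gained an httpClient field. A minimal sketch of the underlying pattern, mirroring what healthCheckCluster does with it (the helper name is illustrative):

package main

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
)

// restClientWithSharedHTTPClient builds the *http.Client once from the
// rest.Config, then hands it to the REST client constructor instead of
// letting it create its own transport.
func restClientWithSharedHTTPClient(cfg *rest.Config) (*rest.RESTClient, error) {
	httpClient, err := rest.HTTPClientFor(cfg)
	if err != nil {
		return nil, err
	}
	cfg = rest.CopyConfig(cfg)
	// An unversioned REST client still needs a serializer; a no-op encoder is
	// enough for a health probe that only issues GET requests.
	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
	cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
	return rest.UnversionedRESTClientForConfigAndClient(cfg, httpClient)
}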

controllers/remote/cluster_cache_tracker.go

Lines changed: 75 additions & 67 deletions

@@ -20,6 +20,7 @@ import (
 	"context"
 	"crypto/rsa"
 	"fmt"
+	"net/http"
 	"os"
 	"sync"
 	"time"
@@ -263,8 +264,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 		return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
 	}

-	// Create a client and a mapper for the cluster.
-	c, mapper, err := t.createClient(config, cluster)
+	// Create a client and a cache for the cluster.
+	c, cache, err := t.createClient(ctx, config, cluster, indexes)
 	if err != nil {
 		return nil, err
 	}
@@ -280,7 +281,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	if runningOnCluster {
 		inClusterConfig, err := ctrl.GetConfig()
 		if err != nil {
-			return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
+			return nil, errors.Wrapf(err, "error creating client for self-hosted cluster %q", cluster.String())
 		}

 		// Use CA and Host from in-cluster config.
@@ -289,7 +290,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 		config.Host = inClusterConfig.Host

 		// Create a new client and overwrite the previously created client.
-		c, mapper, err = t.createClient(config, cluster)
+		c, cache, err = t.createClient(ctx, config, cluster, indexes)
 		if err != nil {
 			return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
 		}
@@ -298,56 +299,6 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 		log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
 	}

-	// Create the cache for the remote cluster
-	cacheOptions := cache.Options{
-		Scheme: t.scheme,
-		Mapper: mapper,
-	}
-	remoteCache, err := cache.New(config, cacheOptions)
-	if err != nil {
-		return nil, errors.Wrapf(err, "error creating cache for remote cluster %q", cluster.String())
-	}
-
-	cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
-
-	// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
-	cache := &stoppableCache{
-		Cache:      remoteCache,
-		cancelFunc: cacheCtxCancel,
-	}
-
-	for _, index := range indexes {
-		if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
-			return nil, fmt.Errorf("failed to index field %s: %w", index.Field, err)
-		}
-	}
-
-	// Start the cache!!!
-	go cache.Start(cacheCtx) //nolint:errcheck
-
-	// Wait until the cache is initially synced
-	cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
-	defer cacheSyncCtxCancel()
-	if !cache.WaitForCacheSync(cacheSyncCtx) {
-		cache.Stop()
-		return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
-	}
-
-	// Start cluster healthcheck!!!
-	go t.healthCheckCluster(cacheCtx, &healthCheckInput{
-		cluster: cluster,
-		cfg:     config,
-	})
-
-	delegatingClient, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
-		CacheReader:     cache,
-		Client:          c,
-		UncachedObjects: t.clientUncachedObjects,
-	})
-	if err != nil {
-		return nil, err
-	}
-
 	// Generating a new private key to be used for generating temporary certificates to connect to
 	// etcd on the target cluster.
 	// NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor.
@@ -359,7 +310,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	return &clusterAccessor{
 		cache:                    cache,
 		config:                   config,
-		client:                   delegatingClient,
+		client:                   c,
 		watches:                  sets.Set[string]{},
 		etcdClientCertificateKey: etcdKey,
 	}, nil
@@ -392,27 +343,83 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
 }

 // createClient creates a client and a mapper based on a rest.Config.
-func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.ObjectKey) (client.Client, meta.RESTMapper, error) {
-	var mapper meta.RESTMapper
-	var err error
+func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, *stoppableCache, error) {
+	// Create a http client for the cluster.
+	httpClient, err := rest.HTTPClientFor(config)
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating http client for remote cluster %q", cluster.String())
+	}

+	var mapper meta.RESTMapper
 	// Create a mapper for it
 	if !feature.Gates.Enabled(feature.LazyRestmapper) {
-		mapper, err = apiutil.NewDynamicRESTMapper(config)
+		mapper, err = apiutil.NewDynamicRESTMapper(config, httpClient)
 	} else {
-		mapper, err = apiutil.NewDynamicRESTMapper(config, apiutil.WithExperimentalLazyMapper)
+		mapper, err = apiutil.NewDynamicRESTMapper(config, httpClient, apiutil.WithExperimentalLazyMapper)
 	}
 	if err != nil {
 		return nil, nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
 	}

+	// Create the cache for the remote cluster
+	cacheOptions := cache.Options{
+		HTTPClient: httpClient,
+		Scheme:     t.scheme,
+		Mapper:     mapper,
+	}
+	remoteCache, err := cache.New(config, cacheOptions)
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating cache for remote cluster %q", cluster.String())
+	}
+
+	cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
+
+	// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
+	cache := &stoppableCache{
+		Cache:      remoteCache,
+		cancelFunc: cacheCtxCancel,
+	}
+
+	for _, index := range indexes {
+		if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
+			return nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
+		}
+	}
+
 	// Create the client for the remote cluster
-	c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
+	c, err := client.New(config, client.Options{
+		Scheme:     t.scheme,
+		Mapper:     mapper,
+		HTTPClient: httpClient,
+		Cache: &client.CacheOptions{
+			Reader:       cache,
+			DisableFor:   t.clientUncachedObjects,
+			Unstructured: true,
+		},
+	})
 	if err != nil {
 		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
 	}

-	return c, mapper, nil
+	// Start the cache!!!
+	go cache.Start(cacheCtx) //nolint:errcheck
+
+	// Wait until the cache is initially synced
+	cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
+	defer cacheSyncCtxCancel()
+	if !cache.WaitForCacheSync(cacheSyncCtx) {
+		cache.Stop()
+		return nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
+	}
+
+	// Start cluster healthcheck!!!
+	go t.healthCheckCluster(cacheCtx, &healthCheckInput{
+		cluster:    cluster,
+		cfg:        config,
+		httpClient: httpClient,
+	})
+
+	return c, cache, nil
 }

 // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
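Note: the delegating-client wrapper is gone; cached reads with per-type live-read exceptions are now configured directly on client.New through client.Options.Cache. A minimal standalone sketch under that assumption (controller-runtime v0.15-style API; the ConfigMap exception and helper name are illustrative):

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newCachedClient builds a client that reads from the given cache and falls
// back to live reads for the listed types, replacing the former
// client.NewDelegatingClient helper.
func newCachedClient(cfg *rest.Config, scheme *runtime.Scheme, c cache.Cache) (client.Client, error) {
	httpClient, err := rest.HTTPClientFor(cfg)
	if err != nil {
		return nil, err
	}
	return client.New(cfg, client.Options{
		Scheme:     scheme,
		HTTPClient: httpClient,
		Cache: &client.CacheOptions{
			// Serve Get/List from the informer cache.
			Reader: c,
			// Example: always read these types live from the API server.
			DisableFor: []client.Object{&corev1.ConfigMap{}},
			// Cache unstructured objects too.
			Unstructured: true,
		},
	})
}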
@@ -486,7 +493,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 	}

 	// Need to create the watch
-	if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, accessor.cache), input.EventHandler, input.Predicates...); err != nil {
+	if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind), input.EventHandler, input.Predicates...); err != nil {
 		return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}

@@ -498,6 +505,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 // healthCheckInput provides the input for the healthCheckCluster method.
 type healthCheckInput struct {
 	cluster            client.ObjectKey
+	httpClient         *http.Client
 	cfg                *rest.Config
 	interval           time.Duration
 	requestTimeout     time.Duration
@@ -535,9 +543,9 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
 	cfg := rest.CopyConfig(in.cfg)
 	cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
-	restClient, restClientErr := rest.UnversionedRESTClientFor(cfg)
+	restClient, restClientErr := rest.UnversionedRESTClientForConfigAndClient(cfg, in.httpClient)

-	runHealthCheckWithThreshold := func() (bool, error) {
+	runHealthCheckWithThreshold := func(ctx context.Context) (bool, error) {
 		if restClientErr != nil {
 			return false, restClientErr
 		}
@@ -593,12 +601,12 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 		return false, nil
 	}

-	err := wait.PollImmediateUntil(in.interval, runHealthCheckWithThreshold, ctx.Done())
+	err := wait.PollUntilContextCancel(ctx, in.interval, true, runHealthCheckWithThreshold)
 	// An error returned implies the health check has failed a sufficient number of
 	// times for the cluster to be considered unhealthy
 	// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
 	// happens when the cache is explicitly stopped.
-	if err != nil && err != wait.ErrWaitTimeout {
+	if err != nil && !wait.Interrupted(err) {
 		t.log.Error(err, "Error health checking cluster", "Cluster", klog.KRef(in.cluster.Namespace, in.cluster.Name))
 		t.deleteAccessor(ctx, in.cluster)
 	}
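Note: the health-check loop switches to the context-aware polling helpers from k8s.io/apimachinery. A minimal sketch of the replacement pattern, wait.PollUntilContextCancel plus wait.Interrupted (the interval and condition are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollUntilCancelled shows the replacement for wait.PollImmediateUntil:
// the condition receives a context, immediate=true keeps the
// "run once before the first tick" behaviour, and wait.Interrupted
// reports the cancellation errors that used to surface as ErrWaitTimeout.
func pollUntilCancelled(ctx context.Context, healthy func(context.Context) (bool, error)) {
	err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, healthy)
	if err != nil && !wait.Interrupted(err) {
		fmt.Println("health check failed:", err)
	}
}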

controllers/remote/cluster_cache_tracker_fake.go

Lines changed: 1 addition & 9 deletions

@@ -32,17 +32,9 @@ func NewTestClusterCacheTracker(log logr.Logger, cl client.Client, scheme *runti
 		clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
 	}

-	delegatingClient, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
-		CacheReader: cl,
-		Client:      cl,
-	})
-	if err != nil {
-		panic(err)
-	}
-
 	testCacheTracker.clusterAccessors[objKey] = &clusterAccessor{
 		cache:   nil,
-		client:  delegatingClient,
+		client:  cl,
 		watches: sets.Set[string]{}.Insert(watchObjects...),
 	}
 	return testCacheTracker

controllers/remote/cluster_cache_tracker_test.go

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/conditions"
 )

-func mapper(i client.Object) []reconcile.Request {
+func mapper(_ context.Context, i client.Object) []reconcile.Request {
 	return []reconcile.Request{
 		{
 			NamespacedName: types.NamespacedName{
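Note: this follows the handler.MapFunc signature change in controller-runtime v0.15, where map functions now receive a context. A minimal sketch of the new shape (the mapping itself is illustrative):

package main

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// mapToSelf is a handler.MapFunc in its v0.15 form: it takes a
// context.Context as the first argument and enqueues the object itself.
var mapToSelf handler.MapFunc = func(_ context.Context, o client.Object) []reconcile.Request {
	return []reconcile.Request{
		{NamespacedName: types.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}},
	}
}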

internal/controllers/machineset/machineset_controller.go

Lines changed: 0 additions & 1 deletion

@@ -782,7 +782,6 @@ func (r *Reconciler) MachineToMachineSets(ctx context.Context, o client.Object)
 		panic(fmt.Sprintf("Expected a Machine but got a %T", o))
 	}

-	// FIXME(sbueringer): This won't log unless the global logger is set
 	log := ctrl.LoggerFrom(ctx, "Machine", klog.KObj(m))

 	// Check if the controller reference is already set and

main.go

Lines changed: 1 addition & 0 deletions

@@ -368,6 +368,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {

 	if feature.Gates.Enabled(feature.ClusterTopology) {
 		unstructuredCachingClient, err := client.New(mgr.GetConfig(), client.Options{
+			HTTPClient: mgr.GetHTTPClient(),
 			Cache: &client.CacheOptions{
 				Reader:       mgr.GetCache(),
 				Unstructured: true,
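Note: passing the manager's HTTP client into any extra client built from mgr.GetConfig() keeps it on the same transport and TLS setup as the manager itself. A minimal sketch, assuming a controller-runtime v0.15-style Manager:

package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newUnstructuredCachingClient builds an additional client that shares the
// manager's HTTP client and serves reads, including unstructured objects,
// from the manager's cache.
func newUnstructuredCachingClient(mgr ctrl.Manager) (client.Client, error) {
	return client.New(mgr.GetConfig(), client.Options{
		HTTPClient: mgr.GetHTTPClient(),
		Cache: &client.CacheOptions{
			Reader:       mgr.GetCache(),
			Unstructured: true,
		},
	})
}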
