
Commit afdcb4c

Merge pull request #10826 from sbueringer/pr-improv-cct-error-logging
🌱 Improve CCT error logging
2 parents fd94039 + 7f01628

2 files changed: +58 -46 lines changed


controllers/remote/cluster_cache_healthcheck_test.go

Lines changed: 25 additions & 12 deletions
@@ -26,6 +26,8 @@ import (
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -132,13 +134,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 
-		httpClient, err := rest.HTTPClientFor(env.Config)
+		restClient, err := getRESTClient(env.Config)
 		g.Expect(err).ToNot(HaveOccurred())
 
 		go cct.healthCheckCluster(ctx, &healthCheckInput{
 			cluster:            testClusterKey,
-			cfg:                env.Config,
-			httpClient:         httpClient,
+			restClient:         restClient,
 			interval:           testPollInterval,
 			requestTimeout:     testPollTimeout,
 			unhealthyThreshold: testUnhealthyThreshold,
@@ -164,12 +165,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		g.Expect(cct.clusterLock.TryLock(testClusterKey)).To(BeTrue())
 		startHealthCheck := time.Now()
 
-		httpClient, err := rest.HTTPClientFor(env.Config)
+		restClient, err := getRESTClient(env.Config)
 		g.Expect(err).ToNot(HaveOccurred())
+
 		cct.healthCheckCluster(ctx, &healthCheckInput{
 			cluster:            testClusterKey,
-			cfg:                env.Config,
-			httpClient:         httpClient,
+			restClient:         restClient,
 			interval:           testPollInterval,
 			requestTimeout:     testPollTimeout,
 			unhealthyThreshold: testUnhealthyThreshold,
@@ -190,13 +191,13 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 
-		httpClient, err := rest.HTTPClientFor(env.Config)
+		restClient, err := getRESTClient(env.Config)
 		g.Expect(err).ToNot(HaveOccurred())
+
 		go cct.healthCheckCluster(ctx,
 			&healthCheckInput{
 				cluster:            testClusterKey,
-				cfg:                env.Config,
-				httpClient:         httpClient,
+				restClient:         restClient,
 				interval:           testPollInterval,
 				requestTimeout:     testPollTimeout,
 				unhealthyThreshold: testUnhealthyThreshold,
@@ -228,12 +229,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		config := rest.CopyConfig(env.Config)
 		config.Host = fmt.Sprintf("http://127.0.0.1:%d", l.Addr().(*net.TCPAddr).Port)
 
-		httpClient, err := rest.HTTPClientFor(env.Config)
+		restClient, err := getRESTClient(config)
 		g.Expect(err).ToNot(HaveOccurred())
+
 		go cct.healthCheckCluster(ctx, &healthCheckInput{
 			cluster:            testClusterKey,
-			cfg:                config,
-			httpClient:         httpClient,
+			restClient:         restClient,
 			interval:           testPollInterval,
 			requestTimeout:     testPollTimeout,
 			unhealthyThreshold: testUnhealthyThreshold,
@@ -248,3 +249,15 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		})
 	})
 }
+
+func getRESTClient(config *rest.Config) (*rest.RESTClient, error) {
+	httpClient, err := rest.HTTPClientFor(config)
+	if err != nil {
+		return nil, err
+	}
+
+	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
+	restClientConfig := rest.CopyConfig(config)
+	restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
+	return rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
+}
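
For context, the getRESTClient helper above builds an unversioned REST client whose serializer can only decode: requests made through it are raw HTTP calls against the apiserver, so nothing is ever encoded through the scheme. Below is a minimal standalone sketch of the same technique; the kubeconfig path and the 5-second timeout are placeholders for this sketch, not values from the commit.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load a rest.Config from a kubeconfig; the path is a placeholder.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	httpClient, err := rest.HTTPClientFor(config)
	if err != nil {
		panic(err)
	}

	// NoopEncoder: this client only decodes; raw requests never encode objects.
	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
	restClientConfig := rest.CopyConfig(config)
	restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})

	restClient, err := rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
	if err != nil {
		panic(err)
	}

	// The same kind of probe the tracker uses: a raw GET against "/" confirms
	// the apiserver is reachable before any cache or client is built on top.
	if _, err := restClient.Get().AbsPath("/").Timeout(5 * time.Second).DoRaw(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println("apiserver is reachable")
}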

controllers/remote/cluster_cache_tracker.go

Lines changed: 33 additions & 34 deletions
@@ -183,7 +183,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
 func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
 	accessor, err := t.getClusterAccessor(ctx, cluster)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to get client")
 	}
 
 	return accessor.client, nil
@@ -198,7 +198,7 @@ func (t *ClusterCacheTracker) GetReader(ctx context.Context, cluster client.Obje
 func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
 	accessor, err := t.getClusterAccessor(ctc, cluster)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to get REST config")
 	}
 
 	return accessor.config, nil
@@ -208,7 +208,7 @@ func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.
 func (t *ClusterCacheTracker) GetEtcdClientCertificateKey(ctx context.Context, cluster client.ObjectKey) (*rsa.PrivateKey, error) {
 	accessor, err := t.getClusterAccessor(ctx, cluster)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to get etcd client certificate key")
 	}
 
 	return accessor.etcdClientCertificateKey, nil
@@ -267,7 +267,7 @@ func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster cl
 	// for the cluster at the same time.
 	// Return an error if another go routine already tries to create a clusterAccessor.
 	if ok := t.clusterLock.TryLock(cluster); !ok {
-		return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
+		return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)")
 	}
 	defer t.clusterLock.Unlock(cluster)
 
@@ -305,7 +305,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	}
 
 	// Create a http client and a mapper for the cluster.
-	httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
+	httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
 	}
@@ -337,7 +337,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	config.Host = inClusterConfig.Host
 
 	// Update the http client and the mapper to use in-cluster config.
-	httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
+	httpClient, mapper, restClient, err = t.createHTTPClientAndMapper(ctx, config, cluster)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
 	}
@@ -348,7 +348,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	}
 
 	// Create a client and a cache for the cluster.
-	cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper)
+	cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, restClient, mapper)
 	if err != nil {
 		return nil, err
 	}
@@ -397,28 +397,40 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
 }
 
 // createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
-func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
+func (t *ClusterCacheTracker) createHTTPClientAndMapper(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, *rest.RESTClient, error) {
 	// Create a http client for the cluster.
 	httpClient, err := rest.HTTPClientFor(config)
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
 	}
 
 	// Create a mapper for it
 	mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
+	}
+
+	// Create a REST client for the cluster (this is later used for health checking as well).
+	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
+	restClientConfig := rest.CopyConfig(config)
+	restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
+	restClient, err := rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
+	if err != nil {
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating REST client", cluster.String())
+	}
+
+	// Note: This checks if the apiserver is up. We do this already here to produce a clearer error message if the cluster is unreachable.
+	if _, err := restClient.Get().AbsPath("/").Timeout(healthCheckRequestTimeout).DoRaw(ctx); err != nil {
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: cluster is not reachable", cluster.String())
 	}
 
 	// Verify if we can get a rest mapping from the workload cluster apiserver.
-	// Note: This also checks if the apiserver is up in general. We do this already here
-	// to avoid further effort creating a cache and a client and to produce a clearer error message.
 	_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
 	}
 
-	return httpClient, mapper, nil
+	return httpClient, mapper, restClient, nil
 }
 
 // createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
@@ -442,7 +454,7 @@ type cachedClientOutput struct {
 }
 
 // createCachedClient creates a cached client for the given cluster, based on a rest.Config.
-func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (*cachedClientOutput, error) {
+func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, restClient *rest.RESTClient, mapper meta.RESTMapper) (*cachedClientOutput, error) {
 	// Create the cache for the remote cluster
 	cacheOptions := cache.Options{
 		HTTPClient: httpClient,
@@ -504,8 +516,7 @@ func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *re
 	// Start cluster healthcheck!!!
 	go t.healthCheckCluster(cacheCtx, &healthCheckInput{
 		cluster:    cluster,
-		cfg:        config,
-		httpClient: httpClient,
+		restClient: restClient,
 	})
 
 	return &cachedClientOutput{
@@ -568,13 +579,13 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 
 	accessor, err := t.getClusterAccessor(ctx, input.Cluster)
 	if err != nil {
-		return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+		return errors.Wrapf(err, "failed to add %T watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
	}
 
 	// We have to lock the cluster, so that the watch is not created multiple times in parallel.
 	ok := t.clusterLock.TryLock(input.Cluster)
 	if !ok {
-		return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+		return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
 	defer t.clusterLock.Unlock(input.Cluster)
 
@@ -586,7 +597,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 
 	// Need to create the watch
 	if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
-		return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+		return errors.Wrapf(err, "failed to add %T watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
 
 	accessor.watches.Insert(input.Name)
@@ -597,8 +608,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 // healthCheckInput provides the input for the healthCheckCluster method.
 type healthCheckInput struct {
 	cluster            client.ObjectKey
-	httpClient         *http.Client
-	cfg                *rest.Config
+	restClient         *rest.RESTClient
 	interval           time.Duration
 	requestTimeout     time.Duration
 	unhealthyThreshold int
@@ -630,18 +640,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 
 	unhealthyCount := 0
 
-	// This gets us a client that can make raw http(s) calls to the remote apiserver. We only need to create it once
-	// and we can reuse it inside the polling loop.
-	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
-	cfg := rest.CopyConfig(in.cfg)
-	cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
-	restClient, restClientErr := rest.UnversionedRESTClientForConfigAndClient(cfg, in.httpClient)
-
 	runHealthCheckWithThreshold := func(ctx context.Context) (bool, error) {
-		if restClientErr != nil {
-			return false, restClientErr
-		}
-
 		cluster := &clusterv1.Cluster{}
 		if err := t.client.Get(ctx, in.cluster, cluster); err != nil {
 			if apierrors.IsNotFound(err) {
@@ -672,7 +671,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 
 		// An error here means there was either an issue connecting or the API returned an error.
 		// If no error occurs, reset the unhealthy counter.
-		_, err := restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
+		_, err := in.restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
 		if err != nil {
 			if apierrors.IsUnauthorized(err) {
 				// Unauthorized means that the underlying kubeconfig is not authorizing properly anymore, which
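
The healthCheckCluster loop that consumes this REST client keeps a consecutive-failure counter and only gives up once unhealthyThreshold probes in a row have failed. A rough sketch of that pattern follows; it is not taken from this commit, and healthProbe plus the interval/threshold constants are illustrative.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	unhealthyCount := 0
	unhealthyThreshold := 3

	// healthProbe stands in for the raw GET against the remote apiserver.
	healthProbe := func(ctx context.Context) error {
		return errors.New("apiserver unreachable") // simulate a failing probe
	}

	// Poll on an interval until the context is cancelled or the condition
	// function returns an error (i.e. the threshold is exceeded).
	err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
		if err := healthProbe(ctx); err != nil {
			unhealthyCount++
			if unhealthyCount >= unhealthyThreshold {
				// Returning an error stops the polling loop for good.
				return false, fmt.Errorf("health check failed %d times in a row: %w", unhealthyCount, err)
			}
			return false, nil // below the threshold: keep polling
		}
		unhealthyCount = 0 // a single success resets the counter
		return false, nil  // healthy: keep polling until cancellation
	})
	fmt.Println("health check stopped:", err)
}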
