@@ -19,15 +19,19 @@ package remote
1919import (
2020 "context"
2121 "fmt"
22+ "os"
2223 "sync"
2324 "time"
2425
2526 "github.com/go-logr/logr"
2627 "github.com/pkg/errors"
2728 corev1 "k8s.io/api/core/v1"
2829 apierrors "k8s.io/apimachinery/pkg/api/errors"
30+ "k8s.io/apimachinery/pkg/api/meta"
31+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2932 "k8s.io/apimachinery/pkg/runtime"
3033 "k8s.io/apimachinery/pkg/runtime/serializer"
34+ "k8s.io/apimachinery/pkg/types"
3135 "k8s.io/apimachinery/pkg/util/sets"
3236 "k8s.io/apimachinery/pkg/util/wait"
3337 "k8s.io/client-go/kubernetes/scheme"
@@ -62,6 +66,12 @@ type ClusterCacheTracker struct {
6266 lock sync.RWMutex
6367 clusterAccessors map [client.ObjectKey ]* clusterAccessor
6468 indexes []Index
69+
70+ // controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
71+ // This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
72+ // This information will be used to detected if the controller is running on a workload cluster, so
73+ // that we can then access the apiserver directly.
74+ controllerPodMetadata * metav1.ObjectMeta
6575}
6676
6777// ClusterCacheTrackerOptions defines options to configure
@@ -96,7 +106,23 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
96106func NewClusterCacheTracker (manager ctrl.Manager , options ClusterCacheTrackerOptions ) (* ClusterCacheTracker , error ) {
97107 setDefaultOptions (& options )
98108
109+ var controllerPodMetadata * metav1.ObjectMeta
110+ podNamespace := os .Getenv ("POD_NAMESPACE" )
111+ podName := os .Getenv ("POD_NAME" )
112+ podUID := os .Getenv ("POD_UID" )
113+ if podNamespace != "" && podName != "" && podUID != "" {
114+ options .Log .Info ("Found controller pod metadata, the ClusterCacheTracker will try to access the cluster directly when possible" )
115+ controllerPodMetadata = & metav1.ObjectMeta {
116+ Namespace : podNamespace ,
117+ Name : podName ,
118+ UID : types .UID (podUID ),
119+ }
120+ } else {
121+ options .Log .Info ("Couldn't find controller pod metadata, the ClusterCacheTracker will always access clusters using the regular apiserver endpoint" )
122+ }
123+
99124 return & ClusterCacheTracker {
125+ controllerPodMetadata : controllerPodMetadata ,
100126 log : * options .Log ,
101127 clientUncachedObjects : options .ClientUncachedObjects ,
102128 client : manager .GetClient (),
@@ -119,11 +145,25 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
119145 return accessor .client , nil
120146}
121147
148+ // GetRESTConfig returns a cached REST config for the given cluster.
149+ func (t * ClusterCacheTracker ) GetRESTConfig (ctc context.Context , cluster client.ObjectKey ) (* rest.Config , error ) {
150+ t .lock .Lock ()
151+ defer t .lock .Unlock ()
152+
153+ accessor , err := t .getClusterAccessorLH (ctc , cluster , t .indexes ... )
154+ if err != nil {
155+ return nil , err
156+ }
157+
158+ return accessor .config , nil
159+ }
160+
122161// clusterAccessor represents the combination of a delegating client, cache, and watches for a remote cluster.
123162type clusterAccessor struct {
124163 cache * stoppableCache
125164 client client.Client
126165 watches sets.String
166+ config * rest.Config
127167}
128168
129169// clusterAccessorExists returns true if a clusterAccessor exists for cluster.
@@ -155,22 +195,47 @@ func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster
155195
156196// newClusterAccessor creates a new clusterAccessor.
157197func (t * ClusterCacheTracker ) newClusterAccessor (ctx context.Context , cluster client.ObjectKey , indexes ... Index ) (* clusterAccessor , error ) {
198+ log := ctrl .LoggerFrom (ctx )
199+
158200 // Get a rest config for the remote cluster
159201 config , err := RESTConfig (ctx , clusterCacheControllerName , t .client , cluster )
160202 if err != nil {
161203 return nil , errors .Wrapf (err , "error fetching REST client config for remote cluster %q" , cluster .String ())
162204 }
163205
164- // Create a mapper for it
165- mapper , err := apiutil . NewDynamicRESTMapper (config )
206+ // Create a client and a mapper for the cluster.
207+ c , mapper , err := t . createClient (config , cluster )
166208 if err != nil {
167- return nil , errors . Wrapf ( err , "error creating dynamic rest mapper for remote cluster %q" , cluster . String ())
209+ return nil , err
168210 }
169211
170- // Create the client for the remote cluster
171- c , err := client . New ( config , client. Options { Scheme : t . scheme , Mapper : mapper } )
212+ // Detect if the controller is running on the workload cluster.
213+ runningOnCluster , err := t . runningOnWorkloadCluster ( ctx , c , cluster )
172214 if err != nil {
173- return nil , errors .Wrapf (err , "error creating client for remote cluster %q" , cluster .String ())
215+ return nil , err
216+ }
217+
218+ // If the controller runs on the workload cluster, access the apiserver directly by using the
219+ // CA and Host from the in-cluster configuration.
220+ if runningOnCluster {
221+ inClusterConfig , err := ctrl .GetConfig ()
222+ if err != nil {
223+ return nil , errors .Wrap (err , "error creating client for self-hosted cluster" )
224+ }
225+
226+ // Use CA and Host from in-cluster config.
227+ config .CAData = nil
228+ config .CAFile = inClusterConfig .CAFile
229+ config .Host = inClusterConfig .Host
230+
231+ // Create a new client and overwrite the previously created client.
232+ c , mapper , err = t .createClient (config , cluster )
233+ if err != nil {
234+ return nil , errors .Wrap (err , "error creating client for self-hosted cluster" )
235+ }
236+ log .Info (fmt .Sprintf ("Creating cluster accessor for cluster %q with in-cluster service %q" , cluster .String (), config .Host ))
237+ } else {
238+ log .Info (fmt .Sprintf ("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q" , cluster .String (), config .Host ))
174239 }
175240
176241 // Create the cache for the remote cluster
@@ -220,13 +285,59 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
220285
221286 return & clusterAccessor {
222287 cache : cache ,
288+ config : config ,
223289 client : delegatingClient ,
224290 watches : sets .NewString (),
225291 }, nil
226292}
227293
294+ // runningOnWorkloadCluster detects if the current controller runs on the workload cluster.
295+ func (t * ClusterCacheTracker ) runningOnWorkloadCluster (ctx context.Context , c client.Client , cluster client.ObjectKey ) (bool , error ) {
296+ // Controller Pod metadata was not found, so we can't detect if we run on the workload cluster.
297+ if t .controllerPodMetadata == nil {
298+ return false , nil
299+ }
300+
301+ // Try to get the controller pod.
302+ var pod corev1.Pod
303+ if err := c .Get (ctx , client.ObjectKey {
304+ Namespace : t .controllerPodMetadata .Namespace ,
305+ Name : t .controllerPodMetadata .Name ,
306+ }, & pod ); err != nil {
307+ // If the controller pod is not found, we assume we are not running on the workload cluster.
308+ if apierrors .IsNotFound (err ) {
309+ return false , nil
310+ }
311+
312+ // If we got another error, we return the error so that this will be retried later.
313+ return false , errors .Wrapf (err , "error checking if we're running on workload cluster %q" , cluster .String ())
314+ }
315+
316+ // If the uid is the same we found the controller pod on the workload cluster.
317+ return t .controllerPodMetadata .UID == pod .UID , nil
318+ }
319+
320+ // createClient creates a client and a mapper based on a rest.Config.
321+ func (t * ClusterCacheTracker ) createClient (config * rest.Config , cluster client.ObjectKey ) (client.Client , meta.RESTMapper , error ) {
322+ // Create a mapper for it
323+ mapper , err := apiutil .NewDynamicRESTMapper (config )
324+ if err != nil {
325+ return nil , nil , errors .Wrapf (err , "error creating dynamic rest mapper for remote cluster %q" , cluster .String ())
326+ }
327+
328+ // Create the client for the remote cluster
329+ c , err := client .New (config , client.Options {Scheme : t .scheme , Mapper : mapper })
330+ if err != nil {
331+ return nil , nil , errors .Wrapf (err , "error creating client for remote cluster %q" , cluster .String ())
332+ }
333+
334+ return c , mapper , nil
335+ }
336+
228337// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
229- func (t * ClusterCacheTracker ) deleteAccessor (cluster client.ObjectKey ) {
338+ func (t * ClusterCacheTracker ) deleteAccessor (ctx context.Context , cluster client.ObjectKey ) {
339+ log := ctrl .LoggerFrom (ctx )
340+
230341 t .lock .Lock ()
231342 defer t .lock .Unlock ()
232343
@@ -235,11 +346,11 @@ func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
235346 return
236347 }
237348
238- t . log .V (2 ).Info ("Deleting clusterAccessor" , "cluster" , cluster .String ())
349+ log .V (2 ).Info ("Deleting clusterAccessor" , "cluster" , cluster .String ())
239350
240- t . log .V (4 ).Info ("Stopping cache" , "cluster" , cluster .String ())
351+ log .V (4 ).Info ("Stopping cache" , "cluster" , cluster .String ())
241352 a .cache .Stop ()
242- t . log .V (4 ).Info ("Cache stopped" , "cluster" , cluster .String ())
353+ log .V (4 ).Info ("Cache stopped" , "cluster" , cluster .String ())
243354
244355 delete (t .clusterAccessors , cluster )
245356}
@@ -286,7 +397,8 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
286397 }
287398
288399 if a .watches .Has (input .Name ) {
289- t .log .V (6 ).Info ("Watch already exists" , "namespace" , input .Cluster .Namespace , "cluster" , input .Cluster .Name , "name" , input .Name )
400+ log := ctrl .LoggerFrom (ctx )
401+ log .V (6 ).Info ("Watch already exists" , "namespace" , input .Cluster .Namespace , "cluster" , input .Cluster .Name , "name" , input .Name )
290402 return nil
291403 }
292404
@@ -391,7 +503,8 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
391503 // NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
392504 // happens when the cache is explicitly stopped.
393505 if err != nil && err != wait .ErrWaitTimeout {
394- t .log .Error (err , "Error health checking cluster" , "cluster" , in .cluster .String ())
395- t .deleteAccessor (in .cluster )
506+ log := ctrl .LoggerFrom (ctx )
507+ log .Error (err , "Error health checking cluster" , "cluster" , in .cluster .String ())
508+ t .deleteAccessor (ctx , in .cluster )
396509 }
397510}
0 commit comments