@@ -19,15 +19,19 @@ package remote
 import (
 	"context"
 	"fmt"
+	"os"
 	"sync"
 	"time"

 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -62,6 +66,12 @@ type ClusterCacheTracker struct {
 	lock             sync.RWMutex
 	clusterAccessors map[client.ObjectKey]*clusterAccessor
 	indexes          []Index
+
+	// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
+	// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
+	// This information will be used to detect if the controller is running on a workload cluster, so
+	// that we can then access the apiserver directly.
+	controllerPodMetadata *metav1.ObjectMeta
 }

 // ClusterCacheTrackerOptions defines options to configure
@@ -96,7 +106,23 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
 func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOptions) (*ClusterCacheTracker, error) {
 	setDefaultOptions(&options)

+	var controllerPodMetadata *metav1.ObjectMeta
+	podNamespace := os.Getenv("POD_NAMESPACE")
+	podName := os.Getenv("POD_NAME")
+	podUID := os.Getenv("POD_UID")
+	if podNamespace != "" && podName != "" && podUID != "" {
+		options.Log.Info("Found controller pod metadata, the ClusterCacheTracker will try to access the cluster directly when possible")
+		controllerPodMetadata = &metav1.ObjectMeta{
+			Namespace: podNamespace,
+			Name:      podName,
+			UID:       types.UID(podUID),
+		}
+	} else {
+		options.Log.Info("Couldn't find controller pod metadata, the ClusterCacheTracker will always access clusters using the regular apiserver endpoint")
+	}
+
 	return &ClusterCacheTracker{
+		controllerPodMetadata: controllerPodMetadata,
 		log:                   *options.Log,
 		clientUncachedObjects: options.ClientUncachedObjects,
 		client:                manager.GetClient(),
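
The three variables read above are typically injected through the Kubernetes downward API. A minimal sketch of that wiring using client-go types follows; only the variable names come from this patch, while the `downwardAPIEnv` helper and any surrounding Deployment spec are illustrative assumptions:

```go
// Illustrative only: env vars for the controller container, populated from
// the pod's own metadata via the downward API. The names POD_NAMESPACE,
// POD_NAME and POD_UID match what NewClusterCacheTracker reads.
package main

import corev1 "k8s.io/api/core/v1"

// downwardAPIEnv is a hypothetical helper, not part of this patch.
func downwardAPIEnv() []corev1.EnvVar {
	fieldEnv := func(name, fieldPath string) corev1.EnvVar {
		return corev1.EnvVar{
			Name: name,
			ValueFrom: &corev1.EnvVarSource{
				FieldRef: &corev1.ObjectFieldSelector{FieldPath: fieldPath},
			},
		}
	}
	return []corev1.EnvVar{
		fieldEnv("POD_NAMESPACE", "metadata.namespace"),
		fieldEnv("POD_NAME", "metadata.name"),
		fieldEnv("POD_UID", "metadata.uid"),
	}
}
```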
@@ -119,11 +145,25 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
 	return accessor.client, nil
 }

+// GetRESTConfig returns a cached REST config for the given cluster.
+func (t *ClusterCacheTracker) GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
+	if err != nil {
+		return nil, err
+	}
+
+	return accessor.config, nil
+}
+
 // clusterAccessor represents the combination of a delegating client, cache, and watches for a remote cluster.
 type clusterAccessor struct {
 	cache   *stoppableCache
 	client  client.Client
 	watches sets.String
+	config  *rest.Config
 }

 // clusterAccessorExists returns true if a clusterAccessor exists for cluster.
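
Because the REST config is now cached on the accessor, callers can cheaply derive additional clients for the same workload cluster without re-fetching the kubeconfig secret. A usage sketch, assuming a `tracker` built by `NewClusterCacheTracker`; the `listWorkloadNodes` helper is hypothetical:

```go
// Sketch of consuming the new GetRESTConfig; only GetRESTConfig itself is
// from this patch, the helper and its callers are assumptions.
package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func listWorkloadNodes(ctx context.Context, tracker *remote.ClusterCacheTracker, key client.ObjectKey) (*corev1.NodeList, error) {
	restConfig, err := tracker.GetRESTConfig(ctx, key)
	if err != nil {
		return nil, err
	}
	// Build a plain client-go Clientset against the workload cluster.
	cs, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, err
	}
	return cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
}
```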
@@ -155,22 +195,47 @@ func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster
 // newClusterAccessor creates a new clusterAccessor.
 func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
+	log := ctrl.LoggerFrom(ctx)
+
 	// Get a rest config for the remote cluster
 	config, err := RESTConfig(ctx, clusterCacheControllerName, t.client, cluster)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
 	}

-	// Create a mapper for it
-	mapper, err := apiutil.NewDynamicRESTMapper(config)
+	// Create a client and a mapper for the cluster.
+	c, mapper, err := t.createClient(config, cluster)
 	if err != nil {
-		return nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
+		return nil, err
 	}

-	// Create the client for the remote cluster
-	c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
+	// Detect if the controller is running on the workload cluster.
+	runningOnCluster, err := t.runningOnWorkloadCluster(ctx, c, cluster)
 	if err != nil {
-		return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
+		return nil, err
+	}
+
+	// If the controller runs on the workload cluster, access the apiserver directly by using the
+	// CA and Host from the in-cluster configuration.
+	if runningOnCluster {
+		inClusterConfig, err := ctrl.GetConfig()
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
+		}
+
+		// Use CA and Host from in-cluster config.
+		config.CAData = nil
+		config.CAFile = inClusterConfig.CAFile
+		config.Host = inClusterConfig.Host
+
+		// Create a new client and overwrite the previously created client.
+		c, mapper, err = t.createClient(config, cluster)
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
+		}
+		log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
+	} else {
+		log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
 	}

 	// Create the cache for the remote cluster
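
The CA/Host swap works because, inside a pod, `ctrl.GetConfig()` resolves to the in-cluster configuration: `Host` points at the `kubernetes` Service and `CAFile` at the mounted serviceaccount CA, so the controller can skip the cluster's external endpoint entirely. The same swap as a standalone sketch; the helper name is hypothetical, with `rest.InClusterConfig` as the underlying client-go primitive:

```go
// Sketch of the endpoint swap above as a reusable helper; illustrative,
// not part of this patch.
package main

import "k8s.io/client-go/rest"

func useInClusterEndpoint(workload *rest.Config) (*rest.Config, error) {
	inCluster, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	out := rest.CopyConfig(workload)
	out.CAData = nil              // drop the workload cluster's external CA bundle
	out.CAFile = inCluster.CAFile // typically the mounted serviceaccount CA
	out.Host = inCluster.Host     // the in-cluster apiserver address
	return out, nil
}
```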
@@ -220,13 +285,59 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	return &clusterAccessor{
 		cache:   cache,
+		config:  config,
 		client:  delegatingClient,
 		watches: sets.NewString(),
 	}, nil
 }

+// runningOnWorkloadCluster detects if the current controller runs on the workload cluster.
+func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c client.Client, cluster client.ObjectKey) (bool, error) {
+	// Controller Pod metadata was not found, so we can't detect if we run on the workload cluster.
+	if t.controllerPodMetadata == nil {
+		return false, nil
+	}
+
+	// Try to get the controller pod.
+	var pod corev1.Pod
+	if err := c.Get(ctx, client.ObjectKey{
+		Namespace: t.controllerPodMetadata.Namespace,
+		Name:      t.controllerPodMetadata.Name,
+	}, &pod); err != nil {
+		// If the controller pod is not found, we assume we are not running on the workload cluster.
+		if apierrors.IsNotFound(err) {
+			return false, nil
+		}
+
+		// If we got another error, we return the error so that this will be retried later.
+		return false, errors.Wrapf(err, "error checking if we're running on workload cluster %q", cluster.String())
+	}
+
+	// If the UID is the same, we found the controller pod on the workload cluster.
+	return t.controllerPodMetadata.UID == pod.UID, nil
+}
+
+// createClient creates a client and a mapper based on a rest.Config.
+func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.ObjectKey) (client.Client, meta.RESTMapper, error) {
+	// Create a mapper for it
+	mapper, err := apiutil.NewDynamicRESTMapper(config)
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
+	}
+
+	// Create the client for the remote cluster
+	c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
+	}
+
+	return c, mapper, nil
+}
+
 // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
-func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
+func (t *ClusterCacheTracker) deleteAccessor(ctx context.Context, cluster client.ObjectKey) {
+	log := ctrl.LoggerFrom(ctx)
+
 	t.lock.Lock()
 	defer t.lock.Unlock()
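
The UID comparison is what distinguishes "a pod with the same namespace/name happens to exist on the workload cluster" from "we are that pod". A minimal in-package test sketch using controller-runtime's fake client; names and UIDs are illustrative, and the test itself is not part of this patch:

```go
// Illustrative in-package test; assumes controller-runtime's fake client.
package remote

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestRunningOnWorkloadCluster(t *testing.T) {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Namespace: "capi-system", Name: "capi-controller", UID: types.UID("uid-1"),
	}}
	tracker := &ClusterCacheTracker{controllerPodMetadata: &metav1.ObjectMeta{
		Namespace: "capi-system", Name: "capi-controller", UID: types.UID("uid-1"),
	}}

	// The fake client stands in for the workload cluster; it contains a pod
	// with the same namespace/name/UID, so detection should succeed.
	c := fake.NewClientBuilder().WithObjects(pod).Build()
	on, err := tracker.runningOnWorkloadCluster(context.Background(), c, client.ObjectKey{Namespace: "ns", Name: "wl"})
	if err != nil || !on {
		t.Fatalf("expected self-hosted detection, got on=%v err=%v", on, err)
	}
}
```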
@@ -235,11 +346,11 @@ func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
 		return
 	}

-	t.log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())
+	log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())

-	t.log.V(4).Info("Stopping cache", "cluster", cluster.String())
+	log.V(4).Info("Stopping cache", "cluster", cluster.String())
 	a.cache.Stop()
-	t.log.V(4).Info("Cache stopped", "cluster", cluster.String())
+	log.V(4).Info("Cache stopped", "cluster", cluster.String())

 	delete(t.clusterAccessors, cluster)
 }
@@ -286,7 +397,8 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 	}

 	if a.watches.Has(input.Name) {
-		t.log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
+		log := ctrl.LoggerFrom(ctx)
+		log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
 		return nil
 	}
@@ -391,7 +503,8 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 	// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is closed, which in this case
 	// happens when the cache is explicitly stopped.
 	if err != nil && err != wait.ErrWaitTimeout {
-		t.log.Error(err, "Error health checking cluster", "cluster", in.cluster.String())
-		t.deleteAccessor(in.cluster)
+		log := ctrl.LoggerFrom(ctx)
+		log.Error(err, "Error health checking cluster", "cluster", in.cluster.String())
+		t.deleteAccessor(ctx, in.cluster)
 	}
 }