Skip to content

Commit 840d4ab

Browse files
committed
Add clustercache for workload cluster connections
Signed-off-by: apedriza <adripedriza@gmail.com>
1 parent bf5fa86 commit 840d4ab

13 files changed

+144
-69
lines changed

cmd/main.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
_ "k8s.io/client-go/plugin/pkg/client/auth"
3737
"k8s.io/client-go/rest"
3838
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
39+
"sigs.k8s.io/cluster-api/controllers/clustercache"
3940
"sigs.k8s.io/cluster-api/util/flags"
4041
ctrl "sigs.k8s.io/controller-runtime"
4142
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -254,10 +255,33 @@ func main() {
254255
MaxConcurrentReconciles: concurrency,
255256
}
256257

258+
ctx := ctrl.SetupSignalHandler()
259+
260+
clusterCache, err := clustercache.SetupWithManager(ctx, mgr, clustercache.Options{
261+
SecretClient: secretCachingClient,
262+
Cache: clustercache.CacheOptions{},
263+
Client: clustercache.ClientOptions{
264+
UserAgent: "k0smotron",
265+
Cache: clustercache.ClientCacheOptions{
266+
DisableFor: []client.Object{
267+
// Don't cache ConfigMaps & Secrets.
268+
&corev1.ConfigMap{},
269+
&corev1.Secret{},
270+
},
271+
},
272+
},
273+
}, ctrlOptions)
274+
if err != nil {
275+
setupLog.Error(err, "Unable to create ClusterCache")
276+
os.Exit(1)
277+
}
278+
257279
if isControllerEnabled(bootstrapController) && runCAPIControllers {
280+
258281
if err = (&bootstrap.Controller{
259282
Client: mgr.GetClient(),
260283
SecretCachingClient: secretCachingClient,
284+
ClusterCache: clusterCache,
261285
Scheme: mgr.GetScheme(),
262286
ClientSet: clientSet,
263287
RESTConfig: restConfig,
@@ -268,6 +292,7 @@ func main() {
268292
if err = (&bootstrap.ControlPlaneController{
269293
Client: mgr.GetClient(),
270294
SecretCachingClient: secretCachingClient,
295+
ClusterCache: clusterCache,
271296
Scheme: mgr.GetScheme(),
272297
ClientSet: clientSet,
273298
RESTConfig: restConfig,
@@ -290,6 +315,7 @@ func main() {
290315
if err = (&controlplane.K0smotronController{
291316
Client: mgr.GetClient(),
292317
SecretCachingClient: secretCachingClient,
318+
ClusterCache: clusterCache,
293319
Scheme: mgr.GetScheme(),
294320
ClientSet: clientSet,
295321
RESTConfig: restConfig,
@@ -301,6 +327,7 @@ func main() {
301327
if err = (&controlplane.K0sController{
302328
Client: mgr.GetClient(),
303329
SecretCachingClient: secretCachingClient,
330+
ClusterCache: clusterCache,
304331
ClientSet: clientSet,
305332
RESTConfig: restConfig,
306333
}).SetupWithManager(mgr, ctrlOptions); err != nil {
@@ -348,9 +375,10 @@ func main() {
348375
// as both of them create/update Machines.
349376
if runCAPIControllers && (isControllerEnabled(infrastructureController) || isControllerEnabled(bootstrapController)) {
350377
if err = (&bootstrap.ProviderIDController{
351-
Client: mgr.GetClient(),
352-
Scheme: mgr.GetScheme(),
353-
ClientSet: clientSet,
378+
Client: mgr.GetClient(),
379+
Scheme: mgr.GetScheme(),
380+
ClientSet: clientSet,
381+
ClusterCache: clusterCache,
354382
}).SetupWithManager(mgr, ctrlOptions); err != nil {
355383
setupLog.Error(err, "unable to create controller", "controller", "ProviderID")
356384
os.Exit(1)
@@ -367,7 +395,7 @@ func main() {
367395
}
368396

369397
setupLog.Info("starting manager")
370-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
398+
if err := mgr.Start(ctx); err != nil {
371399
setupLog.Error(err, "problem running manager")
372400
os.Exit(1)
373401
}

internal/controller/bootstrap/controlplane_bootstrap_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ import (
4141
kubeadmbootstrapv1 "sigs.k8s.io/cluster-api/api/bootstrap/kubeadm/v1beta2"
4242
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
4343
bsutil "sigs.k8s.io/cluster-api/bootstrap/util"
44+
"sigs.k8s.io/cluster-api/controllers/clustercache"
4445
"sigs.k8s.io/cluster-api/controllers/external"
45-
"sigs.k8s.io/cluster-api/controllers/remote"
4646
capiutil "sigs.k8s.io/cluster-api/util"
4747
"sigs.k8s.io/cluster-api/util/annotations"
4848
"sigs.k8s.io/cluster-api/util/collections"
@@ -66,6 +66,7 @@ import (
6666
type ControlPlaneController struct {
6767
client.Client
6868
SecretCachingClient client.Client
69+
ClusterCache clustercache.ClusterCache
6970
Scheme *runtime.Scheme
7071
ClientSet *kubernetes.Clientset
7172
RESTConfig *rest.Config
@@ -461,7 +462,7 @@ func (c *ControlPlaneController) genControlPlaneJoinFiles(ctx context.Context, s
461462
token := fmt.Sprintf("%s.%s", tokenID, tokenSecret)
462463
tokenKubeSecret := createTokenSecret(tokenID, tokenSecret)
463464

464-
chCS, err := remote.NewClusterClient(ctx, "k0smotron", c.Client, capiutil.ObjectKey(scope.Cluster))
465+
chCS, err := c.ClusterCache.GetClient(ctx, client.ObjectKeyFromObject(scope.Cluster))
465466
if err != nil {
466467
log.Error(err, "Failed to getting child cluster client set")
467468
return nil, err

internal/controller/bootstrap/providerid_controller.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"k8s.io/client-go/kubernetes"
1515
"k8s.io/client-go/util/retry"
1616
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
17+
"sigs.k8s.io/cluster-api/controllers/clustercache"
1718
capiutil "sigs.k8s.io/cluster-api/util"
1819
ctrl "sigs.k8s.io/controller-runtime"
1920
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -26,8 +27,9 @@ import (
2627
// ProviderIDController is responsible for reconciling the ProviderID field of the Machine resource.
2728
type ProviderIDController struct {
2829
client.Client
29-
Scheme *runtime.Scheme
30-
ClientSet *kubernetes.Clientset
30+
Scheme *runtime.Scheme
31+
ClientSet *kubernetes.Clientset
32+
ClusterCache clustercache.ClusterCache
3133
}
3234

3335
// Reconcile reconciles the ProviderID field of the Machine resource and
@@ -67,7 +69,7 @@ func (p *ProviderIDController) Reconcile(ctx context.Context, req ctrl.Request)
6769
return ctrl.Result{}, fmt.Errorf("can't get cluster %s/%s: %w", machine.Namespace, machine.Spec.ClusterName, err)
6870
}
6971

70-
childClient, err := k0smoutil.GetKubeClient(context.Background(), p.Client, cluster)
72+
childClient, err := p.getWorkloadClusterKubeClient(context.Background(), cluster)
7173
if err != nil {
7274
return ctrl.Result{}, fmt.Errorf("can't get kube client for cluster %s/%s: %w. may not be created yet", machine.Namespace, machine.Spec.ClusterName, err)
7375
}
@@ -130,6 +132,15 @@ func (p *ProviderIDController) Reconcile(ctx context.Context, req ctrl.Request)
130132
return ctrl.Result{}, nil
131133
}
132134

135+
func (p *ProviderIDController) getWorkloadClusterKubeClient(ctx context.Context, cluster *clusterv1.Cluster) (*kubernetes.Clientset, error) {
136+
restConfig, err := p.ClusterCache.GetRESTConfig(ctx, client.ObjectKeyFromObject(cluster))
137+
if err != nil {
138+
return nil, fmt.Errorf("failed to get REST config: %w", err)
139+
}
140+
141+
return k0smoutil.GetKubeClient(restConfig)
142+
}
143+
133144
// SetupWithManager sets up the controller with the Manager.
134145
func (p *ProviderIDController) SetupWithManager(mgr ctrl.Manager, opts controller.Options) error {
135146
apiResources, err := p.ClientSet.Discovery().ServerResourcesForGroupVersion(clusterv1.GroupVersion.String())

internal/controller/bootstrap/worker_bootstrap_controller.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ import (
3434
"k8s.io/utils/ptr"
3535
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
3636
bsutil "sigs.k8s.io/cluster-api/bootstrap/util"
37+
"sigs.k8s.io/cluster-api/controllers/clustercache"
3738
"sigs.k8s.io/cluster-api/controllers/external"
38-
"sigs.k8s.io/cluster-api/controllers/remote"
3939
capiutil "sigs.k8s.io/cluster-api/util"
4040
"sigs.k8s.io/cluster-api/util/annotations"
4141
"sigs.k8s.io/cluster-api/util/conditions"
@@ -66,6 +66,7 @@ const (
6666
type Controller struct {
6767
client.Client
6868
SecretCachingClient client.Client
69+
ClusterCache clustercache.ClusterCache
6970
Scheme *runtime.Scheme
7071
ClientSet *kubernetes.Clientset
7172
RESTConfig *rest.Config
@@ -357,10 +358,10 @@ func (r *Controller) generateBootstrapDataForWorker(ctx context.Context, log log
357358

358359
func (r *Controller) getK0sToken(ctx context.Context, scope *Scope) (string, error) {
359360
// Check if the workload cluster client is already set. This client is used for testing purposes to inject a fake client.
360-
client := r.workloadClusterClient
361-
if client == nil {
361+
wcClient := r.workloadClusterClient
362+
if wcClient == nil {
362363
var err error
363-
client, err = remote.NewClusterClient(ctx, "k0smotron", r.Client, capiutil.ObjectKey(scope.Cluster))
364+
wcClient, err = r.ClusterCache.GetClient(ctx, client.ObjectKeyFromObject(scope.Cluster))
364365
if err != nil {
365366
return "", fmt.Errorf("failed to create child cluster client: %w", err)
366367
}
@@ -370,7 +371,7 @@ func (r *Controller) getK0sToken(ctx context.Context, scope *Scope) (string, err
370371
tokenID := kutil.RandomString(6)
371372
tokenSecret := kutil.RandomString(16)
372373
token := fmt.Sprintf("%s.%s", tokenID, tokenSecret)
373-
if err := client.Create(ctx, &corev1.Secret{
374+
if err := wcClient.Create(ctx, &corev1.Secret{
374375
ObjectMeta: metav1.ObjectMeta{
375376
Name: fmt.Sprintf("bootstrap-token-%s", tokenID),
376377
Namespace: "kube-system",

internal/controller/controlplane/helper.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,10 +503,11 @@ func (c *K0sController) markChildControlNodeToLeave(ctx context.Context, name st
503503
}
504504

505505
func (c *K0sController) deleteOldControlNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
506-
kubeClient, err := c.getKubeClient(ctx, cluster)
506+
kubeClient, err := c.getWorkloadClusterClientset(ctx, cluster)
507507
if err != nil {
508-
return fmt.Errorf("error getting kube client: %w", err)
508+
return fmt.Errorf("error getting workload cluster client: %w", err)
509509
}
510+
510511
machines, err := collections.GetFilteredMachinesForCluster(ctx, c, cluster, collections.ControlPlaneMachines(cluster.Name))
511512
if err != nil {
512513
return fmt.Errorf("error getting all machines: %w", err)

internal/controller/controlplane/k0s_controlplane_controller.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"k8s.io/utils/ptr"
4646
kubeadmbootstrapv1 "sigs.k8s.io/cluster-api/api/bootstrap/kubeadm/v1beta2"
4747
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
48+
"sigs.k8s.io/cluster-api/controllers/clustercache"
4849
capiutil "sigs.k8s.io/cluster-api/util"
4950
"sigs.k8s.io/cluster-api/util/annotations"
5051
"sigs.k8s.io/cluster-api/util/certs"
@@ -88,6 +89,7 @@ var (
8889
type K0sController struct {
8990
client.Client
9091
SecretCachingClient client.Client
92+
ClusterCache clustercache.ClusterCache
9193
ClientSet *kubernetes.Clientset
9294
RESTConfig *rest.Config
9395
// workloadClusterKubeClient is used during testing to inject a fake client
@@ -177,7 +179,7 @@ func (c *K0sController) Reconcile(ctx context.Context, req ctrl.Request) (res ct
177179
// Separate var for status update errors to avoid shadowing err
178180
derr = c.updateStatus(ctx, kcp, cluster)
179181
if derr != nil {
180-
if !errors.Is(derr, errUpgradeNotCompleted) {
182+
if !errors.Is(derr, errUpgradeNotCompleted) && !errors.Is(derr, ErrNotReady) {
181183
log.Error(derr, "Failed to update status")
182184
return
183185
}
@@ -474,7 +476,7 @@ func (c *K0sController) reconcileMachines(ctx context.Context, cluster *clusterv
474476
}
475477
}
476478
} else {
477-
kubeClient, err := c.getKubeClient(ctx, cluster)
479+
kubeClient, err := c.getWorkloadClusterClientset(ctx, cluster)
478480
if err != nil {
479481
return fmt.Errorf("error getting cluster client set for machine update: %w", err)
480482
}
@@ -597,7 +599,7 @@ func (c *K0sController) deleteK0sNodeResources(ctx context.Context, cluster *clu
597599
logger := log.FromContext(ctx)
598600

599601
if kcp.Status.Ready {
600-
kubeClient, err := c.getKubeClient(ctx, cluster)
602+
kubeClient, err := c.getWorkloadClusterClientset(ctx, cluster)
601603
if err != nil {
602604
return fmt.Errorf("error getting cluster client set for deletion: %w", err)
603605
}
@@ -664,7 +666,7 @@ func (c *K0sController) createBootstrapConfig(ctx context.Context, name string,
664666
}
665667

666668
func (c *K0sController) checkMachineIsReady(ctx context.Context, machineName string, cluster *clusterv1.Cluster) error {
667-
kubeClient, err := c.getKubeClient(ctx, cluster)
669+
kubeClient, err := c.getWorkloadClusterClientset(ctx, cluster)
668670
if err != nil {
669671
return fmt.Errorf("error getting cluster client set for machine update: %w", err)
670672
}
@@ -733,11 +735,19 @@ func (c *K0sController) reconcileConfig(ctx context.Context, cluster *clusterv1.
733735
}
734736
}
735737

736-
// Reconcile the dynamic config
737-
dErr := kutil.ReconcileDynamicConfig(ctx, cluster, c.Client, *kcp.Spec.K0sConfigSpec.K0s.DeepCopy())
738+
workloadClient, err := c.ClusterCache.GetClient(ctx, client.ObjectKeyFromObject(cluster))
739+
if err != nil {
740+
if errors.Is(err, clustercache.ErrClusterNotConnected) {
741+
log.Info("Connection to workload cluster is not ready, skipping config reconciliation")
742+
return nil
743+
}
744+
745+
return fmt.Errorf("error getting workload cluster client: %w", err)
746+
}
747+
748+
dErr := kutil.ReconcileDynamicConfig(ctx, workloadClient, *kcp.Spec.K0sConfigSpec.K0s.DeepCopy())
738749
if dErr != nil {
739-
// Don't return error from dynamic config reconciliation, as it may not be created yet
740-
log.Error(fmt.Errorf("failed to reconcile dynamic config, kubeconfig may not be available yet: %w", dErr), "Failed to reconcile dynamic config")
750+
return fmt.Errorf("error reconciling dynamic config: %w", dErr)
741751
}
742752
}
743753

internal/controller/controlplane/k0s_controlplane_controller_env_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"k8s.io/kubectl/pkg/scheme"
5151
"k8s.io/utils/ptr"
5252
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
53+
"sigs.k8s.io/cluster-api/controllers/clustercache"
5354
"sigs.k8s.io/cluster-api/controllers/external"
5455
"sigs.k8s.io/cluster-api/util"
5556
"sigs.k8s.io/cluster-api/util/certs"
@@ -59,6 +60,7 @@ import (
5960
"sigs.k8s.io/cluster-api/util/secret"
6061
ctrl "sigs.k8s.io/controller-runtime"
6162
"sigs.k8s.io/controller-runtime/pkg/client"
63+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
6264

6365
autopilot "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
6466
bootstrapv1 "github.com/k0sproject/k0smotron/api/bootstrap/v1beta1"
@@ -82,6 +84,7 @@ func TestReconcileReturnErrorWhenOwnerClusterIsMissing(t *testing.T) {
8284
r := &K0sController{
8385
Client: testEnv,
8486
SecretCachingClient: secretCachingClient,
87+
ClusterCache: clustercache.NewFakeClusterCache(fake.NewClientBuilder().Build(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
8588
}
8689

8790
result, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(kcp)})
@@ -666,6 +669,10 @@ func TestReconcileK0sConfigWithNLLBEnabled(t *testing.T) {
666669

667670
r := &K0sController{
668671
Client: testEnv,
672+
// We set a ClusterCache with a non-related Cluster accessor to verify that it triggers a 'clustercache.ErrClusterNotConnected' error, which should be handled
673+
// gracefully by the controller and not cause the reconciliation to fail because it is expected that the workload cluster config reconciliation might not
674+
// be complete until the Cluster is fully connected and available.
675+
ClusterCache: clustercache.NewFakeClusterCache(fake.NewClientBuilder().Build(), client.ObjectKey{Name: "another-cluster", Namespace: "another-namespace"}),
669676
}
670677
err = r.reconcileConfig(ctx, cluster, kcp)
671678
require.NoError(t, err)
@@ -724,6 +731,10 @@ func TestReconcileK0sConfigWithNLLBDisabled(t *testing.T) {
724731

725732
r := &K0sController{
726733
Client: testEnv,
734+
// We set a ClusterCache with a non-related Cluster accessor to verify to trigger an 'clustercache.ErrClusterNotConnected' error, which should be handled
735+
// gracefully by the controller and not cause the reconciliation to fail because it is expected that the workload cluster config reconciliation might not
736+
// be complete until the Cluster is fully connected and available.
737+
ClusterCache: clustercache.NewFakeClusterCache(fake.NewClientBuilder().Build(), client.ObjectKey{Name: "another-cluster", Namespace: "another-namespace"}),
727738
}
728739
err = r.reconcileConfig(ctx, cluster, kcp)
729740
require.NoError(t, err)
@@ -780,6 +791,10 @@ func TestReconcileK0sConfigTunnelingServerAddressToApiSans(t *testing.T) {
780791

781792
r := &K0sController{
782793
Client: testEnv,
794+
// We set a ClusterCache with a non-related Cluster accessor to verify to trigger an 'clustercache.ErrClusterNotConnected' error, which should be handled
795+
// gracefully by the controller and not cause the reconciliation to fail because it is expected that the workload cluster config reconciliation might not
796+
// be complete until the Cluster is fully connected and available.
797+
ClusterCache: clustercache.NewFakeClusterCache(fake.NewClientBuilder().Build(), client.ObjectKey{Name: "another-cluster", Namespace: "another-namespace"}),
783798
}
784799
err = r.reconcileConfig(ctx, cluster, kcp)
785800
require.NoError(t, err)
@@ -1516,6 +1531,7 @@ func TestReconcileInitializeControlPlanes(t *testing.T) {
15161531
Client: testEnv,
15171532
workloadClusterKubeClient: kubernetes.New(restClient),
15181533
SecretCachingClient: secretCachingClient,
1534+
ClusterCache: clustercache.NewFakeClusterCache(fake.NewClientBuilder().Build(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
15191535
}
15201536

15211537
_, err = r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(kcp)})

internal/controller/controlplane/k0smotron_controlplane_controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ import (
3636
"k8s.io/apimachinery/pkg/util/wait"
3737
"k8s.io/client-go/kubernetes"
3838
"k8s.io/client-go/rest"
39+
"sigs.k8s.io/cluster-api/controllers/clustercache"
3940
"sigs.k8s.io/cluster-api/controllers/external"
40-
"sigs.k8s.io/cluster-api/controllers/remote"
4141
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
4242

4343
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -82,6 +82,7 @@ var currentSpec kapi.ClusterSpec
8282
type K0smotronController struct {
8383
client.Client
8484
SecretCachingClient client.Client
85+
ClusterCache clustercache.ClusterCache
8586
Scheme *runtime.Scheme
8687
ClientSet *kubernetes.Clientset
8788
RESTConfig *rest.Config
@@ -633,9 +634,9 @@ func (c *K0smotronController) computeAvailability(ctx context.Context, cluster *
633634
logger.Info("Pinging the workload cluster API")
634635

635636
// Get the CAPI cluster accessor
636-
client, err := remote.NewClusterClient(ctx, "k0smotron", c.Client, capiutil.ObjectKey(cluster))
637+
client, err := c.ClusterCache.GetClient(ctx, client.ObjectKeyFromObject(cluster))
637638
if err != nil {
638-
logger.Info("Failed to create cluster client", "error", err)
639+
logger.Info("Failed to get cluster client", "error", err)
639640
conditions.Set(kcp, metav1.Condition{
640641
Type: string(cpv1beta1.ControlPlaneAvailableCondition),
641642
Status: metav1.ConditionFalse,

0 commit comments

Comments
 (0)