From 8bbcf07c9f97de18009433c80708e1548e84c724 Mon Sep 17 00:00:00 2001
From: Claudiu Belu
Date: Tue, 6 May 2025 09:50:38 +0000
Subject: [PATCH] fix: Resolve k8sd-proxy-related reconciliation errors

Currently, the cluster-api-k8s controllers are being filled with
reconciliation errors because the k8sd-proxy does not yet exist on a node,
or it's not yet Ready. On error, the reconciliation request is put on an
exponential backoff queue, which results in those requests being retried
later and later. This can cause delays in various CAPI-related operations,
such as scaling the number of nodes (requesting join tokens), certificate
refreshes, and so on.

In case of a k8sd-proxy-related error (the Pod does not yet exist, or it's
not yet Ready), we now requeue the request after a fixed delay instead of
returning the error.
---
 .../controllers/certificates_controller.go | 22 +++++--
 .../controllers/ck8sconfig_controller.go    | 13 +++-
 .../ck8scontrolplane_controller.go          | 31 ++++++----
 pkg/ck8s/workload_cluster.go                | 39 +++++++-----
 pkg/ck8s/workload_cluster_k8sd.go           | 14 +++--
 pkg/ck8s/workload_cluster_test.go           | 17 ++++--
 pkg/errors/errors.go                        | 40 +++++++++++++
 pkg/errors/errors_test.go                   | 60 +++++++++++++++++++
 8 files changed, 190 insertions(+), 46 deletions(-)
 create mode 100644 pkg/errors/errors_test.go

diff --git a/bootstrap/controllers/certificates_controller.go b/bootstrap/controllers/certificates_controller.go
index 0142d3f0..a26290e8 100644
--- a/bootstrap/controllers/certificates_controller.go
+++ b/bootstrap/controllers/certificates_controller.go
@@ -20,6 +20,7 @@ import (
 
     bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
     "github.com/canonical/cluster-api-k8s/pkg/ck8s"
+    ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
     utiltime "github.com/canonical/cluster-api-k8s/pkg/time"
     "github.com/canonical/cluster-api-k8s/pkg/token"
 )
@@ -127,19 +128,28 @@ func (r *CertificatesReconciler) Reconcile(ctx context.Context, req ctrl.Request
 
     if !hasExpiryDateAnnotation {
         if err := r.updateExpiryDateAnnotation(ctx, scope); err != nil {
-            return ctrl.Result{}, err
+            log.Error(err, "Encountered error during updateExpiryDateAnnotation")
+            return ck8serrors.RequeueOnK8sdProxyError(err)
         }
     }
 
     if refreshCertificates {
         if err := r.refreshCertificates(ctx, scope); err != nil {
             // On error, we requeue the request to retry.
-            mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation] = bootstrapv1.CertificatesRefreshFailedStatus
-            m.SetAnnotations(mAnnotations)
-            if err := r.Client.Update(ctx, m); err != nil {
-                return ctrl.Result{}, fmt.Errorf("failed to clear status annotation after error: %w", err)
+            log.Error(err, "Encountered error during refreshCertificates")
+
+            // Only update the machine if this annotation isn't already set to the same value.
+            // Updating it will re-trigger this Reconciler, in which case we'd probably hit the same error.
+            // The request is going to be requeued anyway, since we'll be returning an error or a non-zero Result.
+            if annotation, ok := mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation]; !ok || annotation != bootstrapv1.CertificatesRefreshFailedStatus {
+                mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation] = bootstrapv1.CertificatesRefreshFailedStatus
+                m.SetAnnotations(mAnnotations)
+                if err := r.Client.Update(ctx, m); err != nil {
+                    return ctrl.Result{}, fmt.Errorf("failed to clear status annotation after error: %w", err)
+                }
             }
-            return ctrl.Result{}, err
+
+            return ck8serrors.RequeueOnK8sdProxyError(err)
         }
     }
 
diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go
index eede796d..2ce9db2f 100644
--- a/bootstrap/controllers/ck8sconfig_controller.go
+++ b/bootstrap/controllers/ck8sconfig_controller.go
@@ -45,6 +45,7 @@ import (
     bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
     "github.com/canonical/cluster-api-k8s/pkg/ck8s"
     "github.com/canonical/cluster-api-k8s/pkg/cloudinit"
+    ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
     "github.com/canonical/cluster-api-k8s/pkg/locking"
     "github.com/canonical/cluster-api-k8s/pkg/secret"
     "github.com/canonical/cluster-api-k8s/pkg/token"
@@ -205,11 +206,19 @@ func (r *CK8sConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 
     // it's a control plane join
     if configOwner.IsControlPlaneMachine() {
-        return reconcile.Result{}, r.joinControlplane(ctx, scope)
+        if err := r.joinControlplane(ctx, scope); err != nil {
+            log.Error(err, "Encountered error during joinControlplane")
+            return ck8serrors.RequeueOnK8sdProxyError(err)
+        }
+        return reconcile.Result{}, nil
     }
 
     // It's a worker join
-    return reconcile.Result{}, r.joinWorker(ctx, scope)
+    if err := r.joinWorker(ctx, scope); err != nil {
+        log.Error(err, "Encountered error during joinWorker")
+        return ck8serrors.RequeueOnK8sdProxyError(err)
+    }
+    return reconcile.Result{}, nil
 }
 
 func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scope) error {
diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go
index 3675b003..52c7f37c 100644
--- a/controlplane/controllers/ck8scontrolplane_controller.go
+++ b/controlplane/controllers/ck8scontrolplane_controller.go
@@ -142,7 +142,10 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
     }
 
     // Always attempt to update status.
-    if updateErr := r.updateStatus(ctx, kcp, cluster); updateErr != nil {
+    if requeue, updateErr := r.updateStatus(ctx, kcp, cluster); updateErr != nil {
+        if requeue && res.IsZero() {
+            res.RequeueAfter = 10 * time.Second
+        }
         var connFailure *ck8s.RemoteClusterConnectionError
         if errors.As(updateErr, &connFailure) {
             logger.Info("Could not connect to workload cluster to fetch status", "updateErr", updateErr.Error())
@@ -167,7 +170,12 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
         }
     }
 
-    return res, err
+    if res.IsZero() {
+        return res, err
+    }
+
+    // If the result is non-zero and we're requeuing the request, we shouldn't return an error.
+    return res, nil
 }
 
 // reconcileDelete handles CK8sControlPlane deletion.
@@ -325,7 +333,7 @@ func (r *CK8sControlPlaneReconciler) ClusterToCK8sControlPlane(_ context.Context
 
 // updateStatus is called after every reconcilitation loop in a defer statement to always make sure we have the
 // resource status subresourcs up-to-date.
-func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, cluster *clusterv1.Cluster) error {
+func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, cluster *clusterv1.Cluster) (bool, error) {
     selector := collections.ControlPlaneSelectorForCluster(cluster.Name)
     // Copy label selector to its status counterpart in string format.
     // This is necessary for CRDs including scale subresources.
@@ -333,14 +341,14 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 
     ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.OwnedMachines(kcp))
     if err != nil {
-        return fmt.Errorf("failed to get list of owned machines: %w", err)
+        return false, fmt.Errorf("failed to get list of owned machines: %w", err)
     }
 
     logger := r.Log.WithValues("namespace", kcp.Namespace, "CK8sControlPlane", kcp.Name, "cluster", cluster.Name)
     controlPlane, err := ck8s.NewControlPlane(ctx, r.Client, cluster, kcp, ownedMachines)
     if err != nil {
         logger.Error(err, "failed to initialize control plane")
-        return err
+        return false, err
     }
 
     kcp.Status.UpdatedReplicas = int32(len(controlPlane.UpToDateMachines()))
@@ -360,7 +368,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
     // Return early if the deletion timestamp is set, because we don't want to try to connect to the workload cluster
     // and we don't want to report resize condition (because it is set to deleting into reconcile delete).
     if !kcp.DeletionTimestamp.IsZero() {
-        return nil
+        return false, nil
     }
 
     switch {
@@ -383,11 +391,12 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
     microclusterPort := kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
     workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster), microclusterPort)
     if err != nil {
-        return fmt.Errorf("failed to create remote cluster client: %w", err)
+        return false, fmt.Errorf("failed to create remote cluster client: %w", err)
     }
 
     status, err := workloadCluster.ClusterStatus(ctx)
     if err != nil {
-        return err
+        // we will requeue if HasK8sdConfigMap hasn't been set yet.
+        return !status.HasK8sdConfigMap, err
     }
 
     logger.Info("ClusterStatus", "workload", status)
@@ -413,7 +422,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
     if v, ok := controlPlane.KCP.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
         remediationData, err := RemediationDataFromAnnotation(v)
         if err != nil {
-            return err
+            return false, err
         }
         lastRemediation = remediationData
     } else {
@@ -421,7 +430,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
             if v, ok := m.Annotations[controlplanev1.RemediationForAnnotation]; ok {
                 remediationData, err := RemediationDataFromAnnotation(v)
                 if err != nil {
-                    return err
+                    return false, err
                 }
                 if lastRemediation == nil || lastRemediation.Timestamp.Time.Before(remediationData.Timestamp.Time) {
                     lastRemediation = remediationData
@@ -434,7 +443,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
         controlPlane.KCP.Status.LastRemediation = lastRemediation.ToStatus()
     }
 
-    return nil
+    return false, nil
 }
 
 // reconcile handles CK8sControlPlane reconciliation.
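
Note: the updateStatus hunks above change the signature from error to (bool, error),
where the bool asks the caller to requeue (currently only when the k8sd ConfigMap has
not shown up yet), and Reconcile turns that hint into res.RequeueAfter = 10 * time.Second
while dropping the error. The sketch below is illustrative only and is not part of the
patch (reconcileSketch and updateStatusSketch are made-up names): controller-runtime
honours a non-zero Result returned with a nil error as a fixed-delay retry, whereas
returning an error puts the request back on the exponential backoff queue.

package sketch

import (
    "context"
    "errors"
    "time"

    ctrl "sigs.k8s.io/controller-runtime"
)

// updateStatusSketch stands in for updateStatus: the bool reports whether the
// caller should simply retry later (e.g. the k8sd ConfigMap is missing).
func updateStatusSketch(_ context.Context) (bool, error) {
    return true, errors.New("k8sd-config ConfigMap not found yet")
}

func reconcileSketch(ctx context.Context) (ctrl.Result, error) {
    var res ctrl.Result

    if requeue, err := updateStatusSketch(ctx); err != nil {
        if requeue && res.IsZero() {
            // Ask for a plain retry after a fixed delay.
            res.RequeueAfter = 10 * time.Second
        }
        if res.IsZero() {
            // No delayed retry was requested: surface the error and let the
            // workqueue apply its usual exponential backoff.
            return res, err
        }
    }

    // A non-zero Result with a nil error is requeued after RequeueAfter, without backoff.
    return res, nil
}
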
diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go
index 2fde6ed8..f0a7b813 100644
--- a/pkg/ck8s/workload_cluster.go
+++ b/pkg/ck8s/workload_cluster.go
@@ -25,6 +25,7 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/log"
 
     controlplanev1 "github.com/canonical/cluster-api-k8s/controlplane/api/v1beta2"
+    ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
 )
 
 const (
@@ -83,6 +84,23 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList,
 func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
     status := ClusterStatus{}
 
+    // NOTE(neoaggelos): Check that the k8sd-config on the kube-system configmap exists.
+    key := ctrlclient.ObjectKey{
+        Name:      k8sdConfigSecretName,
+        Namespace: metav1.NamespaceSystem,
+    }
+
+    err := w.Client.Get(ctx, key, &corev1.ConfigMap{})
+    // In case of error we do assume the control plane is not initialized yet.
+    if err != nil {
+        logger := log.FromContext(ctx)
+        logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
+        status.HasK8sdConfigMap = false
+        return status, err
+    }
+
+    status.HasK8sdConfigMap = true
+
     // count the control plane nodes
     nodes, err := w.getControlPlaneNodes(ctx)
     if err != nil {
@@ -97,21 +115,6 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
         }
     }
 
-    // NOTE(neoaggelos): Check that the k8sd-config on the kube-system configmap exists.
-    key := ctrlclient.ObjectKey{
-        Name:      k8sdConfigSecretName,
-        Namespace: metav1.NamespaceSystem,
-    }
-
-    err = w.Client.Get(ctx, key, &corev1.ConfigMap{})
-    // In case of error we do assume the control plane is not initialized yet.
-    if err != nil {
-        logger := log.FromContext(ctx)
-        logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
-    }
-
-    status.HasK8sdConfigMap = err == nil
-
     return status, nil
 }
 
@@ -165,6 +168,10 @@ func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context, options k8sd
         return nil, fmt.Errorf("failed to get proxy pods: %w", err)
     }
 
+    if len(podmap) == 0 {
+        return nil, &ck8serrors.K8sdProxyNotFound{}
+    }
+
     var allErrors []error
     for _, node := range cplaneNodes.Items {
         if _, ok := options.IgnoreNodes[node.Name]; ok {
@@ -179,7 +186,7 @@ func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context, options k8sd
 
         if !podv1.IsPodReady(&pod) {
             // if the Pod is not Ready, it won't be able to accept any k8sd API calls.
-            allErrors = append(allErrors, fmt.Errorf("pod '%s' is not Ready", pod.Name))
+            allErrors = append(allErrors, &ck8serrors.K8sdProxyNotReady{PodName: pod.Name})
             continue
         }
 
diff --git a/pkg/ck8s/workload_cluster_k8sd.go b/pkg/ck8s/workload_cluster_k8sd.go
index c87e5a29..054e6f0a 100644
--- a/pkg/ck8s/workload_cluster_k8sd.go
+++ b/pkg/ck8s/workload_cluster_k8sd.go
@@ -4,7 +4,6 @@ import (
     "context"
     "crypto/tls"
     _ "embed"
-    "errors"
     "fmt"
     "net/http"
     "time"
@@ -13,7 +12,9 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
+    podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
 
+    ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
     "github.com/canonical/cluster-api-k8s/pkg/proxy"
 )
 
@@ -49,7 +50,12 @@ func (g *k8sdClientGenerator) forNode(ctx context.Context, node *corev1.Node) (*
 
     pod, ok := podmap[node.Name]
     if !ok {
-        return nil, fmt.Errorf("missing k8sd proxy pod for node %s", node.Name)
+        return nil, &ck8serrors.K8sdProxyNotFound{NodeName: node.Name}
+    }
+
+    if !podv1.IsPodReady(&pod) {
+        // if the Pod is not Ready, it won't be able to accept any k8sd API calls.
+        return nil, &ck8serrors.K8sdProxyNotReady{PodName: pod.Name}
     }
 
     return g.forNodePod(ctx, node, pod.Name)
@@ -78,10 +84,6 @@ func (g *k8sdClientGenerator) getProxyPods(ctx context.Context) (map[string]core
         return nil, fmt.Errorf("unable to list k8sd-proxy pods in target cluster: %w", err)
     }
 
-    if len(pods.Items) == 0 {
-        return nil, errors.New("there isn't any k8sd-proxy pods in target cluster")
-    }
-
     podmap := make(map[string]corev1.Pod, len(pods.Items))
     for _, pod := range pods.Items {
         podmap[pod.Spec.NodeName] = pod
diff --git a/pkg/ck8s/workload_cluster_test.go b/pkg/ck8s/workload_cluster_test.go
index a7827abd..036cf138 100644
--- a/pkg/ck8s/workload_cluster_test.go
+++ b/pkg/ck8s/workload_cluster_test.go
@@ -54,12 +54,13 @@ func TestClusterStatus(t *testing.T) {
     }{
         {
             name:            "returns cluster status",
-            objs:            []client.Object{node1, node2},
+            objs:            []client.Object{},
             expectHasSecret: false,
+            expectErr:       true,
         },
         {
             name:            "returns cluster status with k8sd-config configmap",
-            objs:            []client.Object{node1, node2, servingSecret},
+            objs:            []client.Object{servingSecret, node1, node2},
             expectHasSecret: true,
         },
     }
@@ -72,9 +73,15 @@ func TestClusterStatus(t *testing.T) {
                 Client: fakeClient,
             }
             status, err := w.ClusterStatus(context.TODO())
-            g.Expect(err).ToNot(HaveOccurred())
-            g.Expect(status.Nodes).To(BeEquivalentTo(2))
-            g.Expect(status.ReadyNodes).To(BeEquivalentTo(1))
+            if tt.expectErr {
+                g.Expect(err).To(HaveOccurred())
+                g.Expect(status.Nodes).To(BeEquivalentTo(0))
+                g.Expect(status.ReadyNodes).To(BeEquivalentTo(0))
+            } else {
+                g.Expect(err).ToNot(HaveOccurred())
+                g.Expect(status.Nodes).To(BeEquivalentTo(2))
+                g.Expect(status.ReadyNodes).To(BeEquivalentTo(1))
+            }
             g.Expect(status.HasK8sdConfigMap).To(Equal(tt.expectHasSecret))
         })
     }
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index 5df9bbc0..5e8e3058 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -1,5 +1,13 @@
 package errors
 
+import (
+    "errors"
+    "fmt"
+    "time"
+
+    ctrl "sigs.k8s.io/controller-runtime"
+)
+
 type CK8sControlPlaneStatusError string
 
 const (
@@ -24,3 +32,35 @@ const (
     // when trying to delete the CK8s control plane.
     DeleteCK8sControlPlaneError CK8sControlPlaneStatusError = "DeleteError"
 )
+
+type K8sdProxyNotFound struct {
+    NodeName string
+}
+
+func (e *K8sdProxyNotFound) Error() string {
+    if e.NodeName == "" {
+        return "missing k8sd proxy pod(s)"
+    }
+    return fmt.Sprintf("missing k8sd proxy pod for node %s", e.NodeName)
+}
+
+type K8sdProxyNotReady struct {
+    PodName string
+}
+
+func (e *K8sdProxyNotReady) Error() string {
+    return fmt.Sprintf("pod '%s' is not Ready", e.PodName)
+}
+
+func RequeueOnK8sdProxyError(err error) (ctrl.Result, error) {
+    var (
+        notFoundErr *K8sdProxyNotFound
+        notReadyErr *K8sdProxyNotReady
+    )
+    if errors.As(err, &notFoundErr) || errors.As(err, &notReadyErr) {
+        return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
+    }
+
+    // Not a k8sd-proxy related error.
+    return ctrl.Result{}, err
+}
diff --git a/pkg/errors/errors_test.go b/pkg/errors/errors_test.go
new file mode 100644
index 00000000..77d44071
--- /dev/null
+++ b/pkg/errors/errors_test.go
@@ -0,0 +1,60 @@
+package errors
+
+import (
+    "errors"
+    "fmt"
+    "testing"
+    "time"
+
+    . "github.com/onsi/gomega"
+)
+
+func TestRequeueOnK8sdProxyError(t *testing.T) {
+    notFoundErr := &K8sdProxyNotFound{NodeName: "foo"}
+    notReadyErr := &K8sdProxyNotReady{PodName: "lish"}
+    otherErr := fmt.Errorf("bar err")
+
+    tests := []struct {
+        name      string
+        err       error
+        expectErr bool
+    }{
+        {
+            name: "k8sd-proxy not found",
+            err:  notFoundErr,
+        },
+        {
+            name: "k8sd-proxy not ready",
+            err:  notReadyErr,
+        },
+        {
+            name: "wrapped error",
+            err:  fmt.Errorf("wrapped error: %w", fmt.Errorf("another wrap: %w", notFoundErr)),
+        },
+        {
+            name: "joined error",
+            err:  fmt.Errorf("wrapped joined error: %w", errors.Join(otherErr, notReadyErr)),
+        },
+        {
+            name:      "other error",
+            err:       otherErr,
+            expectErr: true,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            g := NewWithT(t)
+
+            result, err := RequeueOnK8sdProxyError(tt.err)
+
+            if tt.expectErr {
+                g.Expect(err).To(HaveOccurred())
+                g.Expect(result.IsZero()).To(BeTrue())
+            } else {
+                g.Expect(err).ToNot(HaveOccurred())
+                g.Expect(result.RequeueAfter).To(Equal(30 * time.Second))
+            }
+        })
+    }
+}
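
Note: RequeueOnK8sdProxyError relies on errors.As, which also matches
K8sdProxyNotFound and K8sdProxyNotReady when they are wrapped with fmt.Errorf("...: %w", ...)
or combined via errors.Join, as the test above exercises. Below is a minimal caller-side
usage sketch; it is not part of the patch, and fetchJoinToken and reconcileJoin are
made-up names used only for illustration.

package sketch

import (
    "fmt"

    ctrl "sigs.k8s.io/controller-runtime"

    ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
)

// fetchJoinToken stands in for any call that reaches k8sd through the
// k8sd-proxy pods and may fail while the proxy is still coming up.
func fetchJoinToken() error {
    return fmt.Errorf("requesting join token: %w", &ck8serrors.K8sdProxyNotReady{PodName: "k8sd-proxy-abc"})
}

func reconcileJoin() (ctrl.Result, error) {
    if err := fetchJoinToken(); err != nil {
        // k8sd-proxy related errors (even wrapped) become a 30s RequeueAfter with a
        // nil error; anything else is returned as an error and backed off as usual.
        return ck8serrors.RequeueOnK8sdProxyError(err)
    }
    return ctrl.Result{}, nil
}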