diff --git a/bootstrap/controllers/certificates_controller.go b/bootstrap/controllers/certificates_controller.go
index 0142d3f0..a26290e8 100644
--- a/bootstrap/controllers/certificates_controller.go
+++ b/bootstrap/controllers/certificates_controller.go
@@ -20,6 +20,7 @@ import (
 
 	bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
 	"github.com/canonical/cluster-api-k8s/pkg/ck8s"
+	ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
 	utiltime "github.com/canonical/cluster-api-k8s/pkg/time"
 	"github.com/canonical/cluster-api-k8s/pkg/token"
 )
@@ -127,19 +128,28 @@ func (r *CertificatesReconciler) Reconcile(ctx context.Context, req ctrl.Request
 
 	if !hasExpiryDateAnnotation {
 		if err := r.updateExpiryDateAnnotation(ctx, scope); err != nil {
-			return ctrl.Result{}, err
+			log.Error(err, "Encountered error during updateExpiryDateAnnotation")
+			return ck8serrors.RequeueOnK8sdProxyError(err)
 		}
 	}
 
 	if refreshCertificates {
		if err := r.refreshCertificates(ctx, scope); err != nil {
 			// On error, we requeue the request to retry.
-			mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation] = bootstrapv1.CertificatesRefreshFailedStatus
-			m.SetAnnotations(mAnnotations)
-			if err := r.Client.Update(ctx, m); err != nil {
-				return ctrl.Result{}, fmt.Errorf("failed to clear status annotation after error: %w", err)
+			log.Error(err, "Encountered error during refreshCertificates")
+
+			// Only update the machine if this annotation isn't already set to the same value.
+			// Updating it will re-trigger this Reconciler, in which case we'd probably hit the same error.
+			// The request is going to be requeued anyway, since we'll be returning an error or a non-zero Result.
+			if annotation, ok := mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation]; !ok || annotation != bootstrapv1.CertificatesRefreshFailedStatus {
+				mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation] = bootstrapv1.CertificatesRefreshFailedStatus
+				m.SetAnnotations(mAnnotations)
+				if err := r.Client.Update(ctx, m); err != nil {
+					return ctrl.Result{}, fmt.Errorf("failed to clear status annotation after error: %w", err)
+				}
 			}
-			return ctrl.Result{}, err
+
+			return ck8serrors.RequeueOnK8sdProxyError(err)
 		}
 	}
 
diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go
index eede796d..2ce9db2f 100644
--- a/bootstrap/controllers/ck8sconfig_controller.go
+++ b/bootstrap/controllers/ck8sconfig_controller.go
@@ -45,6 +45,7 @@ import (
 	bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
 	"github.com/canonical/cluster-api-k8s/pkg/ck8s"
 	"github.com/canonical/cluster-api-k8s/pkg/cloudinit"
+	ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
 	"github.com/canonical/cluster-api-k8s/pkg/locking"
 	"github.com/canonical/cluster-api-k8s/pkg/secret"
 	"github.com/canonical/cluster-api-k8s/pkg/token"
@@ -205,11 +206,19 @@ func (r *CK8sConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 
 	// it's a control plane join
 	if configOwner.IsControlPlaneMachine() {
-		return reconcile.Result{}, r.joinControlplane(ctx, scope)
+		if err := r.joinControlplane(ctx, scope); err != nil {
+			log.Error(err, "Encountered error during joinControlplane")
+			return ck8serrors.RequeueOnK8sdProxyError(err)
+		}
+		return reconcile.Result{}, nil
 	}
 
 	// It's a worker join
-	return reconcile.Result{}, r.joinWorker(ctx, scope)
+	if err := r.joinWorker(ctx, scope); err != nil {
+		log.Error(err, "Encountered error during joinWorker")
+		return ck8serrors.RequeueOnK8sdProxyError(err)
+	}
+	return reconcile.Result{}, nil
 }
 
 func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scope) error {
diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go
index 3675b003..52c7f37c 100644
--- a/controlplane/controllers/ck8scontrolplane_controller.go
+++ b/controlplane/controllers/ck8scontrolplane_controller.go
@@ -142,7 +142,10 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
 	}
 
 	// Always attempt to update status.
-	if updateErr := r.updateStatus(ctx, kcp, cluster); updateErr != nil {
+	if requeue, updateErr := r.updateStatus(ctx, kcp, cluster); updateErr != nil {
+		if requeue && res.IsZero() {
+			res.RequeueAfter = 10 * time.Second
+		}
 		var connFailure *ck8s.RemoteClusterConnectionError
 		if errors.As(updateErr, &connFailure) {
 			logger.Info("Could not connect to workload cluster to fetch status", "updateErr", updateErr.Error())
@@ -167,7 +170,12 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
 		}
 	}
 
-	return res, err
+	if res.IsZero() {
+		return res, err
+	}
+
+	// If the result is non-zero and we're requeuing the request, we shouldn't return an error.
+	return res, nil
 }
 
 // reconcileDelete handles CK8sControlPlane deletion.
@@ -325,7 +333,7 @@ func (r *CK8sControlPlaneReconciler) ClusterToCK8sControlPlane(_ context.Context
 
 // updateStatus is called after every reconcilitation loop in a defer statement to always make sure we have the
 // resource status subresourcs up-to-date.
-func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, cluster *clusterv1.Cluster) error {
+func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, cluster *clusterv1.Cluster) (bool, error) {
 	selector := collections.ControlPlaneSelectorForCluster(cluster.Name)
 	// Copy label selector to its status counterpart in string format.
 	// This is necessary for CRDs including scale subresources.
@@ -333,14 +341,14 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 
 	ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.OwnedMachines(kcp))
 	if err != nil {
-		return fmt.Errorf("failed to get list of owned machines: %w", err)
+		return false, fmt.Errorf("failed to get list of owned machines: %w", err)
 	}
 
 	logger := r.Log.WithValues("namespace", kcp.Namespace, "CK8sControlPlane", kcp.Name, "cluster", cluster.Name)
 	controlPlane, err := ck8s.NewControlPlane(ctx, r.Client, cluster, kcp, ownedMachines)
 	if err != nil {
 		logger.Error(err, "failed to initialize control plane")
-		return err
+		return false, err
 	}
 
 	kcp.Status.UpdatedReplicas = int32(len(controlPlane.UpToDateMachines()))
@@ -360,7 +368,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 	// Return early if the deletion timestamp is set, because we don't want to try to connect to the workload cluster
 	// and we don't want to report resize condition (because it is set to deleting into reconcile delete).
 	if !kcp.DeletionTimestamp.IsZero() {
-		return nil
+		return false, nil
 	}
 
 	switch {
@@ -383,11 +391,12 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 	microclusterPort := kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
 	workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster), microclusterPort)
 	if err != nil {
-		return fmt.Errorf("failed to create remote cluster client: %w", err)
+		return false, fmt.Errorf("failed to create remote cluster client: %w", err)
 	}
 	status, err := workloadCluster.ClusterStatus(ctx)
 	if err != nil {
-		return err
+		// We will requeue if the k8sd config map hasn't been found yet.
+		return !status.HasK8sdConfigMap, err
 	}
 
 	logger.Info("ClusterStatus", "workload", status)
@@ -413,7 +422,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 	if v, ok := controlPlane.KCP.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
 		remediationData, err := RemediationDataFromAnnotation(v)
 		if err != nil {
-			return err
+			return false, err
 		}
 		lastRemediation = remediationData
 	} else {
@@ -421,7 +430,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 		if v, ok := m.Annotations[controlplanev1.RemediationForAnnotation]; ok {
 			remediationData, err := RemediationDataFromAnnotation(v)
 			if err != nil {
-				return err
+				return false, err
 			}
 			if lastRemediation == nil || lastRemediation.Timestamp.Time.Before(remediationData.Timestamp.Time) {
 				lastRemediation = remediationData
@@ -434,7 +443,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
 		controlPlane.KCP.Status.LastRemediation = lastRemediation.ToStatus()
 	}
 
-	return nil
+	return false, nil
 }
 
 // reconcile handles CK8sControlPlane reconciliation.
diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go
index 2fde6ed8..f0a7b813 100644
--- a/pkg/ck8s/workload_cluster.go
+++ b/pkg/ck8s/workload_cluster.go
@@ -25,6 +25,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
 	controlplanev1 "github.com/canonical/cluster-api-k8s/controlplane/api/v1beta2"
+	ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
 )
 
 const (
@@ -83,6 +84,23 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList,
 func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
 	status := ClusterStatus{}
 
+	// NOTE(neoaggelos): Check that the k8sd-config on the kube-system configmap exists.
+	key := ctrlclient.ObjectKey{
+		Name:      k8sdConfigSecretName,
+		Namespace: metav1.NamespaceSystem,
+	}
+
+	err := w.Client.Get(ctx, key, &corev1.ConfigMap{})
+	// In case of error we do assume the control plane is not initialized yet.
+	if err != nil {
+		logger := log.FromContext(ctx)
+		logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
+		status.HasK8sdConfigMap = false
+		return status, err
+	}
+
+	status.HasK8sdConfigMap = true
+
 	// count the control plane nodes
 	nodes, err := w.getControlPlaneNodes(ctx)
 	if err != nil {
@@ -97,21 +115,6 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
 		}
 	}
 
-	// NOTE(neoaggelos): Check that the k8sd-config on the kube-system configmap exists.
-	key := ctrlclient.ObjectKey{
-		Name:      k8sdConfigSecretName,
-		Namespace: metav1.NamespaceSystem,
-	}
-
-	err = w.Client.Get(ctx, key, &corev1.ConfigMap{})
-	// In case of error we do assume the control plane is not initialized yet.
-	if err != nil {
-		logger := log.FromContext(ctx)
-		logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
-	}
-
-	status.HasK8sdConfigMap = err == nil
-
 	return status, nil
 }
 
@@ -165,6 +168,10 @@ func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context, options k8sd
 		return nil, fmt.Errorf("failed to get proxy pods: %w", err)
 	}
 
+	if len(podmap) == 0 {
+		return nil, &ck8serrors.K8sdProxyNotFound{}
+	}
+
 	var allErrors []error
 	for _, node := range cplaneNodes.Items {
 		if _, ok := options.IgnoreNodes[node.Name]; ok {
@@ -179,7 +186,7 @@ func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context, options k8sd
 
 		if !podv1.IsPodReady(&pod) {
 			// if the Pod is not Ready, it won't be able to accept any k8sd API calls.
-			allErrors = append(allErrors, fmt.Errorf("pod '%s' is not Ready", pod.Name))
+			allErrors = append(allErrors, &ck8serrors.K8sdProxyNotReady{PodName: pod.Name})
 			continue
 		}
 
diff --git a/pkg/ck8s/workload_cluster_k8sd.go b/pkg/ck8s/workload_cluster_k8sd.go
index c87e5a29..054e6f0a 100644
--- a/pkg/ck8s/workload_cluster_k8sd.go
+++ b/pkg/ck8s/workload_cluster_k8sd.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"crypto/tls"
 	_ "embed"
-	"errors"
 	"fmt"
 	"net/http"
 	"time"
@@ -13,7 +12,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
 
+	ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
 	"github.com/canonical/cluster-api-k8s/pkg/proxy"
 )
 
@@ -49,7 +50,12 @@ func (g *k8sdClientGenerator) forNode(ctx context.Context, node *corev1.Node) (*
 
 	pod, ok := podmap[node.Name]
 	if !ok {
-		return nil, fmt.Errorf("missing k8sd proxy pod for node %s", node.Name)
+		return nil, &ck8serrors.K8sdProxyNotFound{NodeName: node.Name}
+	}
+
+	if !podv1.IsPodReady(&pod) {
+		// if the Pod is not Ready, it won't be able to accept any k8sd API calls.
+		return nil, &ck8serrors.K8sdProxyNotReady{PodName: pod.Name}
 	}
 
 	return g.forNodePod(ctx, node, pod.Name)
@@ -78,10 +84,6 @@ func (g *k8sdClientGenerator) getProxyPods(ctx context.Context) (map[string]core
 		return nil, fmt.Errorf("unable to list k8sd-proxy pods in target cluster: %w", err)
 	}
 
-	if len(pods.Items) == 0 {
-		return nil, errors.New("there isn't any k8sd-proxy pods in target cluster")
-	}
-
 	podmap := make(map[string]corev1.Pod, len(pods.Items))
 	for _, pod := range pods.Items {
 		podmap[pod.Spec.NodeName] = pod
diff --git a/pkg/ck8s/workload_cluster_test.go b/pkg/ck8s/workload_cluster_test.go
index a7827abd..036cf138 100644
--- a/pkg/ck8s/workload_cluster_test.go
+++ b/pkg/ck8s/workload_cluster_test.go
@@ -54,12 +54,13 @@ func TestClusterStatus(t *testing.T) {
 	}{
 		{
 			name:            "returns cluster status",
-			objs:            []client.Object{node1, node2},
+			objs:            []client.Object{},
 			expectHasSecret: false,
+			expectErr:       true,
 		},
 		{
 			name:            "returns cluster status with k8sd-config configmap",
-			objs:            []client.Object{node1, node2, servingSecret},
+			objs:            []client.Object{servingSecret, node1, node2},
 			expectHasSecret: true,
 		},
 	}
@@ -72,9 +73,15 @@ func TestClusterStatus(t *testing.T) {
 				Client: fakeClient,
 			}
 			status, err := w.ClusterStatus(context.TODO())
-			g.Expect(err).ToNot(HaveOccurred())
-			g.Expect(status.Nodes).To(BeEquivalentTo(2))
-			g.Expect(status.ReadyNodes).To(BeEquivalentTo(1))
+			if tt.expectErr {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(status.Nodes).To(BeEquivalentTo(0))
+				g.Expect(status.ReadyNodes).To(BeEquivalentTo(0))
+			} else {
+				g.Expect(err).ToNot(HaveOccurred())
+				g.Expect(status.Nodes).To(BeEquivalentTo(2))
+				g.Expect(status.ReadyNodes).To(BeEquivalentTo(1))
+			}
 			g.Expect(status.HasK8sdConfigMap).To(Equal(tt.expectHasSecret))
 		})
 	}
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index 5df9bbc0..5e8e3058 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -1,5 +1,13 @@
 package errors
 
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+)
+
 type CK8sControlPlaneStatusError string
 
 const (
@@ -24,3 +32,35 @@ const (
 	// when trying to delete the CK8s control plane.
 	DeleteCK8sControlPlaneError CK8sControlPlaneStatusError = "DeleteError"
 )
+
+type K8sdProxyNotFound struct {
+	NodeName string
+}
+
+func (e *K8sdProxyNotFound) Error() string {
+	if e.NodeName == "" {
+		return "missing k8sd proxy pod(s)"
+	}
+	return fmt.Sprintf("missing k8sd proxy pod for node %s", e.NodeName)
+}
+
+type K8sdProxyNotReady struct {
+	PodName string
+}
+
+func (e *K8sdProxyNotReady) Error() string {
+	return fmt.Sprintf("pod '%s' is not Ready", e.PodName)
+}
+
+func RequeueOnK8sdProxyError(err error) (ctrl.Result, error) {
+	var (
+		notFoundErr *K8sdProxyNotFound
+		notReadyErr *K8sdProxyNotReady
+	)
+	if errors.As(err, &notFoundErr) || errors.As(err, &notReadyErr) {
+		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
+	}
+
+	// Not a k8sd-proxy related error.
+	return ctrl.Result{}, err
+}
diff --git a/pkg/errors/errors_test.go b/pkg/errors/errors_test.go
new file mode 100644
index 00000000..77d44071
--- /dev/null
+++ b/pkg/errors/errors_test.go
@@ -0,0 +1,60 @@
+package errors
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
"github.com/onsi/gomega" +) + +func TestRequeueOnK8sdProxyError(t *testing.T) { + notFoundErr := &K8sdProxyNotFound{NodeName: "foo"} + notReadyErr := &K8sdProxyNotReady{PodName: "lish"} + otherErr := fmt.Errorf("bar err") + + tests := []struct { + name string + err error + expectErr bool + }{ + { + name: "k8sd-proxy not found", + err: notFoundErr, + }, + { + name: "k8sd-proxy not ready", + err: notFoundErr, + }, + { + name: "wrapped error", + err: fmt.Errorf("wrapped error: %w", fmt.Errorf("another wrap: %w", notFoundErr)), + }, + { + name: "joined error", + err: fmt.Errorf("wrapped joined error: %w", errors.Join(otherErr, notReadyErr)), + }, + { + name: "other error", + err: otherErr, + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + result, err := RequeueOnK8sdProxyError(tt.err) + + if tt.expectErr { + g.Expect(err).To(HaveOccurred()) + g.Expect(result.IsZero()).To(BeTrue()) + } else { + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(result.RequeueAfter).To(Equal(30 * time.Second)) + } + }) + } +}