diff --git a/.gitignore b/.gitignore index cffac916a..08cd486be 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out cluster-etcd-operator -tnf-setup-runner +tnf-runtime # Log output from telepresence telepresence.log @@ -27,4 +27,4 @@ report.json test-results/ # OpenShift Tests Extension metadata -.openshift-tests-extension/openshift_payload_*.json \ No newline at end of file +.openshift-tests-extension/openshift_payload_*.json diff --git a/Dockerfile.ocp b/Dockerfile.ocp index 7a56a44c9..be0bb5e18 100644 --- a/Dockerfile.ocp +++ b/Dockerfile.ocp @@ -10,7 +10,7 @@ FROM registry.ci.openshift.org/ocp/4.21:base-rhel9 COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/bindata/bootkube/bootstrap-manifests /usr/share/bootkube/manifests/bootstrap-manifests/ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/bindata/bootkube/manifests /usr/share/bootkube/manifests/manifests/ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/cluster-etcd-operator /usr/bin/ -COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/tnf-setup-runner /usr/bin/ +COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/tnf-runtime /usr/bin/ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/cluster-etcd-operator-tests-ext.gz /usr/bin/ COPY manifests/ /manifests diff --git a/bindata/tnfdeployment/clusterrole.yaml b/bindata/tnfdeployment/clusterrole.yaml index e99ff3f27..e20d82c02 100644 --- a/bindata/tnfdeployment/clusterrole.yaml +++ b/bindata/tnfdeployment/clusterrole.yaml @@ -29,6 +29,25 @@ rules: - get - patch - update + - apiGroups: + - etcd.openshift.io + resources: + - pacemakerstatuses + verbs: + - get + - list + - watch + - create + - update + - patch + - apiGroups: + - etcd.openshift.io + resources: + - pacemakerstatuses/status + verbs: + - get + - update + - patch - apiGroups: - config.openshift.io resources: diff --git a/bindata/tnfdeployment/cronjob.yaml b/bindata/tnfdeployment/cronjob.yaml new file mode 100644 index 000000000..ea184a6be --- /dev/null +++ b/bindata/tnfdeployment/cronjob.yaml @@ -0,0 +1,48 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: + namespace: + labels: + app.kubernetes.io/name: +spec: + schedule: + concurrencyPolicy: "Forbid" + failedJobsHistoryLimit: 3 + successfulJobsHistoryLimit: 1 + jobTemplate: + metadata: + labels: + app.kubernetes.io/name: + spec: + ttlSecondsAfterFinished: 600 + template: + metadata: + annotations: + openshift.io/required-scc: "privileged" + labels: + app.kubernetes.io/name: + spec: + containers: + - name: collector + image: + imagePullPolicy: IfNotPresent + terminationMessagePolicy: FallbackToLogsOnError + command: [""] + resources: + requests: + cpu: 10m + memory: 32Mi + securityContext: + privileged: true + allowPrivilegeEscalation: true + hostIPC: false + hostNetwork: false + hostPID: true + priorityClassName: system-node-critical + serviceAccountName: tnf-setup-manager + nodeSelector: + node-role.kubernetes.io/master: "" + tolerations: + - operator: "Exists" + restartPolicy: OnFailure diff --git a/bindata/tnfdeployment/job.yaml b/bindata/tnfdeployment/job.yaml index 544e40d4d..60264066b 100644 --- a/bindata/tnfdeployment/job.yaml +++ b/bindata/tnfdeployment/job.yaml @@ -15,7 +15,7 @@ spec: - name: tnf-job image: imagePullPolicy: IfNotPresent - command: [ "tnf-setup-runner", "" ] + command: [ "tnf-runtime", "" ] terminationMessagePolicy: FallbackToLogsOnError 
resources: requests: diff --git a/cmd/tnf-setup-runner/OWNERS b/cmd/tnf-runtime/OWNERS similarity index 100% rename from cmd/tnf-setup-runner/OWNERS rename to cmd/tnf-runtime/OWNERS diff --git a/cmd/tnf-setup-runner/main.go b/cmd/tnf-runtime/main.go similarity index 79% rename from cmd/tnf-setup-runner/main.go rename to cmd/tnf-runtime/main.go index 8452dad92..0d81a3f11 100644 --- a/cmd/tnf-setup-runner/main.go +++ b/cmd/tnf-runtime/main.go @@ -18,6 +18,7 @@ import ( tnfaftersetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/after-setup" tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth" tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup" ) @@ -37,17 +38,18 @@ func main() { logs.InitLogs() defer logs.FlushLogs() - command := NewTnfSetupRunnerCommand() + command := NewTnfRuntimeCommand() if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } } -func NewTnfSetupRunnerCommand() *cobra.Command { +func NewTnfRuntimeCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "tnf-setup-runner", - Short: "OpenShift Two Node Fencing Setup runner", + Use: "tnf-runtime", + Short: "OpenShift Two Node Fencing Runtime", + Long: "Runtime commands for Two Node Fencing setup and monitoring operations", Run: func(cmd *cobra.Command, args []string) { cmd.Help() os.Exit(1) @@ -58,6 +60,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command { cmd.AddCommand(NewSetupCommand()) cmd.AddCommand(NewAfterSetupCommand()) cmd.AddCommand(NewFencingCommand()) + cmd.AddCommand(NewMonitorCommand()) return cmd } @@ -113,3 +116,14 @@ func NewFencingCommand() *cobra.Command { }, } } + +func NewMonitorCommand() *cobra.Command { + // Reuse the existing collector command as the "monitor" subcommand. + cmd := pacemaker.NewPacemakerStatusCollectorCommand() + cmd.Use = "monitor" + cmd.Short = "Monitor and collect pacemaker cluster status" + cmd.Long = "Collects pacemaker status and updates PacemakerStatus CR" + cmd.SilenceUsage = true + cmd.SilenceErrors = true + return cmd +} diff --git a/manifests/0000_25_etcd-operator_01_pacemakerstatus.crd.yaml b/manifests/0000_25_etcd-operator_01_pacemakerstatus.crd.yaml new file mode 100644 index 000000000..5fd75e512 --- /dev/null +++ b/manifests/0000_25_etcd-operator_01_pacemakerstatus.crd.yaml @@ -0,0 +1,177 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: pacemakerstatuses.etcd.openshift.io + annotations: + include.release.openshift.io/self-managed-high-availability: "true" +spec: + group: etcd.openshift.io + names: + kind: PacemakerStatus + listKind: PacemakerStatusList + plural: pacemakerstatuses + singular: pacemakerstatus + scope: Cluster + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + description: PacemakerStatus represents the current state of the Pacemaker cluster as reported by the pcs status command. + type: object + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object.' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents.' 
+ type: string + metadata: + type: object + spec: + description: PacemakerStatusSpec defines the desired state of PacemakerStatus + type: object + properties: + nodeName: + description: NodeName identifies which node this status is for + type: string + status: + description: PacemakerStatusStatus contains the actual pacemaker cluster status information + type: object + properties: + lastUpdated: + description: LastUpdated is the timestamp when this status was last updated + type: string + format: date-time + rawXML: + description: RawXML contains the raw XML output from pcs status xml command + type: string + maxLength: 262144 + collectionError: + description: CollectionError contains any error encountered while collecting status + type: string + summary: + description: Summary provides high-level counts and flags for the cluster state + type: object + properties: + pacemakerdState: + description: PacemakerdState indicates if pacemaker is running + type: string + hasQuorum: + description: HasQuorum indicates if the cluster has quorum + type: boolean + nodesOnline: + description: NodesOnline is the count of online nodes + type: integer + nodesTotal: + description: NodesTotal is the total count of configured nodes + type: integer + resourcesStarted: + description: ResourcesStarted is the count of started resources + type: integer + resourcesTotal: + description: ResourcesTotal is the total count of configured resources + type: integer + recentFailures: + description: RecentFailures indicates if there are recent operation failures + type: boolean + recentFencing: + description: RecentFencing indicates if there are recent fencing events + type: boolean + nodes: + description: Nodes provides detailed information about each node in the cluster + type: array + items: + type: object + required: + - name + - online + properties: + name: + description: Name is the name of the node + type: string + online: + description: Online indicates if the node is online + type: boolean + standby: + description: Standby indicates if the node is in standby mode + type: boolean + resources: + description: Resources provides detailed information about each resource in the cluster + type: array + items: + type: object + required: + - name + properties: + name: + description: Name is the name of the resource + type: string + resourceAgent: + description: ResourceAgent is the resource agent type + type: string + role: + description: Role is the current role of the resource + type: string + active: + description: Active indicates if the resource is active + type: boolean + node: + description: Node is the node where the resource is running + type: string + nodeHistory: + description: NodeHistory provides recent operation history for troubleshooting + type: array + items: + type: object + required: + - node + - resource + - operation + - rc + properties: + node: + description: Node is the node where the operation occurred + type: string + resource: + description: Resource is the resource that was operated on + type: string + operation: + description: Operation is the operation that was performed + type: string + rc: + description: RC is the return code from the operation + type: integer + rcText: + description: RCText is the human-readable return code text + type: string + lastRCChange: + description: LastRCChange is the timestamp when the RC last changed + type: string + format: date-time + fencingHistory: + description: FencingHistory provides recent fencing events + type: array + items: + type: object + required: + - 
target + - action + - status + properties: + target: + description: Target is the node that was fenced + type: string + action: + description: Action is the fencing action performed + type: string + status: + description: Status is the status of the fencing operation + type: string + completed: + description: Completed is the timestamp when the fencing completed + type: string + format: date-time + subresources: + status: {} diff --git a/pkg/operator/ceohelpers/external_etcd_status.go b/pkg/operator/ceohelpers/external_etcd_status.go index 22f74335a..df4eea22e 100644 --- a/pkg/operator/ceohelpers/external_etcd_status.go +++ b/pkg/operator/ceohelpers/external_etcd_status.go @@ -2,9 +2,14 @@ package ceohelpers import ( "context" + "fmt" + "time" configv1listers "github.com/openshift/client-go/config/listers/config/v1" + operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1" "github.com/openshift/library-go/pkg/operator/v1helpers" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/etcd" @@ -158,3 +163,39 @@ func GetExternalEtcdClusterStatus(ctx context.Context, return externalEtcdStatus, nil } + +// WaitForEtcdCondition is a generic helper that waits for an etcd-related condition to become true. +// It first syncs the etcd informer cache, then polls the condition function until it returns true +// or the timeout is reached. +func WaitForEtcdCondition( + ctx context.Context, + etcdInformer operatorv1informers.EtcdInformer, + operatorClient v1helpers.StaticPodOperatorClient, + conditionCheck func(context.Context, v1helpers.StaticPodOperatorClient) (bool, error), + pollInterval time.Duration, + timeout time.Duration, + conditionName string, +) error { + // Wait for the etcd informer to sync before checking condition + // This ensures operatorClient.GetStaticPodOperatorState() has data to work with + klog.Infof("waiting for etcd informer to sync before checking %s...", conditionName) + if !cache.WaitForCacheSync(ctx.Done(), etcdInformer.Informer().HasSynced) { + return fmt.Errorf("failed to sync etcd informer") + } + klog.Infof("etcd informer synced, checking for %s", conditionName) + + // Poll until the condition is met + return wait.PollUntilContextTimeout(ctx, pollInterval, timeout, true, func(ctx context.Context) (bool, error) { + conditionMet, err := conditionCheck(ctx, operatorClient) + if err != nil { + klog.Warningf("error checking %s, will retry: %v", conditionName, err) + return false, nil + } + if conditionMet { + klog.V(2).Infof("%s condition met", conditionName) + return true, nil + } + klog.V(4).Infof("%s condition not yet met, waiting...", conditionName) + return false, nil + }) +} diff --git a/pkg/operator/starter.go b/pkg/operator/starter.go index e0e829a7d..f01edc1e9 100644 --- a/pkg/operator/starter.go +++ b/pkg/operator/starter.go @@ -627,7 +627,8 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle controlPlaneNodeInformer, etcdsInformer, kubeClient, - dynamicClient) + dynamicClient, + AlivenessChecker) if err != nil { return err } diff --git a/pkg/tnf/operator/starter.go b/pkg/tnf/operator/starter.go index 46df969a9..1f2b46ccf 100644 --- a/pkg/tnf/operator/starter.go +++ b/pkg/tnf/operator/starter.go @@ -26,17 +26,17 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" corev1listers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" 
"github.com/openshift/cluster-etcd-operator/bindata" "github.com/openshift/cluster-etcd-operator/pkg/etcdenvvar" - "github.com/openshift/cluster-etcd-operator/pkg/operator/bootstrapteardown" "github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers" "github.com/openshift/cluster-etcd-operator/pkg/operator/externaletcdsupportcontroller" + "github.com/openshift/cluster-etcd-operator/pkg/operator/health" "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/jobs" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" ) @@ -71,7 +71,8 @@ func HandleDualReplicaClusters( controlPlaneNodeInformer cache.SharedIndexInformer, etcdInformer operatorv1informers.EtcdInformer, kubeClient kubernetes.Interface, - dynamicClient dynamic.Interface) (bool, error) { + dynamicClient dynamic.Interface, + alivenessChecker *health.MultiAlivenessChecker) (bool, error) { // Since HandleDualReplicaClusters isn't a controller, we need to ensure that the Infrastructure // informer is synced before we use it. @@ -92,6 +93,7 @@ func HandleDualReplicaClusters( runExternalEtcdSupportController(ctx, controllerContext, operatorClient, envVarGetter, kubeInformersForNamespaces, infrastructureInformer, networkInformer, controlPlaneNodeInformer, etcdInformer, kubeClient) runTnfResourceController(ctx, controllerContext, kubeClient, dynamicClient, operatorClient, kubeInformersForNamespaces) + runPacemakerControllers(ctx, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces, alivenessChecker, etcdInformer) controlPlaneNodeLister := corev1listers.NewNodeLister(controlPlaneNodeInformer.GetIndexer()) @@ -224,15 +226,15 @@ func handleNodes( kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, etcdInformer operatorv1informers.EtcdInformer) error { - // Wait for the etcd informer to sync before checking bootstrap status - // This ensures operatorClient.GetStaticPodOperatorState() has data to work with - klog.Infof("waiting for etcd informer to sync...") - if !cache.WaitForCacheSync(ctx.Done(), etcdInformer.Informer().HasSynced) { - return fmt.Errorf("failed to sync etcd informer") - } - klog.Infof("etcd informer synced") - - if err := waitForEtcdBootstrapCompleted(ctx, operatorClient); err != nil { + if err := ceohelpers.WaitForEtcdCondition( + ctx, + etcdInformer, + operatorClient, + ceohelpers.IsEtcdRunningInCluster, + 10*time.Second, + 30*time.Minute, + "etcd bootstrap completion", + ); err != nil { return fmt.Errorf("failed to wait for etcd bootstrap: %w", err) } klog.Infof("bootstrap completed, creating TNF job controllers") @@ -260,25 +262,6 @@ func handleNodes( return nil } -func waitForEtcdBootstrapCompleted(ctx context.Context, operatorClient v1helpers.StaticPodOperatorClient) error { - isEtcdRunningInCluster, err := ceohelpers.IsEtcdRunningInCluster(ctx, operatorClient) - if err != nil { - return fmt.Errorf("failed to check if bootstrap is completed: %v", err) - } - if !isEtcdRunningInCluster { - klog.Infof("waiting for bootstrap to complete with etcd running in cluster") - clientConfig, err := rest.InClusterConfig() - if err != nil { - return fmt.Errorf("failed to get in-cluster config: %v", err) - } - err = bootstrapteardown.WaitForEtcdBootstrap(ctx, clientConfig) - if err != nil { - return fmt.Errorf("failed to wait for bootstrap to complete: %v", err) - } - } - return nil -} - func runExternalEtcdSupportController(ctx 
context.Context, controllerContext *controllercmd.ControllerContext, operatorClient v1helpers.StaticPodOperatorClient, @@ -327,6 +310,76 @@ func runTnfResourceController(ctx context.Context, controllerContext *controller go tnfResourceController.Run(ctx, 1) } +func runPacemakerControllers(ctx context.Context, controllerContext *controllercmd.ControllerContext, operatorClient v1helpers.StaticPodOperatorClient, kubeClient kubernetes.Interface, kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, alivenessChecker *health.MultiAlivenessChecker, etcdInformer operatorv1informers.EtcdInformer) { + // Start the healthcheck controller to watch PacemakerStatus CRs + // This can start early since it just subscribes to CR updates + klog.Infof("starting Pacemaker healthcheck controller") + healthCheckController := pacemaker.NewHealthCheck( + alivenessChecker, + operatorClient, + kubeClient, + controllerContext.EventRecorder, + controllerContext.KubeConfig, + ) + go healthCheckController.Run(ctx, 1) + + // The healthcheck controller is responsible for starting the CronJob that collects + // Pacemaker status, but only after etcd has transitioned to running externally. + // This runs in a background goroutine to avoid blocking the main thread. + go func() { + klog.Infof("waiting for etcd to transition to external before starting Pacemaker status collector") + for { + if err := ceohelpers.WaitForEtcdCondition( + ctx, etcdInformer, operatorClient, ceohelpers.HasExternalEtcdCompletedTransition, + 10*time.Second, 30*time.Minute, "external etcd transition", + ); err != nil { + if ctx.Err() != nil { + klog.Infof("context done while waiting for external etcd transition: %v", err) + return + } + klog.Warningf("external etcd transition not complete yet, will retry in 1m: %v", err) + select { + case <-time.After(time.Minute): + continue + case <-ctx.Done(): + return + } + } + klog.Infof("etcd has transitioned to external; starting Pacemaker status collector CronJob") + runPacemakerStatusCollectorCronJob(ctx, controllerContext, operatorClient, kubeClient) + return + } + }() +} + +func runPacemakerStatusCollectorCronJob(ctx context.Context, controllerContext *controllercmd.ControllerContext, operatorClient v1helpers.StaticPodOperatorClient, kubeClient kubernetes.Interface) { + // Start the cronjob controller to create a CronJob for periodic status collection + // The CronJob runs "tnf-runtime monitor" which executes "sudo -n pcs status xml" + // and updates the PacemakerStatus CR + klog.Infof("starting Pacemaker status collector CronJob controller") + statusCronJobController := jobs.NewCronJobController( + "pacemaker-status-collector", + bindata.MustAsset("tnfdeployment/cronjob.yaml"), + operatorClient, + kubeClient, + controllerContext.EventRecorder, + func(_ *operatorv1.OperatorSpec, cronJob *batchv1.CronJob) error { + // Set the schedule - run every minute + cronJob.Spec.Schedule = "* * * * *" + + // Set the name and labels + cronJob.Labels["app.kubernetes.io/name"] = "pacemaker-status-collector" + + // Configure the container + cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image = os.Getenv("OPERATOR_IMAGE") + cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"tnf-runtime", "monitor"} + + return nil + }, + ) + go statusCronJobController.Run(ctx, 1) +} + func runJobController(ctx context.Context, jobType tools.JobType, nodeName *string, controllerContext *controllercmd.ControllerContext, operatorClient v1helpers.StaticPodOperatorClient, kubeClient 
kubernetes.Interface, kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, conditions []string) { nodeNameForLogs := "n/a" if nodeName != nil { diff --git a/pkg/tnf/operator/starter_test.go b/pkg/tnf/operator/starter_test.go index dffa68a91..3379be879 100644 --- a/pkg/tnf/operator/starter_test.go +++ b/pkg/tnf/operator/starter_test.go @@ -16,7 +16,6 @@ import ( fakeconfig "github.com/openshift/client-go/config/clientset/versioned/fake" "github.com/openshift/client-go/config/informers/externalversions" configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1" - v1 "github.com/openshift/client-go/config/informers/externalversions/config/v1" operatorversionedclientfake "github.com/openshift/client-go/operator/clientset/versioned/fake" extinfops "github.com/openshift/client-go/operator/informers/externalversions" operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1" @@ -32,11 +31,13 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/utils/clock" "github.com/openshift/cluster-etcd-operator/pkg/etcdenvvar" "github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers" + "github.com/openshift/cluster-etcd-operator/pkg/operator/health" "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" u "github.com/openshift/cluster-etcd-operator/pkg/testutils" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/etcd" @@ -50,7 +51,7 @@ type args struct { operatorClient v1helpers.StaticPodOperatorClient envVarGetter etcdenvvar.EnvVar kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces - networkInformer v1.NetworkInformer + networkInformer configv1informers.NetworkInformer controlPlaneNodeInformer cache.SharedIndexInformer etcdInformer operatorv1informers.EtcdInformer kubeClient kubernetes.Interface @@ -91,6 +92,7 @@ func TestHandleDualReplicaClusters(t *testing.T) { return } + alivenessChecker := health.NewMultiAlivenessChecker() started, err := HandleDualReplicaClusters( tt.args.ctx, tt.args.controllerContext, @@ -102,7 +104,8 @@ func TestHandleDualReplicaClusters(t *testing.T) { tt.args.controlPlaneNodeInformer, tt.args.etcdInformer, tt.args.kubeClient, - tt.args.dynamicClient) + tt.args.dynamicClient, + alivenessChecker) if started != tt.wantStarted { t.Errorf("HandleDualReplicaClusters() started = %v, wantStarted %v", started, tt.wantStarted) @@ -253,11 +256,17 @@ func getArgs(t *testing.T, dualReplicaControlPlaneEnabled bool) args { } } + // Create a minimal rest.Config for testing + restConfig := &rest.Config{ + Host: "https://localhost:6443", + } + return args{ initErr: nil, // Default to no error - ctx: context.Background(), + ctx: ctx, controllerContext: &controllercmd.ControllerContext{ EventRecorder: eventRecorder, + KubeConfig: restConfig, }, infrastructureInformer: configInformers.Config().V1().Infrastructures(), operatorClient: fakeOperatorClient, diff --git a/pkg/tnf/pkg/jobs/batch.go b/pkg/tnf/pkg/jobs/batch.go index 7e6b36a4b..db7aa6071 100644 --- a/pkg/tnf/pkg/jobs/batch.go +++ b/pkg/tnf/pkg/jobs/batch.go @@ -39,6 +39,14 @@ func ReadJobV1OrDie(objBytes []byte) *batchv1.Job { return requiredObj.(*batchv1.Job) } +func ReadCronJobV1OrDie(objBytes []byte) *batchv1.CronJob { + requiredObj, err := runtime.Decode(batchCodecs.UniversalDecoder(batchv1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return 
requiredObj.(*batchv1.CronJob) +} + // ApplyJob ensures the form of the specified job is present in the API. If it // does not exist, it will be created. If it does exist, the existing job will be stopped, // and a new Job will be created. diff --git a/pkg/tnf/pkg/jobs/cronjob_controller.go b/pkg/tnf/pkg/jobs/cronjob_controller.go new file mode 100644 index 000000000..edca79ed1 --- /dev/null +++ b/pkg/tnf/pkg/jobs/cronjob_controller.go @@ -0,0 +1,213 @@ +package jobs + +import ( + "context" + "fmt" + "time" + + batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + operatorv1 "github.com/openshift/api/operator/v1" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" + + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" +) + +const ( + // Default resync interval for the controller + defaultResyncInterval = 1 * time.Minute +) + +// CronJobHookFunc is a hook function to modify the CronJob. +// Similar to JobHookFunc, this allows flexible modification of the CronJob object. +type CronJobHookFunc func(*operatorv1.OperatorSpec, *batchv1.CronJob) error + +// CronJobController manages a Kubernetes CronJob resource +type CronJobController struct { + operatorClient v1helpers.OperatorClient + kubeClient kubernetes.Interface + eventRecorder events.Recorder + + // Configuration for the CronJob + manifest []byte + name string + optionalCronJobHooks []CronJobHookFunc +} + +// NewCronJobController creates a new controller for managing a CronJob. +// The name parameter is used for the controller instance name. +// Hook functions are called during sync to modify the CronJob before creation/update. +func NewCronJobController( + name string, + manifest []byte, + operatorClient v1helpers.OperatorClient, + kubeClient kubernetes.Interface, + eventRecorder events.Recorder, + optionalCronJobHooks ...CronJobHookFunc, +) factory.Controller { + c := &CronJobController{ + operatorClient: operatorClient, + kubeClient: kubeClient, + eventRecorder: eventRecorder, + manifest: manifest, + name: name, + optionalCronJobHooks: optionalCronJobHooks, + } + + controllerName := fmt.Sprintf("%sCronJobController", name) + syncCtx := factory.NewSyncContext(controllerName, eventRecorder.WithComponentSuffix(fmt.Sprintf("%s-cronjob", name))) + + return factory.New(). + WithSyncContext(syncCtx). + ResyncEvery(defaultResyncInterval). + WithSync(c.sync). 
+ WithInformers( + operatorClient.Informer(), + ).ToController(controllerName, syncCtx.Recorder()) +} + +func (c *CronJobController) sync(ctx context.Context, syncCtx factory.SyncContext) error { + klog.V(4).Infof("%s sync started", c.name) + defer klog.V(4).Infof("%s sync completed", c.name) + + // Check operator state + opSpec, _, _, err := c.operatorClient.GetOperatorState() + if err != nil { + return err + } + + if opSpec.ManagementState != operatorv1.Managed { + klog.V(4).Infof("Operator is not in Managed state, skipping CronJob creation") + return nil + } + + // Ensure the CronJob exists with the correct configuration + if err := c.ensureCronJob(ctx); err != nil { + return fmt.Errorf("failed to ensure CronJob: %w", err) + } + + return nil +} + +func (c *CronJobController) ensureCronJob(ctx context.Context) error { + // Build the desired CronJob + desired, err := c.buildCronJob(ctx) + if err != nil { + return fmt.Errorf("failed to build CronJob: %w", err) + } + + // Get existing CronJob if it exists + existing, err := c.kubeClient.BatchV1().CronJobs(operatorclient.TargetNamespace).Get(ctx, c.name, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get CronJob: %w", err) + } + // CronJob doesn't exist, create it + return c.createCronJob(ctx, desired) + } + + // CronJob exists, check if it needs updating + if c.needsUpdate(existing, desired) { + klog.V(2).Infof("CronJob %s needs updating", c.name) + return c.updateCronJob(ctx, existing, desired) + } + + klog.V(4).Infof("CronJob %s is up to date", c.name) + return nil +} + +func (c *CronJobController) createCronJob(ctx context.Context, cronJob *batchv1.CronJob) error { + _, err := c.kubeClient.BatchV1().CronJobs(operatorclient.TargetNamespace).Create(ctx, cronJob, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + klog.V(4).Infof("CronJob %s already exists", c.name) + return nil + } + return fmt.Errorf("failed to create CronJob: %w", err) + } + + klog.V(2).Infof("Created CronJob: %s/%s with schedule %s", operatorclient.TargetNamespace, c.name, cronJob.Spec.Schedule) + c.eventRecorder.Eventf("CronJobCreated", "Created CronJob: %s/%s", operatorclient.TargetNamespace, c.name) + + return nil +} + +func (c *CronJobController) updateCronJob(ctx context.Context, existing, desired *batchv1.CronJob) error { + // Preserve the resource version + desired.ResourceVersion = existing.ResourceVersion + + _, err := c.kubeClient.BatchV1().CronJobs(operatorclient.TargetNamespace).Update(ctx, desired, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update CronJob: %w", err) + } + + klog.V(2).Infof("Updated CronJob: %s/%s", operatorclient.TargetNamespace, c.name) + c.eventRecorder.Eventf("CronJobUpdated", "Updated CronJob: %s/%s", operatorclient.TargetNamespace, c.name) + + return nil +} + +func (c *CronJobController) buildCronJob(ctx context.Context) (*batchv1.CronJob, error) { + // Parse the manifest + cronJob := ReadCronJobV1OrDie(c.manifest) + + // Get operator spec for hooks + opSpec, _, _, err := c.operatorClient.GetOperatorState() + if err != nil { + return nil, fmt.Errorf("failed to get operator state: %w", err) + } + + // Enforce controller-managed identity and scope + cronJob.Namespace = operatorclient.TargetNamespace + cronJob.Name = c.name + + // Apply all hook functions + for i, hook := range c.optionalCronJobHooks { + if err := hook(opSpec, cronJob); err != nil { + return nil, fmt.Errorf("hook function %d failed: %w", i, err) + } + } + + 
return cronJob, nil +} + +func (c *CronJobController) needsUpdate(existing, desired *batchv1.CronJob) bool { + // Check if schedule has changed + if existing.Spec.Schedule != desired.Spec.Schedule { + klog.V(4).Infof("Schedule changed from %q to %q", existing.Spec.Schedule, desired.Spec.Schedule) + return true + } + + // Check if the job template has changed (containers, command, image, etc.) + if len(existing.Spec.JobTemplate.Spec.Template.Spec.Containers) > 0 && + len(desired.Spec.JobTemplate.Spec.Template.Spec.Containers) > 0 { + existingContainer := existing.Spec.JobTemplate.Spec.Template.Spec.Containers[0] + desiredContainer := desired.Spec.JobTemplate.Spec.Template.Spec.Containers[0] + + // Check if command has changed + if len(existingContainer.Command) != len(desiredContainer.Command) { + klog.V(4).Infof("Command length changed") + return true + } + for i := range desiredContainer.Command { + if existingContainer.Command[i] != desiredContainer.Command[i] { + klog.V(4).Infof("Command changed at index %d", i) + return true + } + } + + // Check if image has changed + if existingContainer.Image != desiredContainer.Image { + klog.V(4).Infof("Image changed from %q to %q", existingContainer.Image, desiredContainer.Image) + return true + } + } + + return false +} diff --git a/pkg/tnf/pkg/pacemaker/client_helpers.go b/pkg/tnf/pkg/pacemaker/client_helpers.go new file mode 100644 index 000000000..c0ddfe78b --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/client_helpers.go @@ -0,0 +1,56 @@ +package pacemaker + +import ( + "fmt" + "os" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker/v1alpha1" +) + +// getKubeConfig returns a Kubernetes REST config, trying in-cluster config first, +// then falling back to kubeconfig file from environment or default location. +func getKubeConfig() (*rest.Config, error) { + // Try in-cluster config first + config, err := rest.InClusterConfig() + if err != nil { + // Fall back to kubeconfig + kubeconfig := os.Getenv(envKubeconfig) + if kubeconfig == "" { + kubeconfig = os.Getenv(envHome) + defaultKubeconfigPath + } + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes config: %w", err) + } + } + return config, nil +} + +// createPacemakerRESTClient creates a REST client configured for PacemakerStatus CRs. +// It takes a base Kubernetes REST config and configures it with the necessary +// scheme, group version, and serializer for the PacemakerStatus CRD. 
+func createPacemakerRESTClient(baseConfig *rest.Config) (rest.Interface, error) { + // Set up the scheme for PacemakerStatus CRD + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to add PacemakerStatus scheme: %w", err) + } + + // Configure the REST client for the PacemakerStatus CRD + pacemakerConfig := rest.CopyConfig(baseConfig) + pacemakerConfig.GroupVersion = &v1alpha1.SchemeGroupVersion + pacemakerConfig.APIPath = kubernetesAPIPath + pacemakerConfig.NegotiatedSerializer = serializer.NewCodecFactory(scheme) + + restClient, err := rest.RESTClientFor(pacemakerConfig) + if err != nil { + return nil, fmt.Errorf("failed to create REST client for PacemakerStatus: %w", err) + } + + return restClient, nil +} diff --git a/pkg/tnf/pkg/pacemaker/healthcheck.go b/pkg/tnf/pkg/pacemaker/healthcheck.go new file mode 100644 index 000000000..04bc2bfbb --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/healthcheck.go @@ -0,0 +1,623 @@ +package pacemaker + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + operatorv1 "github.com/openshift/api/operator/v1" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/openshift/cluster-etcd-operator/pkg/operator/health" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker/v1alpha1" +) + +// Constants for time windows and status strings +const ( + // Resync interval for health check controller + healthCheckResyncInterval = 30 * time.Second + + // Event deduplication windows - avoid recording the same event within these times + eventDeduplicationWindowFencing = 1 * time.Hour // Fencing events create reminders every hour for 24 hours + eventDeduplicationWindowDefault = 5 * time.Minute // Resource actions and general warnings + + // Expected number of nodes in ExternalEtcd cluster + expectedNodeCount = 2 + + // Status strings + statusHealthy = "Healthy" + statusWarning = "Warning" + statusError = "Error" + statusUnknown = "Unknown" + + // Pacemaker state + pacemakerStateRunning = "running" + + // Resource names + resourceKubelet = "kubelet" + resourceEtcd = "etcd" + + // Resource agent names + resourceAgentKubelet = "systemd:kubelet" + resourceAgentEtcd = "ocf:heartbeat:podman-etcd" + + // Degraded condition reason + reasonPacemakerUnhealthy = "PacemakerUnhealthy" + + // PacemakerStatus CR name + PacemakerStatusResourceName = "cluster" + + // Warning message prefixes for categorizing events + warningPrefixFailedAction = "Recent failed resource action:" + warningPrefixFencingEvent = "Recent fencing event:" + + // Operator condition types + conditionTypePacemakerDegraded = "PacemakerHealthCheckDegraded" + + // Event reasons + eventReasonFailedAction = "PacemakerFailedResourceAction" + eventReasonFencingEvent = "PacemakerFencingEvent" + eventReasonWarning = "PacemakerWarning" + eventReasonError = "PacemakerError" + eventReasonHealthy = "PacemakerHealthy" + + // Resource roles + roleStarted = "Started" + + // Kubernetes API constants + kubernetesAPIPath = "/apis" + pacemakerResourceName = "pacemakerstatuses" + + // Time thresholds + statusStalenessThreshold = 2 * time.Minute + + // Event key prefixes + eventKeyPrefixWarning = 
"warning:" + eventKeyPrefixError = "error:" + + // Error messages + msgPacemakerNotRunning = "Pacemaker is not running" + msgClusterNoQuorum = "Cluster does not have quorum" + msgNoNodesFound = "No nodes found" + msgNodeOffline = "Node %s is not online" + msgNodeStandby = "Node %s is in standby (unexpected behavior; treated as offline)" + msgResourceNotStarted = "%s resource not started on all nodes (started on %d/%d nodes)" + msgPacemakerDegraded = "Pacemaker cluster is in degraded state" + msgPacemakerHealthy = "Pacemaker cluster is healthy - all nodes online and critical resources started" + + // Event message templates + msgDetectedFailedAction = "Pacemaker detected a recent failed resource action: %s" + msgDetectedFencing = "Pacemaker detected a recent fencing event: %s" + msgPacemakerWarning = "Pacemaker warning: %s" + msgPacemakerError = "Pacemaker error: %s" +) + +// HealthStatus represents the overall status of pacemaker in the ExternalEtcd cluster +type HealthStatus struct { + OverallStatus string + Warnings []string + Errors []string +} + +// newUnknownHealthStatus creates a HealthStatus with Unknown status and an error message +func newUnknownHealthStatus(errMsg string) *HealthStatus { + return &HealthStatus{ + OverallStatus: statusUnknown, + Warnings: []string{}, + Errors: []string{errMsg}, + } +} + +// HealthCheck monitors pacemaker status in ExternalEtcd topology clusters +type HealthCheck struct { + operatorClient v1helpers.StaticPodOperatorClient + kubeClient kubernetes.Interface + eventRecorder events.Recorder + pacemakerRESTClient rest.Interface + pacemakerInformer cache.SharedIndexInformer + + // Track recently recorded events to avoid duplicates + recordedEventsMu sync.Mutex + recordedEvents map[string]time.Time + + // Track previous health status to determine if we should record healthy events + previousStatusMu sync.Mutex + previousStatus string + + // Track last processed PacemakerStatus to detect changes + lastProcessedStatusMu sync.Mutex + lastProcessedStatus *v1alpha1.PacemakerStatus +} + +// NewHealthCheck creates a new HealthCheck for monitoring pacemaker status +// in clusters that use ExternalEtcd clusters +func NewHealthCheck( + livenessChecker *health.MultiAlivenessChecker, + operatorClient v1helpers.StaticPodOperatorClient, + kubeClient kubernetes.Interface, + eventRecorder events.Recorder, + restConfig *rest.Config, +) factory.Controller { + // Create REST client for PacemakerStatus CRs + restClient, err := createPacemakerRESTClient(restConfig) + if err != nil { + panic(err) + } + + // Create scheme for the parameter codec + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + panic(fmt.Errorf("failed to add scheme for informer: %w", err)) + } + + // Create informer for PacemakerStatus + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + result := &v1alpha1.PacemakerStatusList{} + err := restClient.Get(). + Resource(pacemakerResourceName). + VersionedParams(&options, runtime.NewParameterCodec(scheme)). + Do(context.Background()). + Into(result) + return result, err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return restClient.Get(). + Resource(pacemakerResourceName). + VersionedParams(&options, runtime.NewParameterCodec(scheme)). 
+ Watch(context.Background()) + }, + }, + &v1alpha1.PacemakerStatus{}, + healthCheckResyncInterval, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + c := &HealthCheck{ + operatorClient: operatorClient, + kubeClient: kubeClient, + eventRecorder: eventRecorder, + pacemakerRESTClient: restClient, + pacemakerInformer: informer, + recordedEvents: make(map[string]time.Time), + previousStatus: statusUnknown, + } + + syncCtx := factory.NewSyncContext("PacemakerHealthCheck", eventRecorder.WithComponentSuffix("pacemaker-health-check")) + + syncer := health.NewDefaultCheckingSyncWrapper(c.sync) + livenessChecker.Add("PacemakerHealthCheck", syncer) + + return factory.New(). + WithSyncContext(syncCtx). + ResyncEvery(healthCheckResyncInterval). + WithSync(syncer.Sync). + WithInformers( + operatorClient.Informer(), + informer, + ).ToController("PacemakerHealthCheck", syncCtx.Recorder()) +} + +// sync is the main sync function that gets called periodically to check pacemaker status +func (c *HealthCheck) sync(ctx context.Context, syncCtx factory.SyncContext) error { + klog.V(4).Infof("PacemakerHealthCheck sync started") + defer klog.V(4).Infof("PacemakerHealthCheck sync completed") + + // Get pacemaker status + healthStatus, err := c.getPacemakerStatus(ctx) + if err != nil { + return err + } + + // Update operator status conditions based on pacemaker status + if err := c.updateOperatorStatus(ctx, healthStatus); err != nil { + return err + } + + // Record pacemaker health check events + c.recordHealthCheckEvents(healthStatus) + + return nil +} + +// getPacemakerStatus retrieves pacemaker status from the PacemakerStatus CR and returns a HealthStatus struct +func (c *HealthCheck) getPacemakerStatus(ctx context.Context) (*HealthStatus, error) { + klog.V(4).Infof("Retrieving pacemaker status from CR...") + + // Get the PacemakerStatus CR + pacemakerStatus := &v1alpha1.PacemakerStatus{} + err := c.pacemakerRESTClient.Get(). + Resource(pacemakerResourceName). + Name(PacemakerStatusResourceName). + Do(ctx). 
+ Into(pacemakerStatus) + + if err != nil { + return newUnknownHealthStatus(fmt.Sprintf("Failed to get PacemakerStatus CR: %v", err)), nil + } + + // Check if there was an error collecting the status + if pacemakerStatus.Status.CollectionError != "" { + return newUnknownHealthStatus(fmt.Sprintf("Status collection error: %s", pacemakerStatus.Status.CollectionError)), nil + } + + // Check if the status is stale + if time.Since(pacemakerStatus.Status.LastUpdated.Time) > statusStalenessThreshold { + return newUnknownHealthStatus(fmt.Sprintf("Pacemaker status is stale (last updated: %v)", pacemakerStatus.Status.LastUpdated.Time)), nil + } + + // Store the last processed status to detect changes + c.lastProcessedStatusMu.Lock() + c.lastProcessedStatus = pacemakerStatus.DeepCopy() + c.lastProcessedStatusMu.Unlock() + + // Build health status from the CRD status fields + status := c.buildHealthStatusFromCR(pacemakerStatus) + + return status, nil +} + +// buildHealthStatusFromCR builds a HealthStatus from the PacemakerStatus CR status fields +func (c *HealthCheck) buildHealthStatusFromCR(pacemakerStatus *v1alpha1.PacemakerStatus) *HealthStatus { + status := &HealthStatus{ + OverallStatus: statusUnknown, + Warnings: []string{}, + Errors: []string{}, + } + + // Check if pacemaker is running + if pacemakerStatus.Status.Summary.PacemakerdState != pacemakerStateRunning { + status.Errors = append(status.Errors, msgPacemakerNotRunning) + status.OverallStatus = statusError + return status + } + + // Check quorum + if !pacemakerStatus.Status.Summary.HasQuorum { + status.Errors = append(status.Errors, msgClusterNoQuorum) + } + + // Check node status + c.checkNodeStatus(pacemakerStatus, status) + + // Check resource status + c.checkResourceStatus(pacemakerStatus, status) + + // Check for recent failures + c.checkRecentFailures(pacemakerStatus, status) + + // Check for recent fencing events + c.checkFencingEvents(pacemakerStatus, status) + + // Determine overall status + status.OverallStatus = c.determineOverallStatus(status) + + return status +} + +// checkNodeStatus checks if all nodes are online using CRD status fields +func (c *HealthCheck) checkNodeStatus(pacemakerStatus *v1alpha1.PacemakerStatus, status *HealthStatus) { + nodes := pacemakerStatus.Status.Nodes + + // If this happens, something is seriously wrong with the cluster. + if len(nodes) == 0 { + status.Errors = append(status.Errors, msgNoNodesFound) + return + } + + // Check if we have the expected number of nodes. + // This should almost always be 2, but 1 node is possible during a control-plane node replacement event. 
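+	// Note: a node-count mismatch below is surfaced as a warning only; individual
+	// offline or standby nodes are treated as errors.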
+ if len(nodes) != expectedNodeCount { + status.Warnings = append(status.Warnings, fmt.Sprintf("Expected %d nodes, found %d", expectedNodeCount, len(nodes))) + } + + for _, node := range nodes { + if !node.Online { + status.Errors = append(status.Errors, fmt.Sprintf(msgNodeOffline, node.Name)) + } + + if node.Standby { + status.Errors = append(status.Errors, fmt.Sprintf(msgNodeStandby, node.Name)) + } + } +} + +// checkResourceStatus checks if kubelet and etcd resources are started on both nodes using CRD status fields +func (c *HealthCheck) checkResourceStatus(pacemakerStatus *v1alpha1.PacemakerStatus, status *HealthStatus) { + resources := pacemakerStatus.Status.Resources + + resourcesStarted := make(map[string]map[string]bool) + resourcesStarted[resourceKubelet] = make(map[string]bool) + resourcesStarted[resourceEtcd] = make(map[string]bool) + + for _, resource := range resources { + // A resource is considered "started" if it has Role="Started" and Active=true + if resource.Role == roleStarted && resource.Active && resource.Node != "" { + // Check if this is a kubelet or etcd resource + switch { + case strings.HasPrefix(resource.ResourceAgent, resourceAgentKubelet): + resourcesStarted[resourceKubelet][resource.Node] = true + case strings.HasPrefix(resource.ResourceAgent, resourceAgentEtcd): + resourcesStarted[resourceEtcd][resource.Node] = true + } + } + } + + // Check if we have both resources started on all nodes + kubeletCount := len(resourcesStarted[resourceKubelet]) + etcdCount := len(resourcesStarted[resourceEtcd]) + + c.validateResourceCount(resourceKubelet, kubeletCount, status) + c.validateResourceCount(resourceEtcd, etcdCount, status) +} + +// validateResourceCount validates that a resource is started on the expected number of nodes +func (c *HealthCheck) validateResourceCount(resourceName string, actualCount int, status *HealthStatus) { + if actualCount < expectedNodeCount { + status.Errors = append(status.Errors, fmt.Sprintf(msgResourceNotStarted, + resourceName, actualCount, expectedNodeCount)) + } +} + +// checkRecentFailures checks for recent failed resource actions using CRD status fields +func (c *HealthCheck) checkRecentFailures(pacemakerStatus *v1alpha1.PacemakerStatus, status *HealthStatus) { + // The status collector already filters NodeHistory to recent events (last 5 minutes) + // and only includes failed operations (RC != 0) + for _, entry := range pacemakerStatus.Status.NodeHistory { + // Check for failed operations (rc != 0) + if entry.RC != 0 { + status.Warnings = append(status.Warnings, fmt.Sprintf("%s %s %s on %s failed (rc=%d, %s, %s)", + warningPrefixFailedAction, entry.Resource, entry.Operation, entry.Node, + entry.RC, entry.RCText, entry.LastRCChange.Format(time.RFC3339))) + } + } +} + +// checkFencingEvents checks for recent fencing events using CRD status fields +func (c *HealthCheck) checkFencingEvents(pacemakerStatus *v1alpha1.PacemakerStatus, status *HealthStatus) { + // The status collector already filters FencingHistory to recent events (last 24 hours) + for _, fenceEvent := range pacemakerStatus.Status.FencingHistory { + status.Warnings = append(status.Warnings, fmt.Sprintf("%s %s of %s %s", + warningPrefixFencingEvent, fenceEvent.Action, fenceEvent.Target, fenceEvent.Status)) + } +} + +// determineOverallStatus determines the overall health status based on collected information +func (c *HealthCheck) determineOverallStatus(status *HealthStatus) string { + // Determine status based on current state + if len(status.Errors) > 0 { + return 
statusError + } + + if len(status.Warnings) > 0 { + return statusWarning + } + + return statusHealthy +} + +// updateOperatorStatus processes the HealthStatus and conditionally updates the operator state +func (c *HealthCheck) updateOperatorStatus(ctx context.Context, status *HealthStatus) error { + klog.V(4).Infof("Updating operator availability based on pacemaker status: %s", status.OverallStatus) + + // Log warnings and errors + c.logStatusMessages(status) + + // Update operator conditions based on pacemaker status + switch status.OverallStatus { + case statusError: + return c.setPacemakerDegradedCondition(ctx, status) + case statusHealthy: + return c.clearPacemakerDegradedCondition(ctx) + default: + // For Warning or Unknown status, we don't update the degraded condition + return nil + } +} + +// logStatusMessages logs all warnings and errors +func (c *HealthCheck) logStatusMessages(status *HealthStatus) { + for _, warning := range status.Warnings { + klog.Warningf(msgPacemakerWarning, warning) + } + + for _, err := range status.Errors { + klog.Errorf(msgPacemakerError, err) + } +} + +func (c *HealthCheck) setPacemakerDegradedCondition(ctx context.Context, status *HealthStatus) error { + message := strings.Join(status.Errors, "; ") + if message == "" { + message = msgPacemakerDegraded + } + + // Check if the condition is already set with the same message + currentCondition, err := c.getCurrentPacemakerCondition() + if err != nil { + return err + } + + // Only update if the condition is not already set to True with the same message + if currentCondition != nil && + currentCondition.Status == operatorv1.ConditionTrue && + currentCondition.Message == message { + klog.V(4).Infof("Pacemaker degraded condition already set with same message, skipping update") + return nil + } + + condition := operatorv1.OperatorCondition{ + Type: conditionTypePacemakerDegraded, + Status: operatorv1.ConditionTrue, + Reason: reasonPacemakerUnhealthy, + Message: message, + } + + return c.updateOperatorCondition(ctx, condition) +} + +func (c *HealthCheck) clearPacemakerDegradedCondition(ctx context.Context) error { + // Check if the condition is currently set to True before attempting to clear it + currentCondition, err := c.getCurrentPacemakerCondition() + if err != nil { + return err + } + + // Only update if the condition is currently True (degraded) + if currentCondition == nil || currentCondition.Status != operatorv1.ConditionTrue { + klog.V(4).Infof("Pacemaker degraded condition is not set or already False, skipping update") + return nil + } + + condition := operatorv1.OperatorCondition{ + Type: conditionTypePacemakerDegraded, + Status: operatorv1.ConditionFalse, + } + + return c.updateOperatorCondition(ctx, condition) +} + +// getCurrentPacemakerCondition retrieves the current PacemakerHealthCheckDegraded condition +func (c *HealthCheck) getCurrentPacemakerCondition() (*operatorv1.OperatorCondition, error) { + _, currentStatus, _, err := c.operatorClient.GetStaticPodOperatorState() + if err != nil { + return nil, fmt.Errorf("failed to get current operator status: %w", err) + } + + // Find the current pacemaker degraded condition + for i := range currentStatus.Conditions { + if currentStatus.Conditions[i].Type == conditionTypePacemakerDegraded { + return ¤tStatus.Conditions[i], nil + } + } + + return nil, nil +} + +// updateOperatorCondition updates the operator condition +func (c *HealthCheck) updateOperatorCondition(ctx context.Context, condition operatorv1.OperatorCondition) error { + _, _, err := 
v1helpers.UpdateStaticPodStatus(ctx, c.operatorClient, v1helpers.UpdateStaticPodConditionFn(condition)) + if err != nil { + klog.Errorf("Failed to update operator status: %v", err) + return err + } + + klog.V(2).Infof("Updated operator condition: %s=%s", condition.Type, condition.Status) + return nil +} + +// cleanupExpiredEvents removes events from the deduplication map that have exceeded their window. +// Fencing events are kept for 1 hour, other events for 5 minutes. +func (c *HealthCheck) cleanupExpiredEvents() { + c.recordedEventsMu.Lock() + defer c.recordedEventsMu.Unlock() + + now := time.Now() + + for key, timestamp := range c.recordedEvents { + // Determine the appropriate window based on event type + // Fencing events have longer window + window := eventDeduplicationWindowDefault + if strings.Contains(key, warningPrefixFencingEvent) { + window = eventDeduplicationWindowFencing + } + + // Remove if expired + if now.Sub(timestamp) > window { + delete(c.recordedEvents, key) + } + } +} + +// shouldRecordEvent checks if an event should be recorded based on deduplication logic. +// This should be called after cleanupExpiredEvents() has been called. +func (c *HealthCheck) shouldRecordEvent(eventKey string, deduplicationWindow time.Duration) bool { + c.recordedEventsMu.Lock() + defer c.recordedEventsMu.Unlock() + + now := time.Now() + + // Check if this event was recently recorded + if lastRecorded, exists := c.recordedEvents[eventKey]; exists { + if now.Sub(lastRecorded) < deduplicationWindow { + return false + } + } + + // Record this event + c.recordedEvents[eventKey] = now + return true +} + +// recordHealthCheckEvents records events for pacemaker warnings and fencing history +func (c *HealthCheck) recordHealthCheckEvents(status *HealthStatus) { + // Clean up expired events before processing new ones + c.cleanupExpiredEvents() + + // Record events for warnings with appropriate deduplication window + for _, warning := range status.Warnings { + eventKey := fmt.Sprintf(eventKeyPrefixWarning+"%s", warning) + // Use longer deduplication window for fencing events + deduplicationWindow := eventDeduplicationWindowDefault + if strings.Contains(warning, warningPrefixFencingEvent) { + deduplicationWindow = eventDeduplicationWindowFencing + } + if c.shouldRecordEvent(eventKey, deduplicationWindow) { + c.recordWarningEvent(warning) + } + } + + // Record events for errors with default deduplication window + for _, err := range status.Errors { + eventKey := fmt.Sprintf(eventKeyPrefixError+"%s", err) + if c.shouldRecordEvent(eventKey, eventDeduplicationWindowDefault) { + c.eventRecorder.Warningf(eventReasonError, msgPacemakerError, err) + } + } + + // Record a normal event for healthy status only when transitioning from unhealthy/unknown + if status.OverallStatus == statusHealthy { + var wasUnhealthy bool + func() { + c.previousStatusMu.Lock() + defer c.previousStatusMu.Unlock() + wasUnhealthy = c.previousStatus != statusHealthy + c.previousStatus = statusHealthy + }() + + if wasUnhealthy { + c.eventRecorder.Eventf(eventReasonHealthy, msgPacemakerHealthy) + } + } else { + // Update previous status for non-healthy states + func() { + c.previousStatusMu.Lock() + defer c.previousStatusMu.Unlock() + c.previousStatus = status.OverallStatus + }() + } +} + +// recordWarningEvent records appropriate warning events based on warning type +func (c *HealthCheck) recordWarningEvent(warning string) { + switch { + case strings.Contains(warning, warningPrefixFailedAction): + 
c.eventRecorder.Warningf(eventReasonFailedAction, msgDetectedFailedAction, warning) + case strings.Contains(warning, warningPrefixFencingEvent): + c.eventRecorder.Warningf(eventReasonFencingEvent, msgDetectedFencing, warning) + default: + c.eventRecorder.Warningf(eventReasonWarning, msgPacemakerWarning, warning) + } +} diff --git a/pkg/tnf/pkg/pacemaker/healthcheck_test.go b/pkg/tnf/pkg/pacemaker/healthcheck_test.go new file mode 100644 index 000000000..b839088d7 --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/healthcheck_test.go @@ -0,0 +1,949 @@ +package pacemaker + +import ( + "bytes" + "context" + "io" + "net/http" + "strings" + "testing" + "time" + + operatorv1 "github.com/openshift/api/operator/v1" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" + fakerest "k8s.io/client-go/rest/fake" + "k8s.io/utils/clock" + + "github.com/openshift/cluster-etcd-operator/pkg/operator/health" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker/v1alpha1" +) + +// Test helper functions +func createTestHealthCheck() *HealthCheck { + kubeClient := fake.NewSimpleClientset() + operatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{ + OperatorSpec: operatorv1.OperatorSpec{ + ManagementState: operatorv1.Managed, + }, + }, + &operatorv1.StaticPodOperatorStatus{ + OperatorStatus: operatorv1.OperatorStatus{ + Conditions: []operatorv1.OperatorCondition{}, + }, + }, + nil, + nil, + ) + eventRecorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + // Note: This creates a HealthCheck with mock REST client + // In a real scenario, we'd need to mock the PacemakerStatus CR + return &HealthCheck{ + operatorClient: operatorClient, + kubeClient: kubeClient, + eventRecorder: eventRecorder, + pacemakerRESTClient: nil, // TODO: Mock this for actual tests + pacemakerInformer: nil, // TODO: Mock this for actual tests + recordedEvents: make(map[string]time.Time), + previousStatus: statusUnknown, + } +} + +// createTestHealthCheckWithMockStatus creates a HealthCheck with a mocked PacemakerStatus CR +func createTestHealthCheckWithMockStatus(t *testing.T, mockStatus *v1alpha1.PacemakerStatus) *HealthCheck { + kubeClient := fake.NewSimpleClientset() + operatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{ + OperatorSpec: operatorv1.OperatorSpec{ + ManagementState: operatorv1.Managed, + }, + }, + &operatorv1.StaticPodOperatorStatus{ + OperatorStatus: operatorv1.OperatorStatus{ + Conditions: []operatorv1.OperatorCondition{}, + }, + }, + nil, + nil, + ) + eventRecorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + // Create a fake REST client that returns the mock PacemakerStatus + scheme := runtime.NewScheme() + err := v1alpha1.AddToScheme(scheme) + require.NoError(t, err) + + codec := serializer.NewCodecFactory(scheme) + fakeClient := &fakerest.RESTClient{ + Client: fakerest.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + body, err := runtime.Encode(codec.LegacyCodec(v1alpha1.SchemeGroupVersion), mockStatus) + if err != nil { + return nil, err + } + + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return &http.Response{ + StatusCode: http.StatusOK, + Header: header, 
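+				// The fake HTTP client answers every request with the encoded mock
+				// PacemakerStatus; that is all getPacemakerStatus needs in these tests.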
+ Body: io.NopCloser(bytes.NewReader(body)), + }, nil + }), + NegotiatedSerializer: codec, + GroupVersion: v1alpha1.SchemeGroupVersion, + VersionedAPIPath: "/apis/" + v1alpha1.SchemeGroupVersion.String(), + } + + return &HealthCheck{ + operatorClient: operatorClient, + kubeClient: kubeClient, + eventRecorder: eventRecorder, + pacemakerRESTClient: fakeClient, + pacemakerInformer: nil, // Not needed for getPacemakerStatus tests + recordedEvents: make(map[string]time.Time), + previousStatus: statusUnknown, + } +} + +// Helper functions removed: loadTestXML and getRecentFailuresXML +// These were used for XML parsing tests, which have been replaced with CRD-based tests + +// Tests +func TestNewHealthCheck(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + operatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{ + OperatorSpec: operatorv1.OperatorSpec{ + ManagementState: operatorv1.Managed, + }, + }, + &operatorv1.StaticPodOperatorStatus{ + OperatorStatus: operatorv1.OperatorStatus{ + Conditions: []operatorv1.OperatorCondition{}, + }, + }, + nil, + nil, + ) + + livenessChecker := health.NewMultiAlivenessChecker() + eventRecorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + // Create a minimal rest.Config for testing + restConfig := &rest.Config{ + Host: "https://localhost:6443", + } + + controller := NewHealthCheck( + livenessChecker, + operatorClient, + kubeClient, + eventRecorder, + restConfig, + ) + + require.NotNil(t, controller, "Controller should not be nil") +} + +func TestHealthCheck_getPacemakerStatus(t *testing.T) { + tests := []struct { + name string + mockStatus *v1alpha1.PacemakerStatus + expectedStatus string + expectErrors bool + expectWarnings bool + }{ + { + name: "healthy_status", + mockStatus: &v1alpha1.PacemakerStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "PacemakerStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + }, + Status: v1alpha1.PacemakerStatusStatus{ + Summary: v1alpha1.PacemakerSummary{ + PacemakerdState: "running", + HasQuorum: true, + NodesOnline: 2, + NodesTotal: 2, + }, + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: true, Standby: false}, + }, + Resources: []v1alpha1.ResourceStatus{ + {Name: "kubelet-clone-0", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-0"}, + {Name: "kubelet-clone-1", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-1"}, + {Name: "etcd-clone-0", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-0"}, + {Name: "etcd-clone-1", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-1"}, + }, + CollectionError: "", + LastUpdated: metav1.Now(), + }, + }, + expectedStatus: statusHealthy, + expectErrors: false, + expectWarnings: false, + }, + { + name: "collection_error", + mockStatus: &v1alpha1.PacemakerStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "PacemakerStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + }, + Status: v1alpha1.PacemakerStatusStatus{ + CollectionError: "Failed to execute pcs command", + LastUpdated: metav1.Now(), + }, + }, + expectedStatus: statusUnknown, + expectErrors: true, + expectWarnings: false, + }, + { + name: "stale_status", + mockStatus: &v1alpha1.PacemakerStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: 
v1alpha1.SchemeGroupVersion.String(), + Kind: "PacemakerStatus", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + }, + Status: v1alpha1.PacemakerStatusStatus{ + Summary: v1alpha1.PacemakerSummary{ + PacemakerdState: "running", + HasQuorum: true, + }, + CollectionError: "", + LastUpdated: metav1.Time{Time: time.Now().Add(-5 * time.Minute)}, + }, + }, + expectedStatus: statusUnknown, + expectErrors: true, + expectWarnings: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controller := createTestHealthCheckWithMockStatus(t, tt.mockStatus) + ctx := context.TODO() + + status, err := controller.getPacemakerStatus(ctx) + + require.NoError(t, err, "getPacemakerStatus should not return an error") + require.NotNil(t, status, "HealthStatus should not be nil") + require.Equal(t, tt.expectedStatus, status.OverallStatus, "Status should match expected") + + if tt.expectErrors { + require.NotEmpty(t, status.Errors, "Should have errors") + } else { + require.Empty(t, status.Errors, "Should not have errors") + } + + if tt.expectWarnings { + require.NotEmpty(t, status.Warnings, "Should have warnings") + } else { + require.Empty(t, status.Warnings, "Should not have warnings") + } + }) + } +} + +func TestHealthCheck_updateOperatorStatus(t *testing.T) { + controller := createTestHealthCheck() + ctx := context.TODO() + + // Test degraded status + status := &HealthStatus{ + OverallStatus: statusError, + Warnings: []string{"Test warning"}, + Errors: []string{"Test error"}, + } + + err := controller.updateOperatorStatus(ctx, status) + require.NoError(t, err, "updateOperatorStatus should not return an error") + + // Test healthy status + status = &HealthStatus{ + OverallStatus: statusHealthy, + Warnings: []string{}, + Errors: []string{}, + } + + err = controller.updateOperatorStatus(ctx, status) + require.NoError(t, err, "updateOperatorStatus should not return an error") + + // Test warning status (should not update operator condition) + status = &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{"Test warning"}, + Errors: []string{}, + } + + err = controller.updateOperatorStatus(ctx, status) + require.NoError(t, err, "updateOperatorStatus should not return an error") +} + +func TestHealthCheck_recordHealthCheckEvents(t *testing.T) { + controller := createTestHealthCheck() + + // Test with warnings and errors + status := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{ + "Recent failed resource action: kubelet monitor on master-0 failed (rc=1, Thu Sep 18 13:12:00 2025)", + "Recent fencing event: reboot of master-1 success", + "Some other warning", + }, + Errors: []string{"Test error"}, + } + + controller.recordHealthCheckEvents(status) + + // Test with healthy status + status = &HealthStatus{ + OverallStatus: statusHealthy, + Warnings: []string{}, + Errors: []string{}, + } + + controller.recordHealthCheckEvents(status) +} + +func TestHealthCheck_eventDeduplication(t *testing.T) { + controller := createTestHealthCheck() + recorder := controller.eventRecorder.(events.InMemoryRecorder) + + // First recording - should create events + status := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{ + "Recent failed resource action: kubelet monitor on master-0 failed", + }, + Errors: []string{"Test error"}, + } + controller.recordHealthCheckEvents(status) + + // Count events after first recording + initialEvents := len(recorder.Events()) + require.Greater(t, initialEvents, 0, "Should have recorded events") + + // Immediately 
record the same events again - should be deduplicated (5 min window) + controller.recordHealthCheckEvents(status) + afterDuplicateEvents := len(recorder.Events()) + require.Equal(t, initialEvents, afterDuplicateEvents, "Duplicate events should not be recorded") + + // Record a different error - should create new event + status.Errors = []string{"Different error"} + controller.recordHealthCheckEvents(status) + afterNewErrorEvents := len(recorder.Events()) + require.Greater(t, afterNewErrorEvents, afterDuplicateEvents, "New error should be recorded") + + // Test fencing event deduplication (longer window) + fencingStatus := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{ + "Recent fencing event: reboot of master-1 success", + }, + Errors: []string{}, + } + controller.recordHealthCheckEvents(fencingStatus) + afterFencingEvents := len(recorder.Events()) + require.Greater(t, afterFencingEvents, afterNewErrorEvents, "First fencing event should be recorded") + + // Immediately record the same fencing event - should be deduplicated (1 hour window) + controller.recordHealthCheckEvents(fencingStatus) + afterFencingDuplicateEvents := len(recorder.Events()) + require.Equal(t, afterFencingEvents, afterFencingDuplicateEvents, "Duplicate fencing event should not be recorded") + + // Test healthy event only recorded on transition from unhealthy + healthyStatus := &HealthStatus{ + OverallStatus: statusHealthy, + Warnings: []string{}, + Errors: []string{}, + } + controller.recordHealthCheckEvents(healthyStatus) + afterHealthyEvents := len(recorder.Events()) + require.Greater(t, afterHealthyEvents, afterFencingDuplicateEvents, "Healthy event should be recorded when transitioning from unhealthy") + + // Record healthy again immediately - should NOT be recorded (only on transition) + controller.recordHealthCheckEvents(healthyStatus) + afterHealthyDuplicateEvents := len(recorder.Events()) + require.Equal(t, afterHealthyEvents, afterHealthyDuplicateEvents, "Healthy event should not be recorded when already healthy") + + // Transition to unhealthy and back to healthy - should record healthy event + controller.recordHealthCheckEvents(status) // Become unhealthy + controller.recordHealthCheckEvents(healthyStatus) + afterTransitionEvents := len(recorder.Events()) + require.Greater(t, afterTransitionEvents, afterHealthyDuplicateEvents, "Healthy event should be recorded on transition from unhealthy to healthy") +} + +func TestHealthCheck_eventDeduplication_DifferentWindows(t *testing.T) { + controller := createTestHealthCheck() + recorder := controller.eventRecorder.(events.InMemoryRecorder) + + // Record a resource action warning + resourceActionStatus := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{ + "Recent failed resource action: etcd start on master-0 failed", + }, + Errors: []string{}, + } + controller.recordHealthCheckEvents(resourceActionStatus) + initialEvents := len(recorder.Events()) + require.Equal(t, 1, initialEvents, "Should have recorded 1 event") + + // Record a fencing event warning + fencingStatus := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{ + "Recent fencing event: reboot of master-0 success", + }, + Errors: []string{}, + } + controller.recordHealthCheckEvents(fencingStatus) + afterFencingEvents := len(recorder.Events()) + require.Equal(t, 2, afterFencingEvents, "Should have recorded fencing event") + + // Verify both events are tracked separately + controller.recordedEventsMu.Lock() + require.Len(t, controller.recordedEvents, 
2, "Should have 2 events in deduplication map") + controller.recordedEventsMu.Unlock() + + // Try to record the same events again - both should be deduplicated + controller.recordHealthCheckEvents(resourceActionStatus) + controller.recordHealthCheckEvents(fencingStatus) + afterDuplicates := len(recorder.Events()) + require.Equal(t, afterFencingEvents, afterDuplicates, "Duplicate events should not be recorded") +} + +func TestHealthCheck_eventDeduplication_MultipleErrors(t *testing.T) { + controller := createTestHealthCheck() + recorder := controller.eventRecorder.(events.InMemoryRecorder) + + // Record multiple errors at once + status := &HealthStatus{ + OverallStatus: statusError, + Warnings: []string{}, + Errors: []string{ + "Node master-0 is not online", + "kubelet resource not started on all nodes (started on 1/2 nodes)", + "etcd resource not started on all nodes (started on 1/2 nodes)", + }, + } + controller.recordHealthCheckEvents(status) + initialEvents := len(recorder.Events()) + require.Equal(t, 3, initialEvents, "Should have recorded 3 error events") + + // Record the same errors again - all should be deduplicated + controller.recordHealthCheckEvents(status) + afterDuplicates := len(recorder.Events()) + require.Equal(t, initialEvents, afterDuplicates, "All duplicate errors should be deduplicated") + + // Record a subset with one new error + status.Errors = []string{ + "Node master-0 is not online", // Duplicate + "Cluster does not have quorum", // New + } + controller.recordHealthCheckEvents(status) + afterNewError := len(recorder.Events()) + require.Equal(t, initialEvents+1, afterNewError, "Only the new error should be recorded") +} + +func TestHealthCheck_eventDeduplication_StatusTransitions(t *testing.T) { + controller := createTestHealthCheck() + recorder := controller.eventRecorder.(events.InMemoryRecorder) + + // Start with Unknown status (default) + require.Equal(t, statusUnknown, controller.previousStatus, "Initial status should be Unknown") + + // Transition from Unknown to Healthy - should record + healthyStatus := &HealthStatus{ + OverallStatus: statusHealthy, + Warnings: []string{}, + Errors: []string{}, + } + controller.recordHealthCheckEvents(healthyStatus) + afterFirstHealthy := len(recorder.Events()) + require.Equal(t, 1, afterFirstHealthy, "Healthy event should be recorded on transition from Unknown") + + // Stay healthy - should not record + controller.recordHealthCheckEvents(healthyStatus) + require.Equal(t, afterFirstHealthy, len(recorder.Events()), "No event when staying healthy") + + // Transition to Warning - should not record healthy event + warningStatus := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{"Some warning"}, + Errors: []string{}, + } + controller.recordHealthCheckEvents(warningStatus) + afterWarning := len(recorder.Events()) + require.Greater(t, afterWarning, afterFirstHealthy, "Warning should be recorded") + + // Transition from Warning to Healthy - should record + controller.recordHealthCheckEvents(healthyStatus) + afterSecondHealthy := len(recorder.Events()) + require.Equal(t, afterWarning+1, afterSecondHealthy, "Healthy event should be recorded on transition from Warning") + + // Transition to Error + errorStatus := &HealthStatus{ + OverallStatus: statusError, + Warnings: []string{}, + Errors: []string{"Critical error"}, + } + controller.recordHealthCheckEvents(errorStatus) + afterError := len(recorder.Events()) + require.Greater(t, afterError, afterSecondHealthy, "Error should be recorded") + + // Transition from 
Error to Healthy - should record + controller.recordHealthCheckEvents(healthyStatus) + afterThirdHealthy := len(recorder.Events()) + require.Equal(t, afterError+1, afterThirdHealthy, "Healthy event should be recorded on transition from Error") +} + +func TestHealthCheck_eventDeduplication_MixedEventTypes(t *testing.T) { + controller := createTestHealthCheck() + recorder := controller.eventRecorder.(events.InMemoryRecorder) + + // Record a status with warnings, errors, and multiple types of warnings + status := &HealthStatus{ + OverallStatus: statusError, + Warnings: []string{ + "Recent failed resource action: kubelet monitor on master-0 failed", + "Recent fencing event: reboot of master-1 success", + "Expected 2 nodes, found 1", + }, + Errors: []string{ + "Node master-0 is not online", + "etcd resource not started on all nodes (started on 1/2 nodes)", + }, + } + controller.recordHealthCheckEvents(status) + initialEvents := len(recorder.Events()) + require.Equal(t, 5, initialEvents, "Should have recorded 5 events (3 warnings + 2 errors)") + + // Verify all events are in deduplication map + controller.recordedEventsMu.Lock() + require.Len(t, controller.recordedEvents, 5, "Should have 5 events in deduplication map") + controller.recordedEventsMu.Unlock() + + // Record the same status again - all should be deduplicated + controller.recordHealthCheckEvents(status) + afterDuplicates := len(recorder.Events()) + require.Equal(t, initialEvents, afterDuplicates, "All duplicate events should not be recorded") +} + +func TestHealthCheck_eventDeduplication_CleanupOldEntries(t *testing.T) { + controller := createTestHealthCheck() + + // Add a very old entry to the recorded events map + oldEventKey := "warning:Very old warning" + controller.recordedEventsMu.Lock() + controller.recordedEvents[oldEventKey] = time.Now().Add(-2 * time.Hour) // 2 hours ago (older than longest window) + controller.recordedEventsMu.Unlock() + + // Trigger cleanup by recording a new event + status := &HealthStatus{ + OverallStatus: statusWarning, + Warnings: []string{"New warning"}, + Errors: []string{}, + } + controller.recordHealthCheckEvents(status) + + // Verify old entry was cleaned up + controller.recordedEventsMu.Lock() + _, exists := controller.recordedEvents[oldEventKey] + require.False(t, exists, "Old entry should have been cleaned up") + controller.recordedEventsMu.Unlock() +} + +func TestHealthCheck_buildHealthStatusFromCR(t *testing.T) { + controller := &HealthCheck{} + + // Test with healthy cluster status + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Summary: v1alpha1.PacemakerSummary{ + PacemakerdState: "running", + HasQuorum: true, + NodesOnline: 2, + NodesTotal: 2, + ResourcesStarted: 4, + ResourcesTotal: 4, + RecentFailures: false, + RecentFencing: false, + }, + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: true, Standby: false}, + }, + Resources: []v1alpha1.ResourceStatus{ + {Name: "kubelet-clone-0", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-0"}, + {Name: "kubelet-clone-1", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-1"}, + {Name: "etcd-clone-0", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-0"}, + {Name: "etcd-clone-1", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-1"}, + }, + NodeHistory: []v1alpha1.NodeHistoryEntry{}, + FencingHistory: 
[]v1alpha1.FencingEvent{}, + }, + } + + status := controller.buildHealthStatusFromCR(pacemakerStatus) + + require.NotNil(t, status, "HealthStatus should not be nil") + require.Equal(t, statusHealthy, status.OverallStatus, "Status should be Healthy for healthy cluster") + require.Empty(t, status.Errors, "Should have no errors for healthy cluster") + require.Empty(t, status.Warnings, "Should have no warnings for healthy cluster") +} + +func TestHealthCheck_buildHealthStatusFromCR_OfflineNode(t *testing.T) { + controller := &HealthCheck{} + + // Test with offline node status + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Summary: v1alpha1.PacemakerSummary{ + PacemakerdState: "running", + HasQuorum: true, + NodesOnline: 1, + NodesTotal: 2, + }, + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: false, Standby: false}, + }, + }, + } + + status := controller.buildHealthStatusFromCR(pacemakerStatus) + + require.NotNil(t, status, "HealthStatus should not be nil") + require.Equal(t, statusError, status.OverallStatus, "Status should be Error for offline node") + require.NotEmpty(t, status.Errors, "Should have errors for offline node") + require.Contains(t, status.Errors, "Node master-1 is not online") +} + +func TestHealthCheck_buildHealthStatusFromCR_StandbyNode(t *testing.T) { + controller := &HealthCheck{} + + // Test with standby node status + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Summary: v1alpha1.PacemakerSummary{ + PacemakerdState: "running", + HasQuorum: true, + NodesOnline: 2, + NodesTotal: 2, + }, + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: true, Standby: true}, + }, + }, + } + + status := controller.buildHealthStatusFromCR(pacemakerStatus) + + require.NotNil(t, status, "HealthStatus should not be nil") + require.Equal(t, statusError, status.OverallStatus, "Status should be Error for standby node") + require.NotEmpty(t, status.Errors, "Should have errors for standby node") + require.Contains(t, status.Errors, "Node master-1 is in standby (unexpected behavior; treated as offline)") +} + +func TestHealthCheck_buildHealthStatusFromCR_RecentFailures(t *testing.T) { + controller := &HealthCheck{} + + // Test with recent failures in status + now := metav1.Now() + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Summary: v1alpha1.PacemakerSummary{ + PacemakerdState: "running", + HasQuorum: true, + NodesOnline: 2, + NodesTotal: 2, + RecentFailures: true, + RecentFencing: true, + }, + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: true, Standby: false}, + }, + Resources: []v1alpha1.ResourceStatus{ + {Name: "kubelet-clone-0", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-0"}, + {Name: "kubelet-clone-1", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-1"}, + {Name: "etcd-clone-0", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-0"}, + {Name: "etcd-clone-1", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-1"}, + }, + NodeHistory: []v1alpha1.NodeHistoryEntry{ + {Node: "master-0", Resource: "etcd-clone-0", Operation: "monitor", RC: 1, RCText: "error", LastRCChange: now}, + }, + FencingHistory: []v1alpha1.FencingEvent{ + {Target: 
"master-1", Action: "reboot", Status: "success", Completed: now}, + }, + }, + } + + status := controller.buildHealthStatusFromCR(pacemakerStatus) + + require.NotNil(t, status, "HealthStatus should not be nil") + require.Equal(t, statusWarning, status.OverallStatus, "Status should be Warning for recent failures") + require.Empty(t, status.Errors, "Should have no errors for healthy cluster") + require.NotEmpty(t, status.Warnings, "Should have warnings for recent failures") + + // Check for specific warnings + foundFailedAction := false + foundFencingEvent := false + for _, warning := range status.Warnings { + if strings.Contains(warning, "Recent failed resource action") { + foundFailedAction = true + } + if strings.Contains(warning, "Recent fencing event") { + foundFencingEvent = true + } + } + require.True(t, foundFailedAction, "Should have warning for recent failed resource action") + require.True(t, foundFencingEvent, "Should have warning for recent fencing event") +} + +func TestHealthCheck_checkNodeStatus(t *testing.T) { + controller := &HealthCheck{} + + // Test with healthy nodes + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: true, Standby: false}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkNodeStatus(pacemakerStatus, status) + + require.Empty(t, status.Errors, "Should have no errors for online nodes") + require.Empty(t, status.Warnings, "Should have no warnings for expected node count") +} + +func TestHealthCheck_checkNodeStatus_MismatchedCount(t *testing.T) { + controller := &HealthCheck{} + + // Create status with only 1 node (mismatch with expected 2) + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkNodeStatus(pacemakerStatus, status) + + require.Empty(t, status.Errors, "Should have no errors for single online node") + require.NotEmpty(t, status.Warnings, "Should have warning for node count mismatch") + require.Contains(t, status.Warnings[0], "Expected 2 nodes, found 1") +} + +func TestHealthCheck_checkNodeStatus_NoNodes(t *testing.T) { + controller := &HealthCheck{} + + // Create status with no nodes + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Nodes: []v1alpha1.NodeStatus{}, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkNodeStatus(pacemakerStatus, status) + + require.NotEmpty(t, status.Errors, "Should have error for no nodes") + require.Contains(t, status.Errors[0], "No nodes found") +} + +func TestHealthCheck_checkNodeStatus_StandbyNode(t *testing.T) { + controller := &HealthCheck{} + + // Create status with one node in standby + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: true, Standby: false}, + {Name: "master-1", Online: true, Standby: true}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkNodeStatus(pacemakerStatus, status) + + require.NotEmpty(t, status.Errors, "Should have error for standby node") + require.Contains(t, status.Errors[0], "Node master-1 is in standby (unexpected behavior; treated as 
offline)") +} + +func TestHealthCheck_checkNodeStatus_OfflineAndStandbyNode(t *testing.T) { + controller := &HealthCheck{} + + // Create status with one offline node and one standby node + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Nodes: []v1alpha1.NodeStatus{ + {Name: "master-0", Online: false, Standby: false}, + {Name: "master-1", Online: true, Standby: true}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkNodeStatus(pacemakerStatus, status) + + require.Len(t, status.Errors, 2, "Should have 2 errors (one for offline, one for standby)") + require.Contains(t, status.Errors[0], "Node master-0 is not online") + require.Contains(t, status.Errors[1], "Node master-1 is in standby (unexpected behavior; treated as offline)") +} + +func TestHealthCheck_checkResourceStatus(t *testing.T) { + controller := &HealthCheck{} + + // Test with healthy resources + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + Resources: []v1alpha1.ResourceStatus{ + {Name: "kubelet-clone-0", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-0"}, + {Name: "kubelet-clone-1", ResourceAgent: "systemd:kubelet", Role: "Started", Active: true, Node: "master-1"}, + {Name: "etcd-clone-0", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-0"}, + {Name: "etcd-clone-1", ResourceAgent: "ocf:heartbeat:podman-etcd", Role: "Started", Active: true, Node: "master-1"}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkResourceStatus(pacemakerStatus, status) + + require.Empty(t, status.Errors, "Should have no errors for started resources") +} + +func TestHealthCheck_determineOverallStatus(t *testing.T) { + controller := &HealthCheck{} + + // Test healthy status + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + result := controller.determineOverallStatus(status) + require.Equal(t, statusHealthy, result) + + // Test warning status + status = &HealthStatus{Warnings: []string{"warning"}, Errors: []string{}} + result = controller.determineOverallStatus(status) + require.Equal(t, statusWarning, result) + + // Test degraded status - with errors + status = &HealthStatus{Warnings: []string{}, Errors: []string{"existing error"}} + result = controller.determineOverallStatus(status) + require.Equal(t, statusError, result) + + // Test degraded status - with multiple errors + status = &HealthStatus{Warnings: []string{"warning"}, Errors: []string{"error1", "error2"}} + result = controller.determineOverallStatus(status) + require.Equal(t, statusError, result) +} + +func TestHealthCheck_newUnknownHealthStatus(t *testing.T) { + errMsg := "test error message" + status := newUnknownHealthStatus(errMsg) + + require.NotNil(t, status, "Status should not be nil") + require.Equal(t, statusUnknown, status.OverallStatus, "OverallStatus should be Unknown") + require.Len(t, status.Errors, 1, "Should have one error") + require.Equal(t, errMsg, status.Errors[0], "Error message should match") + require.Empty(t, status.Warnings, "Warnings should be empty") +} + +func TestHealthCheck_checkRecentFailures(t *testing.T) { + controller := &HealthCheck{} + + // Test with recent failures + now := metav1.Now() + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + NodeHistory: []v1alpha1.NodeHistoryEntry{ + {Node: "master-0", Resource: "etcd-clone-0", Operation: 
"monitor", RC: 1, RCText: "error", LastRCChange: now}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkRecentFailures(pacemakerStatus, status) + + require.NotEmpty(t, status.Warnings, "Should have warnings for recent failures") + foundFailedAction := false + for _, warning := range status.Warnings { + if strings.Contains(warning, "Recent failed resource action") { + foundFailedAction = true + break + } + } + require.True(t, foundFailedAction, "Should have warning for recent failed resource action") +} + +func TestHealthCheck_checkFencingEvents(t *testing.T) { + controller := &HealthCheck{} + + // Test with recent fencing events + now := metav1.Now() + pacemakerStatus := &v1alpha1.PacemakerStatus{ + Status: v1alpha1.PacemakerStatusStatus{ + FencingHistory: []v1alpha1.FencingEvent{ + {Target: "master-1", Action: "reboot", Status: "success", Completed: now}, + }, + }, + } + + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.checkFencingEvents(pacemakerStatus, status) + + require.NotEmpty(t, status.Warnings, "Should have warnings for recent fencing events") + foundFencingEvent := false + for _, warning := range status.Warnings { + if strings.Contains(warning, "Recent fencing event") { + foundFencingEvent = true + break + } + } + require.True(t, foundFencingEvent, "Should have warning for recent fencing event") +} + +func TestHealthCheck_validateResourceCount(t *testing.T) { + controller := &HealthCheck{} + + // Test with correct count + status := &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.validateResourceCount(resourceEtcd, 2, status) + require.Empty(t, status.Errors, "Should have no errors for correct count") + + // Test with insufficient count + status = &HealthStatus{Warnings: []string{}, Errors: []string{}} + controller.validateResourceCount(resourceKubelet, 1, status) + require.NotEmpty(t, status.Errors, "Should have error for insufficient count") + require.Contains(t, status.Errors[0], "kubelet resource not started on all nodes (started on 1/2 nodes)") +} diff --git a/pkg/tnf/pkg/pacemaker/statuscollector.go b/pkg/tnf/pkg/pacemaker/statuscollector.go new file mode 100644 index 000000000..07e0d162f --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/statuscollector.go @@ -0,0 +1,561 @@ +package pacemaker + +import ( + "context" + "encoding/xml" + "fmt" + "os" + "time" + + "github.com/spf13/cobra" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker/v1alpha1" +) + +const ( + // pcsStatusXMLCommand is the command to get pacemaker status in XML format + pcsStatusXMLCommand = "sudo -n pcs status xml" + + // execTimeout is the timeout for executing the pcs command + execTimeout = 10 * time.Second + + // collectorTimeout is the overall timeout for the collector run + collectorTimeout = 30 * time.Second + + // maxXMLSize prevents XML bombs (10MB) + maxXMLSize = 10 * 1024 * 1024 + + // Time windows for detecting recent events + failedActionTimeWindow = 5 * time.Minute + fencingEventTimeWindow = 24 * time.Hour + + // Pacemaker state strings + booleanValueTrue = "true" + operationRCTextSuccess = "ok" + nodeStatusStarted = "Started" + + // Node attribute names + nodeAttributeIP = "node_ip" + + // Time formats for parsing Pacemaker timestamps + pacemakerTimeFormat = "Mon Jan 2 
15:04:05 2006" + pacemakerFenceTimeFormat = "2006-01-02 15:04:05.000000Z" + + // Kubernetes API constants (kubernetesAPIPath and pacemakerResourceName shared with healthcheck.go) + statusSubresource = "status" + pacemakerKind = "PacemakerStatus" + + // Error check strings + notFoundError = "not found" + + // Environment variables + envKubeconfig = "KUBECONFIG" + envHome = "HOME" + defaultKubeconfigPath = "/.kube/config" +) + +// XML structures for parsing pacemaker status from "pcs status xml" command output. +// The healthcheck controller does not parse XML - it reads structured data from the PacemakerStatus CR. +type PacemakerResult struct { + XMLName xml.Name `xml:"pacemaker-result"` + Summary Summary `xml:"summary"` + Nodes Nodes `xml:"nodes"` + Resources Resources `xml:"resources"` + NodeAttributes NodeAttributes `xml:"node_attributes"` + NodeHistory NodeHistory `xml:"node_history"` + FenceHistory FenceHistory `xml:"fence_history"` +} + +type Summary struct { + Stack Stack `xml:"stack"` + CurrentDC CurrentDC `xml:"current_dc"` + NodesConfigured NodesConfigured `xml:"nodes_configured"` + ResourcesConfigured ResourcesConfigured `xml:"resources_configured"` +} + +type Stack struct { + PacemakerdState string `xml:"pacemakerd-state,attr"` +} + +type CurrentDC struct { + WithQuorum string `xml:"with_quorum,attr"` +} + +type NodesConfigured struct { + Number string `xml:"number,attr"` +} + +type ResourcesConfigured struct { + Number string `xml:"number,attr"` +} + +type Nodes struct { + Node []Node `xml:"node"` +} + +type Node struct { + Name string `xml:"name,attr"` + ID string `xml:"id,attr"` + Online string `xml:"online,attr"` + Standby string `xml:"standby,attr"` + StandbyOnFail string `xml:"standby_onfail,attr"` + Maintenance string `xml:"maintenance,attr"` + Pending string `xml:"pending,attr"` + Unclean string `xml:"unclean,attr"` + Shutdown string `xml:"shutdown,attr"` + ExpectedUp string `xml:"expected_up,attr"` + IsDC string `xml:"is_dc,attr"` + ResourcesRunning string `xml:"resources_running,attr"` + Type string `xml:"type,attr"` +} + +type NodeAttributes struct { + Node []NodeAttributeSet `xml:"node"` +} + +type NodeAttributeSet struct { + Name string `xml:"name,attr"` + Attribute []NodeAttribute `xml:"attribute"` +} + +type NodeAttribute struct { + Name string `xml:"name,attr"` + Value string `xml:"value,attr"` +} + +type Resources struct { + Clone []Clone `xml:"clone"` + Resource []Resource `xml:"resource"` +} + +type Clone struct { + Resource []Resource `xml:"resource"` +} + +type Resource struct { + ID string `xml:"id,attr"` + ResourceAgent string `xml:"resource_agent,attr"` + Role string `xml:"role,attr"` + TargetRole string `xml:"target_role,attr"` + Active string `xml:"active,attr"` + Orphaned string `xml:"orphaned,attr"` + Blocked string `xml:"blocked,attr"` + Managed string `xml:"managed,attr"` + Failed string `xml:"failed,attr"` + FailureIgnored string `xml:"failure_ignored,attr"` + NodesRunningOn string `xml:"nodes_running_on,attr"` + Node NodeRef `xml:"node"` +} + +type NodeRef struct { + Name string `xml:"name,attr"` +} + +type NodeHistory struct { + Node []NodeHistoryNode `xml:"node"` +} + +type NodeHistoryNode struct { + Name string `xml:"name,attr"` + ResourceHistory []ResourceHistory `xml:"resource_history"` +} + +type ResourceHistory struct { + ID string `xml:"id,attr"` + OperationHistory []OperationHistory `xml:"operation_history"` +} + +type OperationHistory struct { + Call string `xml:"call,attr"` + Task string `xml:"task,attr"` + RC string `xml:"rc,attr"` + 
RCText string `xml:"rc_text,attr"` + ExitReason string `xml:"exit-reason,attr"` + LastRCChange string `xml:"last-rc-change,attr"` + ExecTime string `xml:"exec-time,attr"` + QueueTime string `xml:"queue-time,attr"` +} + +type FenceHistory struct { + FenceEvent []FenceEvent `xml:"fence_event"` +} + +type FenceEvent struct { + Target string `xml:"target,attr"` + Action string `xml:"action,attr"` + Delegate string `xml:"delegate,attr"` + Client string `xml:"client,attr"` + Origin string `xml:"origin,attr"` + Status string `xml:"status,attr"` + ExitReason string `xml:"exit-reason,attr"` + Completed string `xml:"completed,attr"` +} + +// NewPacemakerStatusCollectorCommand creates a new command for collecting pacemaker status +func NewPacemakerStatusCollectorCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "pacemaker-status-collector", + Short: "Collects pacemaker status and updates PacemakerStatus CR", + Run: func(cmd *cobra.Command, args []string) { + if err := runCollector(); err != nil { + klog.Errorf("Failed to collect pacemaker status: %v", err) + os.Exit(1) + } + }, + } + return cmd +} + +// runCollector executes the full pacemaker status collection workflow: +// 1. Executes "sudo -n pcs status xml" to get cluster status +// 2. Parses the XML output into structured data +// 3. Updates or creates the PacemakerStatus CR with the collected information +func runCollector() error { + ctx, cancel := context.WithTimeout(context.Background(), collectorTimeout) + defer cancel() + + klog.Info("Starting pacemaker status collection...") + + // Collect pacemaker status + rawXML, summary, nodes, resources, nodeHistory, fencingHistory, collectionError := collectPacemakerStatus(ctx) + + // Update PacemakerStatus CR + if err := updatePacemakerStatusCR(ctx, rawXML, summary, nodes, resources, nodeHistory, fencingHistory, collectionError); err != nil { + return fmt.Errorf("failed to update PacemakerStatus CR: %w", err) + } + + klog.Info("Successfully updated PacemakerStatus CR") + return nil +} + +// collectPacemakerStatus executes "pcs status xml" and parses the output into structured data. +// Returns the raw XML, parsed status components, and any error encountered during collection. +// If an error occurs, collectionError will contain the error message and other return values +// will be zero/empty values. 
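+// The collection error is deliberately returned as a plain string (not an error) so it can be
+// stored verbatim in the PacemakerStatus CR's CollectionError field for the health check to report.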
+func collectPacemakerStatus(ctx context.Context) ( + rawXML string, + summary v1alpha1.PacemakerSummary, + nodes []v1alpha1.NodeStatus, + resources []v1alpha1.ResourceStatus, + nodeHistory []v1alpha1.NodeHistoryEntry, + fencingHistory []v1alpha1.FencingEvent, + collectionError string, +) { + // Execute the pcs status xml command with a timeout + ctxExec, cancel := context.WithTimeout(ctx, execTimeout) + defer cancel() + + stdout, stderr, err := exec.Execute(ctxExec, pcsStatusXMLCommand) + if err != nil { + collectionError = fmt.Sprintf("Failed to execute pcs status xml command: %v", err) + klog.Warning(collectionError) + return "", summary, nil, nil, nil, nil, collectionError + } + + if stderr != "" { + klog.Warningf("pcs status xml command produced stderr: %s", stderr) + } + + // Validate XML size to prevent XML bombs + if len(stdout) > maxXMLSize { + collectionError = fmt.Sprintf("XML output too large: %d bytes (max: %d bytes)", len(stdout), maxXMLSize) + klog.Warning(collectionError) + return "", summary, nil, nil, nil, nil, collectionError + } + + rawXML = stdout + + // Parse the XML to create a summary + var result PacemakerResult + if err := xml.Unmarshal([]byte(rawXML), &result); err != nil { + collectionError = fmt.Sprintf("Failed to parse XML: %v", err) + klog.Warning(collectionError) + // Still return the raw XML even if parsing fails + return rawXML, summary, nil, nil, nil, nil, collectionError + } + + // Build all status components + summary, nodes, resources, nodeHistory, fencingHistory = buildStatusComponents(&result) + + return rawXML, summary, nodes, resources, nodeHistory, fencingHistory, "" +} + +// buildStatusComponents parses a PacemakerResult (XML) into structured status components. +// It extracts summary information, node status, resource status, and recent history. +// Historical data is filtered to recent time windows (5 minutes for operations, 24 hours for fencing). 
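+// Operation timestamps are parsed with pacemakerTimeFormat (e.g. "Thu Sep 18 13:12:00 2025") and
+// fencing timestamps with pacemakerFenceTimeFormat (e.g. "2025-09-18 13:12:00.000000Z"); entries
+// whose timestamps cannot be parsed are skipped with a warning.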
+func buildStatusComponents(result *PacemakerResult) ( + summary v1alpha1.PacemakerSummary, + nodes []v1alpha1.NodeStatus, + resources []v1alpha1.ResourceStatus, + nodeHistory []v1alpha1.NodeHistoryEntry, + fencingHistory []v1alpha1.FencingEvent, +) { + // Build high-level summary + summary = v1alpha1.PacemakerSummary{ + PacemakerdState: result.Summary.Stack.PacemakerdState, + HasQuorum: result.Summary.CurrentDC.WithQuorum == booleanValueTrue, + } + + // Build node IP map from node attributes + nodeIPMap := make(map[string]string) + for _, nodeAttrSet := range result.NodeAttributes.Node { + for _, attr := range nodeAttrSet.Attribute { + if attr.Name == nodeAttributeIP { + nodeIPMap[nodeAttrSet.Name] = attr.Value + break + } + } + } + + // Build detailed node information + onlineCount := 0 + for _, node := range result.Nodes.Node { + online := node.Online == booleanValueTrue + if online { + onlineCount++ + } + + nodes = append(nodes, v1alpha1.NodeStatus{ + Name: node.Name, + Online: online, + Standby: node.Standby == booleanValueTrue, + }) + } + summary.NodesOnline = onlineCount + summary.NodesTotal = len(result.Nodes.Node) + + // Build resource information + resourcesStarted := 0 + + // Helper function to process a resource + processResource := func(resource Resource) { + active := resource.Active == booleanValueTrue + started := resource.Role == nodeStatusStarted && active + if started { + resourcesStarted++ + } + + resources = append(resources, v1alpha1.ResourceStatus{ + Name: resource.ID, + ResourceAgent: resource.ResourceAgent, + Role: resource.Role, + Active: active, + Node: resource.Node.Name, + }) + } + + // Process clone resources + for _, clone := range result.Resources.Clone { + for _, resource := range clone.Resource { + processResource(resource) + } + } + + // Process standalone resources + for _, resource := range result.Resources.Resource { + processResource(resource) + } + + summary.ResourcesStarted = resourcesStarted + summary.ResourcesTotal = len(resources) + + // Build node history (recent operations) + cutoffTime := time.Now().Add(-failedActionTimeWindow) + hasFailures := false + + for _, node := range result.NodeHistory.Node { + for _, resourceHistory := range node.ResourceHistory { + for _, operation := range resourceHistory.OperationHistory { + // Parse the timestamp + t, err := time.Parse(pacemakerTimeFormat, operation.LastRCChange) + if err != nil { + klog.Warningf("Failed to parse operation timestamp: %v", err) + continue + } + + // Only include recent operations + if !t.After(cutoffTime) { + continue + } + + // Parse RC + rc := 0 + if operation.RC != "" { + if _, err := fmt.Sscanf(operation.RC, "%d", &rc); err != nil { + klog.Warningf("Failed to parse RC value '%s': %v", operation.RC, err) + } + } + + // Check if this is a failure + if rc != 0 || operation.RCText != operationRCTextSuccess { + hasFailures = true + } + + nodeHistory = append(nodeHistory, v1alpha1.NodeHistoryEntry{ + Node: node.Name, + Resource: resourceHistory.ID, + Operation: operation.Task, + RC: rc, + RCText: operation.RCText, + LastRCChange: metav1.NewTime(t), + }) + } + } + } + + summary.RecentFailures = hasFailures + + // Build fencing history + fenceCutoffTime := time.Now().Add(-fencingEventTimeWindow) + hasFencing := false + + for _, fenceEvent := range result.FenceHistory.FenceEvent { + // Parse the timestamp + t, err := time.Parse(pacemakerFenceTimeFormat, fenceEvent.Completed) + if err != nil { + klog.Warningf("Failed to parse fence event timestamp: %v", err) + continue + } + + // Only 
include recent fencing events + if !t.After(fenceCutoffTime) { + continue + } + + hasFencing = true + + fencingHistory = append(fencingHistory, v1alpha1.FencingEvent{ + Target: fenceEvent.Target, + Action: fenceEvent.Action, + Status: fenceEvent.Status, + Completed: metav1.NewTime(t), + }) + } + + summary.RecentFencing = hasFencing + + return summary, nodes, resources, nodeHistory, fencingHistory +} + +// createKubernetesClient creates a Kubernetes clientset using in-cluster config +// or falling back to kubeconfig file. +func createKubernetesClient() (*kubernetes.Clientset, error) { + config, err := getKubeConfig() + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(config) +} + +// updatePacemakerStatusCR updates or creates the PacemakerStatus custom resource +// with the collected status information. The CR is named "cluster" and is cluster-scoped. +// If the CR doesn't exist, it will be created; otherwise, its status subresource is updated. +func updatePacemakerStatusCR( + ctx context.Context, + rawXML string, + summary v1alpha1.PacemakerSummary, + nodes []v1alpha1.NodeStatus, + resources []v1alpha1.ResourceStatus, + nodeHistory []v1alpha1.NodeHistoryEntry, + fencingHistory []v1alpha1.FencingEvent, + collectionError string, +) error { + // Create REST client for the PacemakerStatus CRD + config, err := getKubeConfig() + if err != nil { + return err + } + + restClient, err := createPacemakerRESTClient(config) + if err != nil { + return err + } + + // Try to get existing PacemakerStatus + var existing v1alpha1.PacemakerStatus + err = restClient.Get(). + Resource(pacemakerResourceName). + Name(PacemakerStatusResourceName). + Do(ctx). + Into(&existing) + + now := metav1.Now() + + if err != nil { + // Create new PacemakerStatus if it doesn't exist + if apierrors.IsNotFound(err) { + newStatus := &v1alpha1.PacemakerStatus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: pacemakerKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: PacemakerStatusResourceName, + }, + Status: v1alpha1.PacemakerStatusStatus{ + LastUpdated: now, + RawXML: rawXML, + CollectionError: collectionError, + Summary: summary, + Nodes: nodes, + Resources: resources, + NodeHistory: nodeHistory, + FencingHistory: fencingHistory, + }, + } + + result := restClient.Post(). + Resource(pacemakerResourceName). + Body(newStatus). + Do(ctx) + + if result.Error() != nil { + return fmt.Errorf("failed to create PacemakerStatus: %w", result.Error()) + } + + // Ensure status is set on initial create when CRD uses the status subresource + result = restClient.Put(). + Resource(pacemakerResourceName). + Name(PacemakerStatusResourceName). + SubResource(statusSubresource). + Body(newStatus). + Do(ctx) + if result.Error() != nil { + return fmt.Errorf("failed to initialize PacemakerStatus status: %w", result.Error()) + } + klog.Info("Created and initialized PacemakerStatus CR") + + return nil + } + return fmt.Errorf("failed to get PacemakerStatus: %w", err) + } + + // Update existing PacemakerStatus + existing.Status.LastUpdated = now + existing.Status.RawXML = rawXML + existing.Status.CollectionError = collectionError + existing.Status.Summary = summary + existing.Status.Nodes = nodes + existing.Status.Resources = resources + existing.Status.NodeHistory = nodeHistory + existing.Status.FencingHistory = fencingHistory + + result := restClient.Put(). + Resource(pacemakerResourceName). + Name(PacemakerStatusResourceName). + SubResource(statusSubresource). + Body(&existing). 
+ Do(ctx) + + if result.Error() != nil { + return fmt.Errorf("failed to update PacemakerStatus: %w", result.Error()) + } + + klog.Info("Updated existing PacemakerStatus CR") + return nil +} diff --git a/pkg/tnf/pkg/pacemaker/statuscollector_test.go b/pkg/tnf/pkg/pacemaker/statuscollector_test.go new file mode 100644 index 000000000..ce30042ba --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/statuscollector_test.go @@ -0,0 +1,487 @@ +package pacemaker + +import ( + "encoding/xml" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Helper function to load test XML files +func loadTestXML(t *testing.T, filename string) string { + path := filepath.Join("testdata", filename) + data, err := os.ReadFile(path) + require.NoError(t, err, "Failed to read test XML file: %s", filename) + return string(data) +} + +// Helper function to generate recent failures XML with dynamic timestamps +func getRecentFailuresXML(t *testing.T) string { + // Load the template from testdata + templateXML := loadTestXML(t, "recent_failures_template.xml") + + // Generate recent timestamps that are definitely within the 5-minute window + now := time.Now() + recentTime := now.Add(-1 * time.Minute).UTC().Format("Mon Jan 2 15:04:05 2006") + recentTimeISO := now.Add(-1 * time.Minute).UTC().Format("2006-01-02 15:04:05.000000Z") + + // Replace the placeholders in the template + xmlContent := strings.ReplaceAll(templateXML, "{{RECENT_TIMESTAMP}}", recentTime) + xmlContent = strings.ReplaceAll(xmlContent, "{{RECENT_TIMESTAMP_ISO}}", recentTimeISO) + + return xmlContent +} + +func TestBuildStatusComponents_HealthyCluster(t *testing.T) { + // Test with healthy cluster XML + healthyXML := loadTestXML(t, "healthy_cluster.xml") + var result PacemakerResult + err := xml.Unmarshal([]byte(healthyXML), &result) + require.NoError(t, err) + + summary, nodes, resources, nodeHistory, fencingHistory := buildStatusComponents(&result) + + // Verify summary + require.Equal(t, "running", summary.PacemakerdState) + require.True(t, summary.HasQuorum) + require.Equal(t, 2, summary.NodesOnline) + require.Equal(t, 2, summary.NodesTotal) + require.Greater(t, summary.ResourcesStarted, 0) + require.False(t, summary.RecentFailures) + require.False(t, summary.RecentFencing) + + // Verify nodes + require.Len(t, nodes, 2) + for _, node := range nodes { + require.True(t, node.Online, "All nodes should be online") + require.False(t, node.Standby, "No nodes should be in standby") + } + + // Verify resources + require.NotEmpty(t, resources) + foundKubelet := false + foundEtcd := false + for _, resource := range resources { + if strings.Contains(resource.ResourceAgent, "kubelet") { + foundKubelet = true + } + if strings.Contains(resource.ResourceAgent, "etcd") { + foundEtcd = true + } + } + require.True(t, foundKubelet, "Should find kubelet resources") + require.True(t, foundEtcd, "Should find etcd resources") + + // No recent failures or fencing in healthy cluster + require.Empty(t, nodeHistory) + require.Empty(t, fencingHistory) +} + +func TestBuildStatusComponents_OfflineNode(t *testing.T) { + // Test with offline node XML + offlineXML := loadTestXML(t, "offline_node.xml") + var result PacemakerResult + err := xml.Unmarshal([]byte(offlineXML), &result) + require.NoError(t, err) + + summary, nodes, _, _, _ := buildStatusComponents(&result) + + // Verify summary shows reduced online count + require.Equal(t, "running", summary.PacemakerdState) + require.Equal(t, 1, summary.NodesOnline, "Only one node should be 
online") + require.Equal(t, 2, summary.NodesTotal) + + // Verify nodes - one should be offline + require.Len(t, nodes, 2) + offlineCount := 0 + for _, node := range nodes { + if !node.Online { + offlineCount++ + } + } + require.Equal(t, 1, offlineCount, "Should have exactly one offline node") +} + +func TestBuildStatusComponents_StandbyNode(t *testing.T) { + // Test with standby node XML + standbyXML := loadTestXML(t, "standby_node.xml") + var result PacemakerResult + err := xml.Unmarshal([]byte(standbyXML), &result) + require.NoError(t, err) + + _, nodes, _, _, _ := buildStatusComponents(&result) + + // Verify nodes - one should be in standby + require.Len(t, nodes, 2) + standbyCount := 0 + for _, node := range nodes { + if node.Standby { + standbyCount++ + } + } + require.Equal(t, 1, standbyCount, "Should have exactly one standby node") +} + +func TestBuildStatusComponents_RecentFailures(t *testing.T) { + // Test with recent failures XML + recentFailuresXML := getRecentFailuresXML(t) + var result PacemakerResult + err := xml.Unmarshal([]byte(recentFailuresXML), &result) + require.NoError(t, err) + + summary, _, _, nodeHistory, fencingHistory := buildStatusComponents(&result) + + // Verify summary flags + require.True(t, summary.RecentFailures, "Should detect recent failures") + require.True(t, summary.RecentFencing, "Should detect recent fencing") + + // Verify node history contains recent failures + require.NotEmpty(t, nodeHistory, "Should have node history entries") + foundFailure := false + for _, entry := range nodeHistory { + if entry.RC != 0 { + foundFailure = true + require.NotEmpty(t, entry.Node, "Entry should have node name") + require.NotEmpty(t, entry.Resource, "Entry should have resource name") + require.NotEmpty(t, entry.Operation, "Entry should have operation") + require.NotEmpty(t, entry.RCText, "Entry should have RC text") + require.False(t, entry.LastRCChange.IsZero(), "Entry should have timestamp") + } + } + require.True(t, foundFailure, "Should have at least one failed operation") + + // Verify fencing history contains recent fencing events + require.NotEmpty(t, fencingHistory, "Should have fencing history entries") + for _, event := range fencingHistory { + require.NotEmpty(t, event.Target, "Event should have target") + require.NotEmpty(t, event.Action, "Event should have action") + require.NotEmpty(t, event.Status, "Event should have status") + require.False(t, event.Completed.IsZero(), "Event should have timestamp") + } +} + +func TestBuildStatusComponents_TimeWindowFiltering(t *testing.T) { + // Create a test result with operations at different times + now := time.Now() + recentTime := now.Add(-2 * time.Minute) // Within 5-minute window + oldTime := now.Add(-10 * time.Minute) // Outside 5-minute window + recentFenceTime := now.Add(-1 * time.Hour) // Within 24-hour window + oldFenceTime := now.Add(-48 * time.Hour) // Outside 24-hour window + + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: CurrentDC{WithQuorum: "true"}, + }, + Nodes: Nodes{ + Node: []Node{ + {Name: "master-0", Online: "true"}, + }, + }, + NodeHistory: NodeHistory{ + Node: []NodeHistoryNode{ + { + Name: "master-0", + ResourceHistory: []ResourceHistory{ + { + ID: "etcd-clone-0", + OperationHistory: []OperationHistory{ + { + Task: "monitor", + RC: "1", + RCText: "error", + LastRCChange: recentTime.UTC().Format("Mon Jan 2 15:04:05 2006"), + }, + { + Task: "monitor", + RC: "1", + RCText: "error", + LastRCChange: oldTime.UTC().Format("Mon Jan 2 
15:04:05 2006"), + }, + }, + }, + }, + }, + }, + }, + FenceHistory: FenceHistory{ + FenceEvent: []FenceEvent{ + { + Target: "master-1", + Action: "reboot", + Status: "success", + Completed: recentFenceTime.UTC().Format("2006-01-02 15:04:05.000000Z"), + }, + { + Target: "master-1", + Action: "reboot", + Status: "success", + Completed: oldFenceTime.UTC().Format("2006-01-02 15:04:05.000000Z"), + }, + }, + }, + } + + summary, _, _, nodeHistory, fencingHistory := buildStatusComponents(result) + + // Only recent operations should be included + require.Len(t, nodeHistory, 1, "Should only include recent operation") + require.True(t, summary.RecentFailures, "Should detect recent failures") + + // Only recent fencing events should be included + require.Len(t, fencingHistory, 1, "Should only include recent fencing event") + require.True(t, summary.RecentFencing, "Should detect recent fencing") +} + +func TestBuildStatusComponents_ResourceCounting(t *testing.T) { + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: CurrentDC{WithQuorum: "true"}, + }, + Nodes: Nodes{ + Node: []Node{ + {Name: "master-0", Online: "true"}, + {Name: "master-1", Online: "true"}, + }, + }, + Resources: Resources{ + Clone: []Clone{ + { + Resource: []Resource{ + { + ID: "kubelet-0", + ResourceAgent: "systemd:kubelet", + Role: "Started", + Active: "true", + Node: NodeRef{Name: "master-0"}, + }, + { + ID: "kubelet-1", + ResourceAgent: "systemd:kubelet", + Role: "Started", + Active: "true", + Node: NodeRef{Name: "master-1"}, + }, + }, + }, + }, + Resource: []Resource{ + { + ID: "etcd-0", + ResourceAgent: "ocf:heartbeat:podman-etcd", + Role: "Started", + Active: "true", + Node: NodeRef{Name: "master-0"}, + }, + { + ID: "etcd-1", + ResourceAgent: "ocf:heartbeat:podman-etcd", + Role: "Stopped", + Active: "false", + Node: NodeRef{Name: ""}, + }, + }, + }, + } + + summary, _, resources, _, _ := buildStatusComponents(result) + + // Verify resource counting + require.Equal(t, 4, summary.ResourcesTotal, "Should count all resources") + require.Equal(t, 3, summary.ResourcesStarted, "Should count only started resources") + + // Verify resource details + require.Len(t, resources, 4) + startedCount := 0 + for _, resource := range resources { + if resource.Role == "Started" && resource.Active { + startedCount++ + } + } + require.Equal(t, 3, startedCount, "Should have 3 started resources") +} + +func TestBuildStatusComponents_EmptyXML(t *testing.T) { + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: CurrentDC{WithQuorum: "false"}, + }, + } + + summary, nodes, resources, nodeHistory, fencingHistory := buildStatusComponents(result) + + // Verify minimal valid result + require.Equal(t, "running", summary.PacemakerdState) + require.False(t, summary.HasQuorum) + require.Equal(t, 0, summary.NodesOnline) + require.Equal(t, 0, summary.NodesTotal) + require.Empty(t, nodes) + require.Empty(t, resources) + require.Empty(t, nodeHistory) + require.Empty(t, fencingHistory) +} + +func TestBuildStatusComponents_QuorumHandling(t *testing.T) { + tests := []struct { + name string + withQuorum string + expectedQuorum bool + }{ + {"quorum_true", "true", true}, + {"quorum_false", "false", false}, + {"quorum_empty", "", false}, + {"quorum_invalid", "invalid", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: 
CurrentDC{WithQuorum: tt.withQuorum}, + }, + } + + summary, _, _, _, _ := buildStatusComponents(result) + require.Equal(t, tt.expectedQuorum, summary.HasQuorum) + }) + } +} + +func TestBuildStatusComponents_NodeStatusVariations(t *testing.T) { + tests := []struct { + name string + online string + standby string + expectedOnline bool + expectedStandby bool + }{ + {"online_normal", "true", "false", true, false}, + {"offline", "false", "false", false, false}, + {"online_standby", "true", "true", true, true}, + {"offline_standby", "false", "true", false, true}, + {"empty_values", "", "", false, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: CurrentDC{WithQuorum: "true"}, + }, + Nodes: Nodes{ + Node: []Node{ + { + Name: "test-node", + Online: tt.online, + Standby: tt.standby, + }, + }, + }, + } + + _, nodes, _, _, _ := buildStatusComponents(result) + require.Len(t, nodes, 1) + require.Equal(t, tt.expectedOnline, nodes[0].Online) + require.Equal(t, tt.expectedStandby, nodes[0].Standby) + }) + } +} + +func TestCollectPacemakerStatus_XMLSizeValidation(t *testing.T) { + // This test verifies that large XML is rejected + // In a real test, you would mock the exec.Execute function + // For now, we just verify the maxXMLSize constant is reasonable + require.Equal(t, 10*1024*1024, maxXMLSize, "Max XML size should be 10MB") +} + +func TestCollectPacemakerStatus_InvalidXMLHandling(t *testing.T) { + invalidXML := "" + var result PacemakerResult + err := xml.Unmarshal([]byte(invalidXML), &result) + + // Should fail to unmarshal but not panic + require.Error(t, err, "Should return error for invalid XML") +} + +func TestBuildStatusComponents_NodeIPExtraction(t *testing.T) { + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: CurrentDC{WithQuorum: "true"}, + }, + Nodes: Nodes{ + Node: []Node{ + {Name: "master-0", Online: "true"}, + }, + }, + NodeAttributes: NodeAttributes{ + Node: []NodeAttributeSet{ + { + Name: "master-0", + Attribute: []NodeAttribute{ + {Name: "node_ip", Value: "192.168.1.10"}, + {Name: "other_attr", Value: "value"}, + }, + }, + }, + }, + } + + // Just verify it doesn't panic when node_ip attributes are present + _, nodes, _, _, _ := buildStatusComponents(result) + require.Len(t, nodes, 1) + require.Equal(t, "master-0", nodes[0].Name) +} + +func TestBuildStatusComponents_ResourceAgentTypes(t *testing.T) { + result := &PacemakerResult{ + Summary: Summary{ + Stack: Stack{PacemakerdState: "running"}, + CurrentDC: CurrentDC{WithQuorum: "true"}, + }, + Resources: Resources{ + Resource: []Resource{ + { + ID: "kubelet-0", + ResourceAgent: "systemd:kubelet", + Role: "Started", + Active: "true", + }, + { + ID: "etcd-0", + ResourceAgent: "ocf:heartbeat:podman-etcd", + Role: "Started", + Active: "true", + }, + { + ID: "ip-0", + ResourceAgent: "ocf:heartbeat:IPaddr2", + Role: "Started", + Active: "true", + }, + }, + }, + } + + _, _, resources, _, _ := buildStatusComponents(result) + + require.Len(t, resources, 3) + + // Verify all resource agents are preserved + agents := make(map[string]bool) + for _, resource := range resources { + agents[resource.ResourceAgent] = true + } + + require.True(t, agents["systemd:kubelet"]) + require.True(t, agents["ocf:heartbeat:podman-etcd"]) + require.True(t, agents["ocf:heartbeat:IPaddr2"]) +} diff --git a/pkg/tnf/pkg/pacemaker/testdata/healthy_cluster.xml 
b/pkg/tnf/pkg/pacemaker/testdata/healthy_cluster.xml new file mode 100644 index 000000000..c1edce349 --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/testdata/healthy_cluster.xml @@ -0,0 +1,87 @@
diff --git a/pkg/tnf/pkg/pacemaker/testdata/offline_node.xml b/pkg/tnf/pkg/pacemaker/testdata/offline_node.xml new file mode 100644 index 000000000..6759614ee --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/testdata/offline_node.xml @@ -0,0 +1,67 @@
diff --git a/pkg/tnf/pkg/pacemaker/testdata/recent_failures_template.xml b/pkg/tnf/pkg/pacemaker/testdata/recent_failures_template.xml new file mode 100644 index 000000000..3538c2df3 --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/testdata/recent_failures_template.xml @@ -0,0 +1,66 @@
diff --git a/pkg/tnf/pkg/pacemaker/testdata/standby_node.xml b/pkg/tnf/pkg/pacemaker/testdata/standby_node.xml new file mode 100644 index 000000000..eba32d54c --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/testdata/standby_node.xml @@ -0,0 +1,65 @@
diff --git a/pkg/tnf/pkg/pacemaker/v1alpha1/doc.go b/pkg/tnf/pkg/pacemaker/v1alpha1/doc.go new file mode 100644 index 000000000..113ed5200 --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/v1alpha1/doc.go @@ -0,0 +1,3 @@ +// Package v1alpha1 contains API Schema definitions for the pacemaker v1alpha1 API group +// +groupName=etcd.openshift.io +package v1alpha1
diff --git a/pkg/tnf/pkg/pacemaker/v1alpha1/register.go b/pkg/tnf/pkg/pacemaker/v1alpha1/register.go new file mode 100644 index 000000000..6caac21c6 --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/v1alpha1/register.go @@ -0,0 +1,31 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +const ( + GroupName = "etcd.openshift.io" + Version = "v1alpha1" +) + +var ( + SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: Version} + SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = SchemeBuilder.AddToScheme +) + +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &PacemakerStatus{}, + &PacemakerStatusList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +}
diff --git a/pkg/tnf/pkg/pacemaker/v1alpha1/types.go b/pkg/tnf/pkg/pacemaker/v1alpha1/types.go new file mode 100644 index 000000000..93fcc970f --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/v1alpha1/types.go @@ -0,0 +1,176 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:object:root=true +// +kubebuilder:resource:path=pacemakerstatuses,scope=Cluster +// +kubebuilder:subresource:status + +// PacemakerStatus represents the current state of the Pacemaker cluster +// as reported by the pcs status command.
+type PacemakerStatus struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec PacemakerStatusSpec `json:"spec,omitempty"` + Status PacemakerStatusStatus `json:"status,omitempty"` +} + +// PacemakerStatusSpec defines the desired state of PacemakerStatus. +// Note: This CR is status-only. The spec exists for Kubernetes API compliance +// but is not used. PacemakerStatus is a cluster-scoped singleton CR named "cluster" +// that contains status information for all nodes in the pacemaker cluster. +type PacemakerStatusSpec struct { + // Reserved for future use + // +optional + NodeName string `json:"nodeName,omitempty"` +} + +// PacemakerStatusStatus contains the actual pacemaker cluster status information +type PacemakerStatusStatus struct { + // LastUpdated is the timestamp when this status was last updated + LastUpdated metav1.Time `json:"lastUpdated,omitempty"` + + // RawXML contains the raw XML output from pcs status xml command + // Kept for debugging purposes only; healthcheck should not need to parse this + // +optional + RawXML string `json:"rawXML,omitempty"` + + // CollectionError contains any error encountered while collecting status + // +optional + CollectionError string `json:"collectionError,omitempty"` + + // Summary provides high-level counts and flags for the cluster state + // +optional + Summary PacemakerSummary `json:"summary,omitempty"` + + // Nodes provides detailed information about each node in the cluster + // +optional + Nodes []NodeStatus `json:"nodes,omitempty"` + + // Resources provides detailed information about each resource in the cluster + // +optional + Resources []ResourceStatus `json:"resources,omitempty"` + + // NodeHistory provides recent operation history for troubleshooting + // +optional + NodeHistory []NodeHistoryEntry `json:"nodeHistory,omitempty"` + + // FencingHistory provides recent fencing events + // +optional + FencingHistory []FencingEvent `json:"fencingHistory,omitempty"` +} + +// PacemakerSummary provides a high-level summary of cluster state +type PacemakerSummary struct { + // PacemakerdState indicates if pacemaker is running + PacemakerdState string `json:"pacemakerdState,omitempty"` + + // HasQuorum indicates if the cluster has quorum + HasQuorum bool `json:"hasQuorum,omitempty"` + + // NodesOnline is the count of online nodes + NodesOnline int `json:"nodesOnline,omitempty"` + + // NodesTotal is the total count of configured nodes + NodesTotal int `json:"nodesTotal,omitempty"` + + // ResourcesStarted is the count of started resources + ResourcesStarted int `json:"resourcesStarted,omitempty"` + + // ResourcesTotal is the total count of configured resources + ResourcesTotal int `json:"resourcesTotal,omitempty"` + + // RecentFailures indicates if there are recent operation failures + RecentFailures bool `json:"recentFailures,omitempty"` + + // RecentFencing indicates if there are recent fencing events + RecentFencing bool `json:"recentFencing,omitempty"` +} + +// NodeStatus represents the status of a single node in the Pacemaker cluster +type NodeStatus struct { + // Name is the name of the node + Name string `json:"name"` + + // Online indicates if the node is online + Online bool `json:"online"` + + // Standby indicates if the node is in standby mode + // +optional + Standby bool `json:"standby,omitempty"` +} + +// ResourceStatus represents the status of a single resource in the Pacemaker cluster +type ResourceStatus struct { + // Name is the name of the resource + Name string `json:"name"` + + 
// ResourceAgent is the resource agent type (e.g., "ocf:heartbeat:IPaddr2", "systemd:kubelet") + // +optional + ResourceAgent string `json:"resourceAgent,omitempty"` + + // Role is the current role of the resource (e.g., "Started", "Stopped") + // +optional + Role string `json:"role,omitempty"` + + // Active indicates if the resource is active + // +optional + Active bool `json:"active,omitempty"` + + // Node is the node where the resource is running + // +optional + Node string `json:"node,omitempty"` +} + +// NodeHistoryEntry represents a single operation history entry from node_history +type NodeHistoryEntry struct { + // Node is the node where the operation occurred + Node string `json:"node"` + + // Resource is the resource that was operated on + Resource string `json:"resource"` + + // Operation is the operation that was performed (e.g., "monitor", "start", "stop") + Operation string `json:"operation"` + + // RC is the return code from the operation + RC int `json:"rc"` + + // RCText is the human-readable return code text (e.g., "ok", "error", "not running") + // +optional + RCText string `json:"rcText,omitempty"` + + // LastRCChange is the timestamp when the RC last changed + // +optional + LastRCChange metav1.Time `json:"lastRCChange,omitempty"` +} + +// FencingEvent represents a single fencing event from fence history +type FencingEvent struct { + // Target is the node that was fenced + Target string `json:"target"` + + // Action is the fencing action performed (e.g., "reboot", "off", "on") + Action string `json:"action"` + + // Status is the status of the fencing operation (e.g., "success", "failed") + Status string `json:"status"` + + // Completed is the timestamp when the fencing completed + // +optional + Completed metav1.Time `json:"completed,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// PacemakerStatusList contains a list of PacemakerStatus +type PacemakerStatusList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []PacemakerStatus `json:"items"` +} diff --git a/pkg/tnf/pkg/pacemaker/v1alpha1/zz_generated.deepcopy.go b/pkg/tnf/pkg/pacemaker/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 000000000..33348df34 --- /dev/null +++ b/pkg/tnf/pkg/pacemaker/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,211 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacemakerStatus) DeepCopyInto(out *PacemakerStatus) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacemakerStatus. +func (in *PacemakerStatus) DeepCopy() *PacemakerStatus { + if in == nil { + return nil + } + out := new(PacemakerStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PacemakerStatus) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *PacemakerStatusList) DeepCopyInto(out *PacemakerStatusList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PacemakerStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacemakerStatusList. +func (in *PacemakerStatusList) DeepCopy() *PacemakerStatusList { + if in == nil { + return nil + } + out := new(PacemakerStatusList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PacemakerStatusList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacemakerStatusSpec) DeepCopyInto(out *PacemakerStatusSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacemakerStatusSpec. +func (in *PacemakerStatusSpec) DeepCopy() *PacemakerStatusSpec { + if in == nil { + return nil + } + out := new(PacemakerStatusSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacemakerStatusStatus) DeepCopyInto(out *PacemakerStatusStatus) { + *out = *in + in.LastUpdated.DeepCopyInto(&out.LastUpdated) + out.Summary = in.Summary + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]NodeStatus, len(*in)) + copy(*out, *in) + } + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]ResourceStatus, len(*in)) + copy(*out, *in) + } + if in.NodeHistory != nil { + in, out := &in.NodeHistory, &out.NodeHistory + *out = make([]NodeHistoryEntry, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.FencingHistory != nil { + in, out := &in.FencingHistory, &out.FencingHistory + *out = make([]FencingEvent, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacemakerStatusStatus. +func (in *PacemakerStatusStatus) DeepCopy() *PacemakerStatusStatus { + if in == nil { + return nil + } + out := new(PacemakerStatusStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacemakerSummary) DeepCopyInto(out *PacemakerSummary) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacemakerSummary. +func (in *PacemakerSummary) DeepCopy() *PacemakerSummary { + if in == nil { + return nil + } + out := new(PacemakerSummary) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeStatus. 
+func (in *NodeStatus) DeepCopy() *NodeStatus { + if in == nil { + return nil + } + out := new(NodeStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResourceStatus) DeepCopyInto(out *ResourceStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceStatus. +func (in *ResourceStatus) DeepCopy() *ResourceStatus { + if in == nil { + return nil + } + out := new(ResourceStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeHistoryEntry) DeepCopyInto(out *NodeHistoryEntry) { + *out = *in + in.LastRCChange.DeepCopyInto(&out.LastRCChange) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeHistoryEntry. +func (in *NodeHistoryEntry) DeepCopy() *NodeHistoryEntry { + if in == nil { + return nil + } + out := new(NodeHistoryEntry) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FencingEvent) DeepCopyInto(out *FencingEvent) { + *out = *in + in.Completed.DeepCopyInto(&out.Completed) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FencingEvent. +func (in *FencingEvent) DeepCopy() *FencingEvent { + if in == nil { + return nil + } + out := new(FencingEvent) + in.DeepCopyInto(out) + return out +}
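Note on the "recent" windows: TestBuildStatusComponents_TimeWindowFiltering only pins the behaviour down indirectly, with operation failures treated as recent inside roughly a five-minute window and fencing events inside roughly a 24-hour window. A minimal sketch of that check follows; the constant and helper names are hypothetical, since the real ones are not visible in this diff.

package pacemaker

import "time"

// Hypothetical names illustrating the windows exercised by the time-window test.
const (
	recentFailureWindow = 5 * time.Minute
	recentFencingWindow = 24 * time.Hour
)

// withinWindow reports whether a parsed pacemaker timestamp falls inside the given window.
// Zero timestamps (unparseable or missing) are never treated as recent.
func withinWindow(ts time.Time, window time.Duration) bool {
	return !ts.IsZero() && time.Since(ts) <= window
}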
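The tests exercise buildStatusComponents, but the diff does not show the glue that turns its output into the CR status defined in v1alpha1/types.go. A sketch of that step, assuming buildStatusComponents returns the v1alpha1 status slices directly (the field assertions in the tests suggest this but do not prove it) and using a hypothetical helper name buildCRStatus:

package pacemaker

import (
	"encoding/xml"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker/v1alpha1"
)

// buildCRStatus (hypothetical) converts raw `pcs status xml` output into the CR status.
func buildCRStatus(rawXML string) v1alpha1.PacemakerStatusStatus {
	var result PacemakerResult
	if err := xml.Unmarshal([]byte(rawXML), &result); err != nil {
		// Parse failures are surfaced on the CR via CollectionError instead of being dropped.
		return v1alpha1.PacemakerStatusStatus{
			LastUpdated:     metav1.Now(),
			CollectionError: err.Error(),
		}
	}

	summary, nodes, resources, nodeHistory, fencingHistory := buildStatusComponents(&result)

	return v1alpha1.PacemakerStatusStatus{
		LastUpdated:    metav1.Now(),
		Summary:        summary,
		Nodes:          nodes,
		Resources:      resources,
		NodeHistory:    nodeHistory,
		FencingHistory: fencingHistory,
	}
}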
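On the consumer side, register.go only wires the etcd.openshift.io/v1alpha1 group into a scheme; how a health check actually reads the singleton "cluster" PacemakerStatus is outside this diff. A minimal read-side sketch, assuming a controller-runtime client (the operator may instead use a clientset generated from the +genclient marker):

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pacemaker/v1alpha1"
)

func main() {
	// Register etcd.openshift.io/v1alpha1 so the client can decode PacemakerStatus objects.
	scheme := runtime.NewScheme()
	if err := v1alpha1.AddToScheme(scheme); err != nil {
		panic(err)
	}

	cfg, err := ctrl.GetConfig()
	if err != nil {
		panic(err)
	}
	c, err := client.New(cfg, client.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	// The CR is cluster-scoped and, per the type comments, a singleton named "cluster".
	var ps v1alpha1.PacemakerStatus
	if err := c.Get(context.TODO(), client.ObjectKey{Name: "cluster"}, &ps); err != nil {
		panic(err)
	}

	// Health checks read the pre-digested summary instead of parsing status.rawXML.
	fmt.Printf("pacemakerd=%s quorum=%t nodes=%d/%d resources=%d/%d recentFailures=%t recentFencing=%t\n",
		ps.Status.Summary.PacemakerdState,
		ps.Status.Summary.HasQuorum,
		ps.Status.Summary.NodesOnline, ps.Status.Summary.NodesTotal,
		ps.Status.Summary.ResourcesStarted, ps.Status.Summary.ResourcesTotal,
		ps.Status.Summary.RecentFailures, ps.Status.Summary.RecentFencing)
}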