diff --git a/cmd/tnf-setup-runner/main.go b/cmd/tnf-setup-runner/main.go
index 8452dad92..9a8745917 100644
--- a/cmd/tnf-setup-runner/main.go
+++ b/cmd/tnf-setup-runner/main.go
@@ -17,6 +17,7 @@ import (
 	tnfaftersetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/after-setup"
 	tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth"
+	disruptivevalidate "github.com/openshift/cluster-etcd-operator/pkg/tnf/disruptivevalidate"
 	tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing"
 	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
 	tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup"
@@ -58,6 +59,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command {
 	cmd.AddCommand(NewSetupCommand())
 	cmd.AddCommand(NewAfterSetupCommand())
 	cmd.AddCommand(NewFencingCommand())
+	cmd.AddCommand(NewDisruptiveValidateCommand())
 
 	return cmd
 }
@@ -113,3 +115,15 @@ func NewFencingCommand() *cobra.Command {
 		},
 	}
 }
+
+func NewDisruptiveValidateCommand() *cobra.Command {
+	return &cobra.Command{
+		Use:   tools.JobTypeDisruptiveValidate.GetSubCommand(),
+		Short: "Run disruptive peer validation from this node",
+		Run: func(cmd *cobra.Command, args []string) {
+			if err := disruptivevalidate.RunDisruptiveValidate(); err != nil {
+				klog.Fatal(err)
+			}
+		},
+	}
+}
diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go
new file mode 100644
index 000000000..01c8c1633
--- /dev/null
+++ b/pkg/tnf/disruptivevalidate/runner.go
@@ -0,0 +1,304 @@
+package disruptivevalidate
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	batchv1 "k8s.io/api/batch/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	wait "k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/server"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+
+	"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
+	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config"
+	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec"
+	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+)
+
+const (
+	poll             = 3 * time.Second
+	timeoutSetup     = tools.SetupJobCompletedTimeout
+	timeoutFencing   = tools.FencingJobCompletedTimeout
+	timeoutAfter     = tools.SetupJobCompletedTimeout
+	timeoutPeerJob   = 45 * time.Minute
+	timeoutPCSUp     = 2 * time.Minute
+	timeoutPCSDown   = 5 * time.Minute
+	timeoutPCSBackUp = 10 * time.Minute
+	timeoutEtcdOK    = 10 * time.Minute
+	jobNamePrefix    = "tnf-disruptive-validate-job-"
+)
+
+func RunDisruptiveValidate() error {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() { <-server.SetupSignalHandler(); cancel() }()
+
+	cfg, err := rest.InClusterConfig()
+	if err != nil {
+		return err
+	}
+	kc, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return err
+	}
+
+	// 1) Wait for the orchestrated jobs
+	if err := waitForLabeledJob(ctx, kc, tools.JobTypeSetup.GetNameLabelValue(), 1, timeoutSetup); err != nil {
+		return fmt.Errorf("setup not complete: %w", err)
+	}
+
+	clusterCfg, err := config.GetClusterConfig(ctx, kc)
+	if err != nil {
+		return err
+	}
+	local, peer, err := detectLocalAndPeer(clusterCfg.NodeName1, clusterCfg.NodeName2)
+	if err != nil {
+		return err
+	}
+	klog.Infof("validate: local=%s peer=%s", local, peer)
+
+	if err := waitForLabeledJob(ctx, kc, tools.JobTypeFencing.GetNameLabelValue(), 1, timeoutFencing); err != nil {
+		return fmt.Errorf("fencing not complete: %w", err)
+	}
+
+	if err := waitForJobName(ctx, kc, tools.JobTypeAfterSetup.GetJobName(&clusterCfg.NodeName1), timeoutAfter); err != nil {
+		return fmt.Errorf("after-setup not complete on %s: %w", clusterCfg.NodeName1, err)
+	}
+	if err := waitForJobName(ctx, kc, tools.JobTypeAfterSetup.GetJobName(&clusterCfg.NodeName2), timeoutAfter); err != nil {
+		return fmt.Errorf("after-setup not complete on %s: %w", clusterCfg.NodeName2, err)
+	}
+
+	// 2) Lexicographic sequencing
+	if err := waitPeerValidateIfSecond(ctx, kc, local, clusterCfg.NodeName1, clusterCfg.NodeName2); err != nil {
+		return err
+	}
+
+	// 3) Pre-fence health
+	if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil {
+		return fmt.Errorf("pre-fence etcd not healthy: %w", err)
+	}
+
+	// 4) PCS preflight
+	if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil {
+		return fmt.Errorf("pcs not found: %w", err)
+	}
+	if _, _, err := exec.Execute(ctx, `systemctl is-active pacemaker`); err != nil {
+		return fmt.Errorf("pacemaker not active: %w", err)
+	}
+	if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSUp); err != nil {
+		return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err)
+	}
+
+	// 5) Fence → OFFLINE → ONLINE → etcd healthy
+	out, _, ferr := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s`, peer))
+	if ferr != nil {
+		ls := out
+		if i := strings.LastIndex(ls, "\n"); i >= 0 && i+1 < len(ls) {
+			ls = ls[i+1:]
+		}
+		return fmt.Errorf("pcs fence %s failed: %w (last line: %s)", peer, ferr, strings.TrimSpace(ls))
+	}
+	if err := waitPCSState(ctx, boolp(false), peer, timeoutPCSDown); err != nil {
+		return fmt.Errorf("peer didn't go OFFLINE: %w", err)
+	}
+	if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSBackUp); err != nil {
+		return fmt.Errorf("peer didn't return ONLINE: %w", err)
+	}
+	if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil {
+		return fmt.Errorf("post-fence etcd not healthy: %w", err)
+	}
+
+	klog.Infof("validate: success local=%s peer=%s", local, peer)
+	return nil
+}
+
+// helpers
+func waitForJobs(
+	ctx context.Context,
+	kc kubernetes.Interface,
+	byName string,
+	labelSelector string,
+	wantAtLeast int,
+	to time.Duration,
+	allowNeverSeenTTL bool, // NEW
+) error {
+	const appearanceGrace = 2 * time.Minute
+	start := time.Now()
+	seen := false
+
+	return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
+		if byName != "" {
+			j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, byName, metav1.GetOptions{})
+			if apierrors.IsNotFound(err) {
+				if seen {
+					klog.V(2).Infof("job %s disappeared after observation; assuming TTL after completion", byName)
+					return true, nil
+				}
+				if allowNeverSeenTTL && time.Since(start) > appearanceGrace {
+					klog.V(2).Infof("job %s not found for %s; assuming completed earlier and TTL-deleted", byName, appearanceGrace)
+					return true, nil
+				}
+				return false, nil
+			}
+			if err != nil {
+				return false, nil // transient
+			}
+
+			seen = true
+
+			if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) {
+				return false, fmt.Errorf("job %s failed", byName)
+			}
+			return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil
+		}
+
+		// selector path (aggregate waits)
+		jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
+		if err != nil {
+			return false, nil // transient
+		}
+		if len(jl.Items) < wantAtLeast {
+			return false, nil
+		}
+		completed := 0
+		for i := range jl.Items {
+			j := &jl.Items[i]
+			if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) {
+				return false, fmt.Errorf("job %s failed", j.Name)
+			}
+			if j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) {
+				completed++
+			}
+		}
+		return completed >= wantAtLeast, nil
+	})
+}
+
+func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel string, wantAtLeast int, to time.Duration) error {
+	return waitForJobs(ctx, kc,
+		"", // byName
+		fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel),
+		wantAtLeast, to, false) // strict
+}
+
+func waitForJobName(ctx context.Context, kc kubernetes.Interface, name string, to time.Duration) error {
+	return waitForJobs(ctx, kc,
+		name, // byName
+		"",   // labelSelector
+		1, to, false) // strict
+}
+
+func waitForJobNamePeerTTL(ctx context.Context, kc kubernetes.Interface, name string, to time.Duration) error {
+	return waitForJobs(ctx, kc, name, "", 1, to, true) // allowNeverSeenTTL = true
+}
+
+func detectLocalAndPeer(n1, n2 string) (string, string, error) {
+	podName, err := os.Hostname()
+	if err != nil {
+		return "", "", fmt.Errorf("get pod hostname: %w", err)
+	}
+	podName = strings.TrimSpace(podName)
+	if podName == "" {
+		return "", "", fmt.Errorf("get pod hostname: empty string")
+	}
+	// strip the generated pod suffix: "<job-name>-<suffix>" -> "<job-name>"
+	i := strings.LastIndex(podName, "-")
+	if i <= 0 {
+		return "", "", fmt.Errorf("cannot derive job name from pod %q", podName)
+	}
+	jobName := podName[:i]
+
+	if !strings.HasPrefix(jobName, jobNamePrefix) {
+		return "", "", fmt.Errorf("unexpected job name %q (want prefix %q)", jobName, jobNamePrefix)
+	}
+	local := strings.TrimPrefix(jobName, jobNamePrefix)
+
+	switch local {
+	case n1:
+		return local, n2, nil
+	case n2:
+		return local, n1, nil
+	default:
+		return "", "", fmt.Errorf("local %q not in cluster config (%q,%q)", local, n1, n2)
+	}
+}
+
+func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, local, a, b string) error {
+	min, max := a, b
+	if strings.Compare(min, max) > 0 {
+		min, max = max, min
+	}
+	if local != max {
+		return nil
+	}
+
+	target := tools.JobTypeDisruptiveValidate.GetJobName(&min)
+	if err := waitForJobNamePeerTTL(ctx, kc, target, timeoutPeerJob); err != nil {
+		return fmt.Errorf("peer validate job %s not complete: %w", target, err)
+	}
+	return nil
+}
+
+func waitEtcdTwoVoters(ctx context.Context, to time.Duration) error {
+	return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
+		out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`)
+		out = strings.TrimSpace(out)
+		if err != nil || len(out) < 2 {
+			return false, nil
+		}
+		var ml struct {
+			Members []struct {
+				IsLearner bool `json:"isLearner"`
+			} `json:"members"`
+		}
+		if json.Unmarshal([]byte(out), &ml) != nil {
+			return false, nil
+		}
+		total, voters := 0, 0
+		for _, m := range ml.Members {
+			total++
+			if !m.IsLearner {
+				voters++
+			}
+		}
+		return total == 2 && voters == 2, nil
+	})
+}
+
+func waitPCSState(ctx context.Context, want *bool, peer string, to time.Duration) error {
+	return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
+		out, _, err := exec.Execute(ctx, `LC_ALL=C /usr/sbin/pcs status nodes`)
+		if err != nil {
+			return false, nil // treat as transient
+		}
+
+		// Find the "Online:" line and check whether the peer is listed on it.
+		var onlineLine string
+		for _, ln := range strings.Split(out, "\n") {
+			s := strings.TrimSpace(ln)
+			if strings.HasPrefix(s, "Online:") {
+				onlineLine = strings.TrimSpace(strings.TrimPrefix(s, "Online:"))
+				break
+			}
+		}
+		peerOnline := false
+		if onlineLine != "" {
+			for _, tok := range strings.Fields(onlineLine) {
+				if strings.Trim(tok, "[],") == peer {
+					peerOnline = true
+					break
+				}
+			}
+		}
+		return peerOnline == *want, nil
+	})
+}
+
+func boolp(b bool) *bool { return &b }
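
For reference, here is a minimal, self-contained sketch of the ordering rule applied by waitPeerValidateIfSecond (node names are made up; the real code reads them from the TNF cluster config): the lexicographically smaller node name validates first, and the other node waits for its peer's validate job to complete, or to have been TTL-deleted, before it fences anything.

    package main

    import (
        "fmt"
        "strings"
    )

    // goesFirst reports whether the local node may start validating immediately.
    // The lexicographically smaller node name goes first; the other node waits
    // for its peer's validate job before proceeding.
    func goesFirst(local, peer string) bool {
        return strings.Compare(local, peer) <= 0
    }

    func main() {
        fmt.Println(goesFirst("master-0", "master-1")) // true:  fences master-1 right away
        fmt.Println(goesFirst("master-1", "master-0")) // false: waits for master-0's job first
    }
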
diff --git a/pkg/tnf/operator/starter.go b/pkg/tnf/operator/starter.go
index dcd35fdc9..d659b42a9 100644
--- a/pkg/tnf/operator/starter.go
+++ b/pkg/tnf/operator/starter.go
@@ -4,10 +4,11 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1"
 	"os"
 	"sync"
 
+	operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1"
+
 	operatorv1 "github.com/openshift/api/operator/v1"
 	configv1informers "github.com/openshift/client-go/config/informers/externalversions"
 	v1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
@@ -59,8 +60,8 @@ func HandleDualReplicaClusters(ctx context.Context,
 	controlPlaneNodeInformer cache.SharedIndexInformer,
 	etcdInformer operatorv1informers.EtcdInformer,
 	kubeClient kubernetes.Interface,
-	dynamicClient dynamic.Interface) (bool, error) {
-
+	dynamicClient dynamic.Interface,
+) (bool, error) {
 	if isDualReplicaTopology, err := isDualReplicaTopoly(ctx, featureGateAccessor, configInformers); err != nil {
 		return false, err
 	} else if !isDualReplicaTopology {
@@ -104,6 +105,7 @@ func HandleDualReplicaClusters(ctx context.Context,
 	for _, node := range nodeList {
 		runJobController(ctx, tools.JobTypeAuth, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
 		runJobController(ctx, tools.JobTypeAfterSetup, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
+		runJobController(ctx, tools.JobTypeDisruptiveValidate, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
 	}
 	runJobController(ctx, tools.JobTypeSetup, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
 	runJobController(ctx, tools.JobTypeFencing, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
@@ -162,8 +164,8 @@ func runExternalEtcdSupportController(ctx context.Context,
 	networkInformer v1.NetworkInformer,
 	controlPlaneNodeInformer cache.SharedIndexInformer,
 	etcdInformer operatorv1informers.EtcdInformer,
-	kubeClient kubernetes.Interface) {
-
+	kubeClient kubernetes.Interface,
+) {
 	klog.Infof("starting external etcd support controller")
 	externalEtcdSupportController := externaletcdsupportcontroller.NewExternalEtcdEnablerController(
 		operatorClient,
@@ -225,7 +227,8 @@ func runJobController(ctx context.Context, jobType tools.JobType, nodeName *stri
 			job.Spec.Template.Spec.Containers[0].Image = os.Getenv("OPERATOR_IMAGE")
 			job.Spec.Template.Spec.Containers[0].Command[1] = jobType.GetSubCommand()
 			return nil
-		}}...,
+		},
+	}...,
 	)
 	go tnfJobController.Run(ctx, 1)
 }
diff --git a/pkg/tnf/pkg/pcs/fencing.go b/pkg/tnf/pkg/pcs/fencing.go
index 136e1ee22..eebd2a32e 100644
--- a/pkg/tnf/pkg/pcs/fencing.go
+++ b/pkg/tnf/pkg/pcs/fencing.go
@@ -15,9 +15,7 @@ import (
 	"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
 )
 
-var (
-	addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`)
-)
+var addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`)
 
 const (
 	// defaultPcmkDelayBase is the delay applied to the first fence device to prevent simultaneous fencing
@@ -117,11 +115,9 @@ func ConfigureFencing(ctx context.Context, kubeClient kubernetes.Interface, cfg
 
 	klog.Info("Fencing configuration succeeded!")
 	return nil
-
 }
 
 func getFencingConfig(nodeName string, secret *corev1.Secret) (*fencingConfig, error) {
-
 	address := string(secret.Data["address"])
 	if !strings.Contains(address, "redfish") {
 		klog.Errorf("Secret %s does not contain redfish address", secret.Name)
@@ -184,7 +180,6 @@ func getStatusCommand(fc fencingConfig) string {
 }
 
 func getStonithCommand(sc StonithConfig, fc fencingConfig) string {
-
 	stonithAction := "create"
 	// check if device already exists
 	for _, p := range sc.Primitives {
diff --git a/pkg/tnf/pkg/tools/jobs.go b/pkg/tnf/pkg/tools/jobs.go
index 793c51bc9..d2f1cb9a7 100644
--- a/pkg/tnf/pkg/tools/jobs.go
+++ b/pkg/tnf/pkg/tools/jobs.go
@@ -26,6 +26,7 @@ const (
 	JobTypeSetup
 	JobTypeAfterSetup
 	JobTypeFencing
+	JobTypeDisruptiveValidate
 )
 
 func (t JobType) GetSubCommand() string {
@@ -38,6 +39,8 @@ func (t JobType) GetSubCommand() string {
 		return "after-setup"
 	case JobTypeFencing:
 		return "fencing"
+	case JobTypeDisruptiveValidate:
+		return "disruptive-validate"
 	default:
 		return ""
 	}
diff --git a/pkg/tnf/setup/runner.go b/pkg/tnf/setup/runner.go
index c3264c2ca..ed04d5916 100644
--- a/pkg/tnf/setup/runner.go
+++ b/pkg/tnf/setup/runner.go
@@ -21,7 +21,6 @@ import (
 )
 
 func RunTnfSetup() error {
-
 	klog.Info("Setting up clients etc. for TNF setup")
 
 	clientConfig, err := rest.InClusterConfig()
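
For completeness, a small sketch (names are made up, not part of the patch) of the pod-name parsing that detectLocalAndPeer relies on: a Job pod is named "<job-name>-<random-suffix>", so stripping the last "-" segment and then the "tnf-disruptive-validate-job-" prefix yields the node this validate job was scheduled for.

    package main

    import (
        "fmt"
        "strings"
    )

    const jobNamePrefix = "tnf-disruptive-validate-job-"

    // localNodeFromPodName mirrors the derivation in detectLocalAndPeer:
    // "<job-name>-<pod-suffix>" -> "<job-name>" -> "<node-name>".
    func localNodeFromPodName(podName string) (string, error) {
        i := strings.LastIndex(podName, "-")
        if i <= 0 {
            return "", fmt.Errorf("cannot derive job name from pod %q", podName)
        }
        jobName := podName[:i]
        if !strings.HasPrefix(jobName, jobNamePrefix) {
            return "", fmt.Errorf("unexpected job name %q (want prefix %q)", jobName, jobNamePrefix)
        }
        return strings.TrimPrefix(jobName, jobNamePrefix), nil
    }

    func main() {
        node, err := localNodeFromPodName("tnf-disruptive-validate-job-master-0-7xk2p")
        fmt.Println(node, err) // master-0 <nil>
    }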