From 530c52138b6aa99fb150c678c2ab7d32c1365498 Mon Sep 17 00:00:00 2001 From: nhamza Date: Wed, 17 Sep 2025 16:05:31 +0300 Subject: [PATCH 01/14] add fencing disruptive fencing validation Signed-off-by: nhamza --- pkg/tnf/pkg/pcs/fencing.go | 158 +++++++++++++++++++++++++++++++++++-- pkg/tnf/setup/runner.go | 68 ++++++++++++++-- 2 files changed, 213 insertions(+), 13 deletions(-) diff --git a/pkg/tnf/pkg/pcs/fencing.go b/pkg/tnf/pkg/pcs/fencing.go index 136e1ee22..e21ce7fff 100644 --- a/pkg/tnf/pkg/pcs/fencing.go +++ b/pkg/tnf/pkg/pcs/fencing.go @@ -5,19 +5,19 @@ import ( "fmt" "regexp" "strings" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "github.com/openshift/cluster-etcd-operator/pkg/etcdcli" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" ) -var ( - addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`) -) +var addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`) const ( // defaultPcmkDelayBase is the delay applied to the first fence device to prevent simultaneous fencing @@ -117,11 +117,9 @@ func ConfigureFencing(ctx context.Context, kubeClient kubernetes.Interface, cfg klog.Info("Fencing configuration succeeded!") return nil - } func getFencingConfig(nodeName string, secret *corev1.Secret) (*fencingConfig, error) { - address := string(secret.Data["address"]) if !strings.Contains(address, "redfish") { klog.Errorf("Secret %s does not contain redfish address", secret.Name) @@ -184,7 +182,6 @@ func getStatusCommand(fc fencingConfig) string { } func getStonithCommand(sc StonithConfig, fc fencingConfig) string { - stonithAction := "create" // check if device already exists for _, p := range sc.Primitives { @@ -294,3 +291,152 @@ func deleteObsoleteStonithDevices(ctx context.Context, stonithConfig StonithConf } return nil } + +func peerOf(cfg config.ClusterConfig, local string) (string, error) { + switch local { + case cfg.NodeName1: + return cfg.NodeName2, nil + case cfg.NodeName2: + return cfg.NodeName1, nil + default: + return "", fmt.Errorf("local node %q not in cluster config (%q, %q)", local, cfg.NodeName1, cfg.NodeName2) + } +} + +func localHostname(ctx context.Context) (string, error) { + out, _, err := exec.Execute(ctx, "hostname -f || hostname") + if err != nil { + return "", fmt.Errorf("hostname query failed: %v", err) + } + return strings.TrimSpace(out), nil +} + +func pcsNodesOnline(ctx context.Context) (string, error) { + out, _, err := exec.Execute(ctx, "/usr/sbin/pcs status nodes 2>/dev/null || /usr/sbin/crm_mon -1 2>/dev/null || true") + return out, err +} + +func isOnline(output, name string) bool { + short := name + if i := strings.IndexByte(name, '.'); i > 0 { + short = name[:i] + } + return strings.Contains(output, name) || strings.Contains(output, short) +} + +func waitPacemakerOffline(ctx context.Context, name string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + out, pcsErr := pcsNodesOnline(ctx) + if pcsErr != nil { + return fmt.Errorf("pcs status failed while waiting OFFLINE: %w", pcsErr) + } + if !isOnline(out, name) { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } + return fmt.Errorf("timeout waiting for %s to go OFFLINE (pcs)", name) +} + +func waitPacemakerOnline(ctx context.Context, name string, timeout time.Duration) error { + deadline := 
time.Now().Add(timeout) + for time.Now().Before(deadline) { + out, pcsErr := pcsNodesOnline(ctx) + if pcsErr != nil { + return fmt.Errorf("pcs status failed while waiting ONLINE: %w", pcsErr) + } + if isOnline(out, name) { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } + return fmt.Errorf("timeout waiting for %s to become ONLINE (pcs)", name) +} + +func waitEtcdHealthy(ctx context.Context, timeout time.Duration, ec etcdcli.EtcdClient) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + ok, fatal := etcdTwoStarted(ctx, ec) + if fatal != nil { + return fmt.Errorf("etcdctl check failed: %v", fatal) + } + if ok { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } + return fmt.Errorf("timeout waiting for etcd to report 2 started non-learner voters") +} + +func etcdTwoStarted(ctx context.Context, ec etcdcli.EtcdClient) (bool, error) { + members, err := ec.VotingMemberList(ctx) + if err != nil { + return false, fmt.Errorf("list voters: %w", err) + } + if len(members) < 2 { + return false, nil + } + healthy := 0 + for _, m := range members { + ok, err := ec.IsMemberHealthy(ctx, m) + if err != nil { + return false, fmt.Errorf("member %s health: %w", m.Name, err) + } + if ok { + healthy++ + } + } + return healthy >= 2, nil +} + +func ValidateFencingPeerOnly(ctx context.Context, cfg config.ClusterConfig, ec etcdcli.EtcdClient) error { + klog.Info("Validating Fencing (disruptive, peer-only)") + + local, err := localHostname(ctx) + if err != nil { + return err + } + target, err := peerOf(cfg, local) + if err != nil { + return err + } + + klog.Infof("Fencing peer node %q from local %q", target, local) + + out, _ := pcsNodesOnline(ctx) + if !isOnline(out, target) { + return fmt.Errorf("peer %q is not ONLINE before fencing", target) + } + + cmd := fmt.Sprintf("/usr/sbin/pcs stonith fence %q --wait=300", target) + _, stdErr, fenceErr := exec.Execute(ctx, cmd) + if fenceErr != nil { + klog.Error(fenceErr, "pcs stonith fence failed", "stderr", stdErr) + return fmt.Errorf("pcs stonith fence %q failed: %w", target, fenceErr) + } + if err := waitPacemakerOffline(ctx, target, 10*time.Minute); err != nil { + return err + } + if err := waitPacemakerOnline(ctx, target, 15*time.Minute); err != nil { + return err + } + if err := waitEtcdHealthy(ctx, 10*time.Minute, ec); err != nil { + return err + } + + klog.Infof(" peer-only fencing validation passed for peer %q", target) + return nil +} diff --git a/pkg/tnf/setup/runner.go b/pkg/tnf/setup/runner.go index c3264c2ca..31218ad99 100644 --- a/pkg/tnf/setup/runner.go +++ b/pkg/tnf/setup/runner.go @@ -3,8 +3,11 @@ package setup import ( "context" "fmt" + "os" "time" + configclient "github.com/openshift/client-go/config/clientset/versioned" + configv1informers "github.com/openshift/client-go/config/informers/externalversions" operatorversionedclient "github.com/openshift/client-go/operator/clientset/versioned" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -12,18 +15,31 @@ import ( "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + utilclock "k8s.io/utils/clock" + "github.com/openshift/cluster-etcd-operator/pkg/etcdcli" + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" 
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/etcd" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pcs" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" ) func RunTnfSetup() error { - klog.Info("Setting up clients etc. for TNF setup") + ctx, cancel := context.WithCancel(context.Background()) + shutdownHandler := server.SetupSignalHandler() + go func() { + defer cancel() + <-shutdownHandler + klog.Info("Received SIGTERM or SIGINT signal, terminating") + }() + clientConfig, err := rest.InClusterConfig() if err != nil { return err @@ -39,18 +55,51 @@ func RunTnfSetup() error { return err } - operatorConfigClient, err := operatorversionedclient.NewForConfig(clientConfig) + // CRD clients + configClient, err := configclient.NewForConfig(clientConfig) if err != nil { return err } + kubeInformers := v1helpers.NewKubeInformersForNamespaces( + kubeClient, + operatorclient.TargetNamespace, + "", + ) + nodes := kubeInformers.InformersFor("").Core().V1().Nodes() - ctx, cancel := context.WithCancel(context.Background()) - shutdownHandler := server.SetupSignalHandler() + cfgInformers := configv1informers.NewSharedInformerFactory(configClient, 0) + networkInformer := cfgInformers.Config().V1().Networks() + + recorder := events.NewInMemoryRecorder("tnf-fencing", utilclock.RealClock{}) + + stopCh := make(chan struct{}) go func() { - defer cancel() - <-shutdownHandler - klog.Info("Received SIGTERM or SIGINT signal, terminating") + <-ctx.Done() + close(stopCh) }() + kubeInformers.Start(stopCh) + cfgInformers.Start(stopCh) + + if ok := cache.WaitForCacheSync( + ctx.Done(), + nodes.Informer().HasSynced, + networkInformer.Informer().HasSynced, + ); !ok { + return fmt.Errorf("failed to sync informers for etcd client") + } + + ec := etcdcli.NewEtcdClient( + kubeInformers, + nodes.Informer(), + nodes.Lister(), + networkInformer, + recorder, + ) + + operatorConfigClient, err := operatorversionedclient.NewForConfig(clientConfig) + if err != nil { + return err + } klog.Info("Waiting for completed auth jobs") authDone := func(context.Context) (done bool, err error) { @@ -131,6 +180,11 @@ func RunTnfSetup() error { return err } + if os.Getenv("TNF_VALIDATE_PEER_ONLY") == "true" { + if err := pcs.ValidateFencingPeerOnly(ctx, cfg, ec); err != nil { + return fmt.Errorf("peer-only disruptive validation failed: %w", err) + } + } klog.Infof("HA setup done! 
CIB:\n%s", cib) return nil From cd1364c28c2bf7f1eac88d707bc04d73c8ea9d86 Mon Sep 17 00:00:00 2001 From: nhamza Date: Thu, 18 Sep 2025 13:23:09 +0300 Subject: [PATCH 02/14] new fencing validation logic - two new jobs Signed-off-by: nhamza --- cmd/tnf-setup-runner/main.go | 14 +++ pkg/tnf/disruptivevalidate/runner.go | 168 +++++++++++++++++++++++++++ pkg/tnf/operator/starter.go | 15 ++- pkg/tnf/pkg/pcs/fencing.go | 151 ------------------------ pkg/tnf/pkg/tools/jobs.go | 3 + pkg/tnf/setup/runner.go | 67 +---------- 6 files changed, 200 insertions(+), 218 deletions(-) create mode 100644 pkg/tnf/disruptivevalidate/runner.go diff --git a/cmd/tnf-setup-runner/main.go b/cmd/tnf-setup-runner/main.go index 8452dad92..9a8745917 100644 --- a/cmd/tnf-setup-runner/main.go +++ b/cmd/tnf-setup-runner/main.go @@ -17,6 +17,7 @@ import ( tnfaftersetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/after-setup" tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth" + disruptivevalidate "github.com/openshift/cluster-etcd-operator/pkg/tnf/disruptivevalidate" tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup" @@ -58,6 +59,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command { cmd.AddCommand(NewSetupCommand()) cmd.AddCommand(NewAfterSetupCommand()) cmd.AddCommand(NewFencingCommand()) + cmd.AddCommand(NewDisruptiveValidateCommand()) return cmd } @@ -113,3 +115,15 @@ func NewFencingCommand() *cobra.Command { }, } } + +func NewDisruptiveValidateCommand() *cobra.Command { + return &cobra.Command{ + Use: tools.JobTypeDisruptiveValidate.GetSubCommand(), + Short: "Run disruptive peer validation from this node", + Run: func(cmd *cobra.Command, args []string) { + if err := disruptivevalidate.RunDisruptiveValidate(); err != nil { + klog.Fatal(err) + } + }, + } +} diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go new file mode 100644 index 000000000..33c0b4887 --- /dev/null +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -0,0 +1,168 @@ +package fencingvalidate + +import ( + "context" + "fmt" + "regexp" + "strings" + "time" + + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" +) + +func RunDisruptiveValidate() error { + klog.Info("Setting up clients for TNF validate job") + + ctx, cancel := context.WithCancel(context.Background()) + shutdown := server.SetupSignalHandler() + go func() { + defer cancel() + <-shutdown + klog.Info("Received termination signal, exiting validate job") + }() + + // kube client + clientConfig, err := rest.InClusterConfig() + if err != nil { + return err + } + kubeClient, err := kubernetes.NewForConfig(clientConfig) + if err != nil { + return err + } + + // Wait for SETUP (cluster-wide) to complete + klog.Info("Waiting for completed setup job before validation") + setupDone := func(context.Context) (bool, error) { + jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: 
fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeSetup.GetNameLabelValue()), + }) + if err != nil || len(jobs.Items) != 1 { + klog.Warningf("setup job not ready yet, err=%v count=%d", err, len(jobs.Items)) + return false, nil + } + if !tools.IsConditionTrue(jobs.Items[0].Status.Conditions, batchv1.JobComplete) { + return false, nil + } + return true, nil + } + _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, setupDone) + + // NEW: wait for FENCING (cluster-wide) to complete + klog.Info("Waiting for completed fencing job before validation") + fencingDone := func(context.Context) (bool, error) { + jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeFencing.GetNameLabelValue()), + }) + if err != nil || len(jobs.Items) != 1 { + klog.Warningf("fencing job not ready yet, err=%v count=%d", err, len(jobs.Items)) + return false, nil + } + if !tools.IsConditionTrue(jobs.Items[0].Status.Conditions, batchv1.JobComplete) { + return false, nil + } + return true, nil + } + _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.FencingJobCompletedTimeout, true, fencingDone) + + // NEW: wait for BOTH AFTER-SETUP (per-node) jobs to complete + klog.Info("Waiting for completed after-setup jobs before validation") + afterSetupDone := func(context.Context) (bool, error) { + jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeAfterSetup.GetNameLabelValue()), + }) + if err != nil || len(jobs.Items) != 2 { + klog.Warningf("after-setup jobs not ready yet, err=%v count=%d", err, len(jobs.Items)) + return false, nil + } + for _, j := range jobs.Items { + if !tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) { + return false, nil + } + } + return true, nil + } + _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, afterSetupDone) + + // Discover cluster config (node names) + clusterCfg, err := config.GetClusterConfig(ctx, kubeClient) + if err != nil { + return err + } + + // Determine which host this pod is on (nsenter wrapper runs on host) + hostOut, _, err := exec.Execute(ctx, "hostname") + if err != nil { + return fmt.Errorf("get host hostname: %w", err) + } + local := strings.TrimSpace(hostOut) + + var peer string + switch local { + case clusterCfg.NodeName1: + peer = clusterCfg.NodeName2 + case clusterCfg.NodeName2: + peer = clusterCfg.NodeName1 + default: + return fmt.Errorf("host %q not in cluster config (%q, %q)", local, clusterCfg.NodeName1, clusterCfg.NodeName2) + } + + klog.Infof("TNF validate: local=%s peer=%s", local, peer) + + // Preflight on host + if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil { + return fmt.Errorf("pcs absent on host: %w", err) + } + if _, _, err := exec.Execute(ctx, `systemctl is-active pacemaker`); err != nil { + return fmt.Errorf("pacemaker not active: %w", err) + } + + // Ensure peer ONLINE before fence + peerOnlineRE := regexp.MustCompile(`(?mi)^Node\s+` + regexp.QuoteMeta(peer) + `\s+state:\s+([A-Z]+)`) + waitPeer := func(wantOnline bool, timeout time.Duration) error { + check := func(context.Context) (bool, error) { + out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status nodes`) + if err != nil { + // transient during fencing + return false, nil + } + m := peerOnlineRE.FindStringSubmatch(out) + if 
len(m) != 2 { + return false, nil + } + gotOnline := (m[1] == "ONLINE") + return gotOnline == wantOnline, nil + } + return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) + } + if err := waitPeer(true, 2*time.Minute); err != nil { + return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err) + } + + // Fence peer + if _, _, err := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s --wait=300`, peer)); err != nil { + return fmt.Errorf("pcs fence failed: %w", err) + } + + // Wait OFFLINE then ONLINE + if err := waitPeer(false, 10*time.Minute); err != nil { + return fmt.Errorf("peer didn't go OFFLINE: %w", err) + } + if err := waitPeer(true, 15*time.Minute); err != nil { + return fmt.Errorf("peer didn't become ONLINE: %w", err) + } + + klog.Infof("TNF validate: success local=%s peer=%s", local, peer) + return nil +} diff --git a/pkg/tnf/operator/starter.go b/pkg/tnf/operator/starter.go index dcd35fdc9..d659b42a9 100644 --- a/pkg/tnf/operator/starter.go +++ b/pkg/tnf/operator/starter.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "fmt" - operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1" "os" "sync" + operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1" + operatorv1 "github.com/openshift/api/operator/v1" configv1informers "github.com/openshift/client-go/config/informers/externalversions" v1 "github.com/openshift/client-go/config/informers/externalversions/config/v1" @@ -59,8 +60,8 @@ func HandleDualReplicaClusters(ctx context.Context, controlPlaneNodeInformer cache.SharedIndexInformer, etcdInformer operatorv1informers.EtcdInformer, kubeClient kubernetes.Interface, - dynamicClient dynamic.Interface) (bool, error) { - + dynamicClient dynamic.Interface, +) (bool, error) { if isDualReplicaTopology, err := isDualReplicaTopoly(ctx, featureGateAccessor, configInformers); err != nil { return false, err } else if !isDualReplicaTopology { @@ -104,6 +105,7 @@ func HandleDualReplicaClusters(ctx context.Context, for _, node := range nodeList { runJobController(ctx, tools.JobTypeAuth, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces) runJobController(ctx, tools.JobTypeAfterSetup, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces) + runJobController(ctx, tools.JobTypeDisruptiveValidate, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces) } runJobController(ctx, tools.JobTypeSetup, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces) runJobController(ctx, tools.JobTypeFencing, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces) @@ -162,8 +164,8 @@ func runExternalEtcdSupportController(ctx context.Context, networkInformer v1.NetworkInformer, controlPlaneNodeInformer cache.SharedIndexInformer, etcdInformer operatorv1informers.EtcdInformer, - kubeClient kubernetes.Interface) { - + kubeClient kubernetes.Interface, +) { klog.Infof("starting external etcd support controller") externalEtcdSupportController := externaletcdsupportcontroller.NewExternalEtcdEnablerController( operatorClient, @@ -225,7 +227,8 @@ func runJobController(ctx context.Context, jobType tools.JobType, nodeName *stri job.Spec.Template.Spec.Containers[0].Image = os.Getenv("OPERATOR_IMAGE") job.Spec.Template.Spec.Containers[0].Command[1] = jobType.GetSubCommand() return nil - }}..., + }, + }..., ) go tnfJobController.Run(ctx, 1) } diff --git 
a/pkg/tnf/pkg/pcs/fencing.go b/pkg/tnf/pkg/pcs/fencing.go index e21ce7fff..eebd2a32e 100644 --- a/pkg/tnf/pkg/pcs/fencing.go +++ b/pkg/tnf/pkg/pcs/fencing.go @@ -5,13 +5,11 @@ import ( "fmt" "regexp" "strings" - "time" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "github.com/openshift/cluster-etcd-operator/pkg/etcdcli" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" @@ -291,152 +289,3 @@ func deleteObsoleteStonithDevices(ctx context.Context, stonithConfig StonithConf } return nil } - -func peerOf(cfg config.ClusterConfig, local string) (string, error) { - switch local { - case cfg.NodeName1: - return cfg.NodeName2, nil - case cfg.NodeName2: - return cfg.NodeName1, nil - default: - return "", fmt.Errorf("local node %q not in cluster config (%q, %q)", local, cfg.NodeName1, cfg.NodeName2) - } -} - -func localHostname(ctx context.Context) (string, error) { - out, _, err := exec.Execute(ctx, "hostname -f || hostname") - if err != nil { - return "", fmt.Errorf("hostname query failed: %v", err) - } - return strings.TrimSpace(out), nil -} - -func pcsNodesOnline(ctx context.Context) (string, error) { - out, _, err := exec.Execute(ctx, "/usr/sbin/pcs status nodes 2>/dev/null || /usr/sbin/crm_mon -1 2>/dev/null || true") - return out, err -} - -func isOnline(output, name string) bool { - short := name - if i := strings.IndexByte(name, '.'); i > 0 { - short = name[:i] - } - return strings.Contains(output, name) || strings.Contains(output, short) -} - -func waitPacemakerOffline(ctx context.Context, name string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - out, pcsErr := pcsNodesOnline(ctx) - if pcsErr != nil { - return fmt.Errorf("pcs status failed while waiting OFFLINE: %w", pcsErr) - } - if !isOnline(out, name) { - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(5 * time.Second): - } - } - return fmt.Errorf("timeout waiting for %s to go OFFLINE (pcs)", name) -} - -func waitPacemakerOnline(ctx context.Context, name string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - out, pcsErr := pcsNodesOnline(ctx) - if pcsErr != nil { - return fmt.Errorf("pcs status failed while waiting ONLINE: %w", pcsErr) - } - if isOnline(out, name) { - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(5 * time.Second): - } - } - return fmt.Errorf("timeout waiting for %s to become ONLINE (pcs)", name) -} - -func waitEtcdHealthy(ctx context.Context, timeout time.Duration, ec etcdcli.EtcdClient) error { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - ok, fatal := etcdTwoStarted(ctx, ec) - if fatal != nil { - return fmt.Errorf("etcdctl check failed: %v", fatal) - } - if ok { - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(5 * time.Second): - } - } - return fmt.Errorf("timeout waiting for etcd to report 2 started non-learner voters") -} - -func etcdTwoStarted(ctx context.Context, ec etcdcli.EtcdClient) (bool, error) { - members, err := ec.VotingMemberList(ctx) - if err != nil { - return false, fmt.Errorf("list voters: %w", err) - } - if len(members) < 2 { - return false, nil - } - healthy := 0 - for _, m := range members { - ok, err := ec.IsMemberHealthy(ctx, m) - if err != nil { - return false, 
fmt.Errorf("member %s health: %w", m.Name, err) - } - if ok { - healthy++ - } - } - return healthy >= 2, nil -} - -func ValidateFencingPeerOnly(ctx context.Context, cfg config.ClusterConfig, ec etcdcli.EtcdClient) error { - klog.Info("Validating Fencing (disruptive, peer-only)") - - local, err := localHostname(ctx) - if err != nil { - return err - } - target, err := peerOf(cfg, local) - if err != nil { - return err - } - - klog.Infof("Fencing peer node %q from local %q", target, local) - - out, _ := pcsNodesOnline(ctx) - if !isOnline(out, target) { - return fmt.Errorf("peer %q is not ONLINE before fencing", target) - } - - cmd := fmt.Sprintf("/usr/sbin/pcs stonith fence %q --wait=300", target) - _, stdErr, fenceErr := exec.Execute(ctx, cmd) - if fenceErr != nil { - klog.Error(fenceErr, "pcs stonith fence failed", "stderr", stdErr) - return fmt.Errorf("pcs stonith fence %q failed: %w", target, fenceErr) - } - if err := waitPacemakerOffline(ctx, target, 10*time.Minute); err != nil { - return err - } - if err := waitPacemakerOnline(ctx, target, 15*time.Minute); err != nil { - return err - } - if err := waitEtcdHealthy(ctx, 10*time.Minute, ec); err != nil { - return err - } - - klog.Infof(" peer-only fencing validation passed for peer %q", target) - return nil -} diff --git a/pkg/tnf/pkg/tools/jobs.go b/pkg/tnf/pkg/tools/jobs.go index 793c51bc9..d2f1cb9a7 100644 --- a/pkg/tnf/pkg/tools/jobs.go +++ b/pkg/tnf/pkg/tools/jobs.go @@ -26,6 +26,7 @@ const ( JobTypeSetup JobTypeAfterSetup JobTypeFencing + JobTypeDisruptiveValidate ) func (t JobType) GetSubCommand() string { @@ -38,6 +39,8 @@ func (t JobType) GetSubCommand() string { return "after-setup" case JobTypeFencing: return "fencing" + case JobTypeDisruptiveValidate: + return "disruptive-validate" default: return "" } diff --git a/pkg/tnf/setup/runner.go b/pkg/tnf/setup/runner.go index 31218ad99..ed04d5916 100644 --- a/pkg/tnf/setup/runner.go +++ b/pkg/tnf/setup/runner.go @@ -3,11 +3,8 @@ package setup import ( "context" "fmt" - "os" "time" - configclient "github.com/openshift/client-go/config/clientset/versioned" - configv1informers "github.com/openshift/client-go/config/informers/externalversions" operatorversionedclient "github.com/openshift/client-go/operator/clientset/versioned" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,31 +12,17 @@ import ( "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - utilclock "k8s.io/utils/clock" - "github.com/openshift/cluster-etcd-operator/pkg/etcdcli" - "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/etcd" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pcs" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" - "github.com/openshift/library-go/pkg/operator/events" - "github.com/openshift/library-go/pkg/operator/v1helpers" ) func RunTnfSetup() error { klog.Info("Setting up clients etc. 
for TNF setup") - ctx, cancel := context.WithCancel(context.Background()) - shutdownHandler := server.SetupSignalHandler() - go func() { - defer cancel() - <-shutdownHandler - klog.Info("Received SIGTERM or SIGINT signal, terminating") - }() - clientConfig, err := rest.InClusterConfig() if err != nil { return err @@ -55,51 +38,18 @@ func RunTnfSetup() error { return err } - // CRD clients - configClient, err := configclient.NewForConfig(clientConfig) + operatorConfigClient, err := operatorversionedclient.NewForConfig(clientConfig) if err != nil { return err } - kubeInformers := v1helpers.NewKubeInformersForNamespaces( - kubeClient, - operatorclient.TargetNamespace, - "", - ) - nodes := kubeInformers.InformersFor("").Core().V1().Nodes() - - cfgInformers := configv1informers.NewSharedInformerFactory(configClient, 0) - networkInformer := cfgInformers.Config().V1().Networks() - recorder := events.NewInMemoryRecorder("tnf-fencing", utilclock.RealClock{}) - - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + shutdownHandler := server.SetupSignalHandler() go func() { - <-ctx.Done() - close(stopCh) + defer cancel() + <-shutdownHandler + klog.Info("Received SIGTERM or SIGINT signal, terminating") }() - kubeInformers.Start(stopCh) - cfgInformers.Start(stopCh) - - if ok := cache.WaitForCacheSync( - ctx.Done(), - nodes.Informer().HasSynced, - networkInformer.Informer().HasSynced, - ); !ok { - return fmt.Errorf("failed to sync informers for etcd client") - } - - ec := etcdcli.NewEtcdClient( - kubeInformers, - nodes.Informer(), - nodes.Lister(), - networkInformer, - recorder, - ) - - operatorConfigClient, err := operatorversionedclient.NewForConfig(clientConfig) - if err != nil { - return err - } klog.Info("Waiting for completed auth jobs") authDone := func(context.Context) (done bool, err error) { @@ -180,11 +130,6 @@ func RunTnfSetup() error { return err } - if os.Getenv("TNF_VALIDATE_PEER_ONLY") == "true" { - if err := pcs.ValidateFencingPeerOnly(ctx, cfg, ec); err != nil { - return fmt.Errorf("peer-only disruptive validation failed: %w", err) - } - } klog.Infof("HA setup done! CIB:\n%s", cib) return nil From ea8fec35589edbe40aa24fd394722e3a152fe232 Mon Sep 17 00:00:00 2001 From: nhamza Date: Thu, 18 Sep 2025 16:01:23 +0300 Subject: [PATCH 03/14] add lock mechanism Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 33 +++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 33c0b4887..7ece15da6 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -1,4 +1,4 @@ -package fencingvalidate +package disruptivevalidate import ( "context" @@ -120,6 +120,29 @@ func RunDisruptiveValidate() error { klog.Infof("TNF validate: local=%s peer=%s", local, peer) + min, max := clusterCfg.NodeName1, clusterCfg.NodeName2 + if strings.Compare(min, max) > 0 { + min, max = max, min + } + + // If I'm the "second" node (max), wait for the "first" node's validate Job to Complete. + if local == max { + targetJobName := tools.JobTypeDisruptiveValidate.GetJobName(&min) // e.g. 
tnf-disruptive-validate-job- + klog.Infof("validate: %s waiting for %s to complete (%s)", local, min, targetJobName) + + err := wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, 45*time.Minute, true, func(context.Context) (bool, error) { + j, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, targetJobName, metav1.GetOptions{}) + if err != nil { + // NotFound or transient—keep polling + return false, nil + } + return tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil + }) + if err != nil { + return fmt.Errorf("timed out waiting for %s (%s) to complete: %w", min, targetJobName, err) + } + klog.Infof("validate: %s saw %s complete; proceeding to fence", local, min) + } // Preflight on host if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil { return fmt.Errorf("pcs absent on host: %w", err) @@ -151,8 +174,12 @@ func RunDisruptiveValidate() error { } // Fence peer - if _, _, err := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s --wait=300`, peer)); err != nil { - return fmt.Errorf("pcs fence failed: %w", err) + if out, _, err := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s --wait=60`, peer)); err != nil { + last := out + if nl := strings.LastIndex(out, "\n"); nl >= 0 && nl+1 < len(out) { + last = out[nl+1:] + } + return fmt.Errorf("pcs fence failed: %w (last: %s)", err, strings.TrimSpace(last)) } // Wait OFFLINE then ONLINE From 0e179b94d1293a7e83c262ad7fe46b678dd132b3 Mon Sep 17 00:00:00 2001 From: nhamza Date: Thu, 18 Sep 2025 21:25:02 +0300 Subject: [PATCH 04/14] update node online parsing Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 54 +++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 7ece15da6..398487789 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -152,20 +152,58 @@ func RunDisruptiveValidate() error { } // Ensure peer ONLINE before fence - peerOnlineRE := regexp.MustCompile(`(?mi)^Node\s+` + regexp.QuoteMeta(peer) + `\s+state:\s+([A-Z]+)`) + peerLineRE := regexp.MustCompile(`(?mi)^Node\s+` + regexp.QuoteMeta(peer) + `\s+state:\s+([A-Z]+)`) + waitPeer := func(wantOnline bool, timeout time.Duration) error { check := func(context.Context) (bool, error) { out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status nodes`) if err != nil { - // transient during fencing - return false, nil + return false, nil // transient } - m := peerOnlineRE.FindStringSubmatch(out) - if len(m) != 2 { - return false, nil + + // Fast path: per-node line format + if m := peerLineRE.FindStringSubmatch(out); len(m) == 2 { + gotOnline := (m[1] == "ONLINE") + return gotOnline == wantOnline, nil } - gotOnline := (m[1] == "ONLINE") - return gotOnline == wantOnline, nil + + // Fallback: summary lists + for _, ln := range strings.Split(out, "\n") { + l := strings.TrimSpace(ln) + if l == "" { + continue + } + low := strings.ToLower(l) + + // Decide which list this line represents + var listType string + switch { + case strings.HasPrefix(low, "online:"): + listType = "online" + case strings.HasPrefix(low, "offline:"): + listType = "offline" + case strings.HasPrefix(low, "standby:"), + strings.HasPrefix(low, "standby with resource"): + listType = "standby" + default: + continue + } + + // Extract names after the colon and look for exact token match + colon := strings.Index(l, ":") + if colon < 0 { + continue + } + for _, name := range 
strings.Fields(strings.TrimSpace(l[colon+1:])) { + if name == peer { + gotOnline := (listType == "online") + return gotOnline == wantOnline, nil + } + } + } + + // Unknown formatting; keep polling + return false, nil } return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) } From 3d07e5105285b5e2955f7b294a000797c5ec1f49 Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 00:50:26 +0300 Subject: [PATCH 05/14] remove bad fencing command Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 398487789..c62a2e51c 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -212,7 +212,7 @@ func RunDisruptiveValidate() error { } // Fence peer - if out, _, err := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s --wait=60`, peer)); err != nil { + if out, _, err := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s`, peer)); err != nil { last := out if nl := strings.LastIndex(out, "\n"); nl >= 0 && nl+1 < len(out) { last = out[nl+1:] From 2e97359fa8883463689634960e78115443acc32d Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 02:17:16 +0300 Subject: [PATCH 06/14] add etcd check Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 97 +++++++++++++++------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index c62a2e51c..94f27f318 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -59,7 +59,7 @@ func RunDisruptiveValidate() error { } _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, setupDone) - // NEW: wait for FENCING (cluster-wide) to complete + // wait for FENCING (cluster-wide) to complete klog.Info("Waiting for completed fencing job before validation") fencingDone := func(context.Context) (bool, error) { jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ @@ -76,7 +76,7 @@ func RunDisruptiveValidate() error { } _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.FencingJobCompletedTimeout, true, fencingDone) - // NEW: wait for BOTH AFTER-SETUP (per-node) jobs to complete + // wait for BOTH AFTER-SETUP (per-node) jobs to complete klog.Info("Waiting for completed after-setup jobs before validation") afterSetupDone := func(context.Context) (bool, error) { jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ @@ -125,7 +125,7 @@ func RunDisruptiveValidate() error { min, max = max, min } - // If I'm the "second" node (max), wait for the "first" node's validate Job to Complete. + // If "second" node (max), wait for the "first" node's validate Job to Complete. if local == max { targetJobName := tools.JobTypeDisruptiveValidate.GetJobName(&min) // e.g. 
tnf-disruptive-validate-job- klog.Infof("validate: %s waiting for %s to complete (%s)", local, min, targetJobName) @@ -151,63 +151,67 @@ func RunDisruptiveValidate() error { return fmt.Errorf("pacemaker not active: %w", err) } - // Ensure peer ONLINE before fence - peerLineRE := regexp.MustCompile(`(?mi)^Node\s+` + regexp.QuoteMeta(peer) + `\s+state:\s+([A-Z]+)`) - - waitPeer := func(wantOnline bool, timeout time.Duration) error { + // waiter for both peer state + etcd started-both, using `pcs status`. + waitPCS := func(wantPeer *bool, peer string, nodeA, nodeB string, needEtcdBoth bool, timeout time.Duration) error { + // wantPeer: nil = don't check peer, &true = want ONLINE, &false = want OFFLINE + reNodeLine := regexp.MustCompile(`(?mi)^\s*Node\s+(\S+)\s+state:\s+([A-Z]+)`) + reOnline := regexp.MustCompile(`(?mi)^\s*(?:\*\s*)?Online:\s*(.*)$`) + reEtcdList := regexp.MustCompile(`(?s)Clone Set:\s*etcd-clone\s*\[etcd\]:.*?Started:\s*\[\s*([^\]]+)\s*\]`) + reEtcdOne := regexp.MustCompile(`(?mi)^\s*\*\s+etcd\s+\(.*?\):\s*Started\s+(\S+)`) check := func(context.Context) (bool, error) { - out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status nodes`) + out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status`) if err != nil { return false, nil // transient } - // Fast path: per-node line format - if m := peerLineRE.FindStringSubmatch(out); len(m) == 2 { - gotOnline := (m[1] == "ONLINE") - return gotOnline == wantOnline, nil - } + // --- peer ONLINE set --- + online := map[string]bool{} - // Fallback: summary lists - for _, ln := range strings.Split(out, "\n") { - l := strings.TrimSpace(ln) - if l == "" { - continue + // Format A: per-node lines + for _, m := range reNodeLine.FindAllStringSubmatch(out, -1) { + if len(m) == 3 { + online[m[1]] = (m[2] == "ONLINE") } - low := strings.ToLower(l) - - // Decide which list this line represents - var listType string - switch { - case strings.HasPrefix(low, "online:"): - listType = "online" - case strings.HasPrefix(low, "offline:"): - listType = "offline" - case strings.HasPrefix(low, "standby:"), - strings.HasPrefix(low, "standby with resource"): - listType = "standby" - default: - continue + } + // Format B: summary list + if m := reOnline.FindStringSubmatch(out); len(m) == 2 { + for _, n := range strings.Fields(m[1]) { + online[n] = true } + } - // Extract names after the colon and look for exact token match - colon := strings.Index(l, ":") - if colon < 0 { - continue + // --- etcd started set --- + etcdStarted := map[string]bool{} + if m := reEtcdList.FindStringSubmatch(out); len(m) == 2 { + for _, n := range strings.Fields(m[1]) { + etcdStarted[n] = true } - for _, name := range strings.Fields(strings.TrimSpace(l[colon+1:])) { - if name == peer { - gotOnline := (listType == "online") - return gotOnline == wantOnline, nil + } else { + for _, m := range reEtcdOne.FindAllStringSubmatch(out, -1) { + if len(m) == 2 { + etcdStarted[m[1]] = true } } } - // Unknown formatting; keep polling - return false, nil + // Conditions + if wantPeer != nil { + if on, ok := online[peer]; !ok || on != *wantPeer { + return false, nil + } + } + if needEtcdBoth { + if !(etcdStarted[nodeA] && etcdStarted[nodeB]) { + return false, nil + } + } + return true, nil } + return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) } - if err := waitPeer(true, 2*time.Minute); err != nil { + + if err := waitPCS(func() *bool { b := true; return &b }(), peer, clusterCfg.NodeName1, clusterCfg.NodeName2, false, 2*time.Minute); err != nil { return fmt.Errorf("peer %q not 
ONLINE pre-fence: %w", peer, err) } @@ -221,11 +225,12 @@ func RunDisruptiveValidate() error { } // Wait OFFLINE then ONLINE - if err := waitPeer(false, 10*time.Minute); err != nil { + if err := waitPCS(func() *bool { b := false; return &b }(), peer, clusterCfg.NodeName1, clusterCfg.NodeName2, false, 5*time.Minute); err != nil { return fmt.Errorf("peer didn't go OFFLINE: %w", err) } - if err := waitPeer(true, 15*time.Minute); err != nil { - return fmt.Errorf("peer didn't become ONLINE: %w", err) + + if err := waitPCS(func() *bool { b := true; return &b }(), peer, clusterCfg.NodeName1, clusterCfg.NodeName2, true, 15*time.Minute); err != nil { + return fmt.Errorf("peer didn't become ONLINE with etcd started on both: %w", err) } klog.Infof("TNF validate: success local=%s peer=%s", local, peer) From bfc5151cef300b83c9559b79b2386d30b97dbcb4 Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 10:34:37 +0300 Subject: [PATCH 07/14] update etcd started check Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 69 +++++++++++++++------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 94f27f318..78a5912a4 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -2,6 +2,7 @@ package disruptivevalidate import ( "context" + "encoding/json" "fmt" "regexp" "strings" @@ -21,6 +22,13 @@ import ( "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" ) +type etcdMembers struct { + Members []struct { + Name string `json:"name"` + IsLearner bool `json:"isLearner"` + } `json:"members"` +} + func RunDisruptiveValidate() error { klog.Info("Setting up clients for TNF validate job") @@ -124,7 +132,27 @@ func RunDisruptiveValidate() error { if strings.Compare(min, max) > 0 { min, max = max, min } - + waitEtcdVoters := func(nameA, nameB string, timeout time.Duration) error { + check := func(context.Context) (bool, error) { + out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`) + if err != nil || strings.TrimSpace(out) == "" { + return false, nil + } + var ml etcdMembers + if e := json.Unmarshal([]byte(out), &ml); e != nil { + return false, nil + } + seen := map[string]bool{} + voter := map[string]bool{} + for _, m := range ml.Members { + seen[m.Name] = true + voter[m.Name] = !m.IsLearner + } + // require both members present and both voters + return seen[nameA] && seen[nameB] && voter[nameA] && voter[nameB], nil + } + return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) + } // If "second" node (max), wait for the "first" node's validate Job to Complete. if local == max { targetJobName := tools.JobTypeDisruptiveValidate.GetJobName(&min) // e.g. tnf-disruptive-validate-job- @@ -141,6 +169,10 @@ func RunDisruptiveValidate() error { if err != nil { return fmt.Errorf("timed out waiting for %s (%s) to complete: %w", min, targetJobName, err) } + + if err := waitEtcdVoters(clusterCfg.NodeName1, clusterCfg.NodeName2, 10*time.Minute); err != nil { + return fmt.Errorf("etcd members did not start or both not voters yet; refusing second fence: %w", err) + } klog.Infof("validate: %s saw %s complete; proceeding to fence", local, min) } // Preflight on host @@ -151,22 +183,18 @@ func RunDisruptiveValidate() error { return fmt.Errorf("pacemaker not active: %w", err) } - // waiter for both peer state + etcd started-both, using `pcs status`. 
- waitPCS := func(wantPeer *bool, peer string, nodeA, nodeB string, needEtcdBoth bool, timeout time.Duration) error { + // waiter for both peer state + waitPCS := func(wantPeer *bool, peer string, timeout time.Duration) error { // wantPeer: nil = don't check peer, &true = want ONLINE, &false = want OFFLINE reNodeLine := regexp.MustCompile(`(?mi)^\s*Node\s+(\S+)\s+state:\s+([A-Z]+)`) reOnline := regexp.MustCompile(`(?mi)^\s*(?:\*\s*)?Online:\s*(.*)$`) - reEtcdList := regexp.MustCompile(`(?s)Clone Set:\s*etcd-clone\s*\[etcd\]:.*?Started:\s*\[\s*([^\]]+)\s*\]`) - reEtcdOne := regexp.MustCompile(`(?mi)^\s*\*\s+etcd\s+\(.*?\):\s*Started\s+(\S+)`) check := func(context.Context) (bool, error) { out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status`) if err != nil { return false, nil // transient } - // --- peer ONLINE set --- online := map[string]bool{} - // Format A: per-node lines for _, m := range reNodeLine.FindAllStringSubmatch(out, -1) { if len(m) == 3 { @@ -179,39 +207,18 @@ func RunDisruptiveValidate() error { online[n] = true } } - - // --- etcd started set --- - etcdStarted := map[string]bool{} - if m := reEtcdList.FindStringSubmatch(out); len(m) == 2 { - for _, n := range strings.Fields(m[1]) { - etcdStarted[n] = true - } - } else { - for _, m := range reEtcdOne.FindAllStringSubmatch(out, -1) { - if len(m) == 2 { - etcdStarted[m[1]] = true - } - } - } - // Conditions if wantPeer != nil { if on, ok := online[peer]; !ok || on != *wantPeer { return false, nil } } - if needEtcdBoth { - if !(etcdStarted[nodeA] && etcdStarted[nodeB]) { - return false, nil - } - } return true, nil } - return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) } - if err := waitPCS(func() *bool { b := true; return &b }(), peer, clusterCfg.NodeName1, clusterCfg.NodeName2, false, 2*time.Minute); err != nil { + if err := waitPCS(func() *bool { b := true; return &b }(), peer, 2*time.Minute); err != nil { return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err) } @@ -225,11 +232,11 @@ func RunDisruptiveValidate() error { } // Wait OFFLINE then ONLINE - if err := waitPCS(func() *bool { b := false; return &b }(), peer, clusterCfg.NodeName1, clusterCfg.NodeName2, false, 5*time.Minute); err != nil { + if err := waitPCS(func() *bool { b := false; return &b }(), peer, 5*time.Minute); err != nil { return fmt.Errorf("peer didn't go OFFLINE: %w", err) } - if err := waitPCS(func() *bool { b := true; return &b }(), peer, clusterCfg.NodeName1, clusterCfg.NodeName2, true, 15*time.Minute); err != nil { + if err := waitPCS(func() *bool { b := true; return &b }(), peer, 10*time.Minute); err != nil { return fmt.Errorf("peer didn't become ONLINE with etcd started on both: %w", err) } From e7de4c3475b97fdeb51ebdc123df8b70e290b9c4 Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 12:41:54 +0300 Subject: [PATCH 08/14] update lock mechanism Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 84 +++++++++++++++++++--------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 78a5912a4..a77091739 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -4,22 +4,23 @@ import ( "context" "encoding/json" "fmt" + "os" "regexp" "strings" "time" + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" + 
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" - - "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" - "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" - "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" - "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" ) type etcdMembers struct { @@ -65,8 +66,9 @@ func RunDisruptiveValidate() error { } return true, nil } - _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, setupDone) - + if err := wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, setupDone); err != nil { + return fmt.Errorf("waiting for setup to complete: %w", err) + } // wait for FENCING (cluster-wide) to complete klog.Info("Waiting for completed fencing job before validation") fencingDone := func(context.Context) (bool, error) { @@ -109,12 +111,19 @@ func RunDisruptiveValidate() error { return err } - // Determine which host this pod is on (nsenter wrapper runs on host) - hostOut, _, err := exec.Execute(ctx, "hostname") + // Determine which node this pod is running on + podName, err := os.Hostname() // pod name == container hostname + if err != nil || strings.TrimSpace(podName) == "" { + return fmt.Errorf("get pod hostname/name: %w", err) + } + pod, err := kubeClient.CoreV1().Pods(operatorclient.TargetNamespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("get host hostname: %w", err) + return fmt.Errorf("get Pod %s/%s: %w", operatorclient.TargetNamespace, podName, err) + } + local := strings.TrimSpace(pod.Spec.NodeName) + if local == "" { + return fmt.Errorf("pod.Spec.NodeName empty") } - local := strings.TrimSpace(hostOut) var peer string switch local { @@ -132,7 +141,7 @@ func RunDisruptiveValidate() error { if strings.Compare(min, max) > 0 { min, max = max, min } - waitEtcdVoters := func(nameA, nameB string, timeout time.Duration) error { + waitEtcdVoters := func(timeout time.Duration) error { check := func(context.Context) (bool, error) { out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`) if err != nil || strings.TrimSpace(out) == "" { @@ -142,39 +151,62 @@ func RunDisruptiveValidate() error { if e := json.Unmarshal([]byte(out), &ml); e != nil { return false, nil } - seen := map[string]bool{} - voter := map[string]bool{} + total, voters := 0, 0 for _, m := range ml.Members { - seen[m.Name] = true - voter[m.Name] = !m.IsLearner + total++ + if !m.IsLearner { + voters++ + } } - // require both members present and both voters - return seen[nameA] && seen[nameB] && voter[nameA] && voter[nameB], nil + // Require exactly 2 members and both voters + return total == 2 && voters == 2, nil } return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) } // If "second" node (max), wait for the "first" node's validate Job to Complete. if local == max { - targetJobName := tools.JobTypeDisruptiveValidate.GetJobName(&min) // e.g. 
tnf-disruptive-validate-job- - klog.Infof("validate: %s waiting for %s to complete (%s)", local, min, targetJobName) + targetJobName := tools.JobTypeDisruptiveValidate.GetJobName(&min) // e.g., tnf-disruptive-validate-job-master-0 + klog.Infof("validate: %s waiting for %s to complete (job=%s)", local, min, targetJobName) + + // Robust job state helpers + isJobComplete := func(j *batchv1.Job) bool { + if j.Status.Succeeded > 0 { + return true + } + return tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) + } + isJobFailed := func(j *batchv1.Job) bool { + if j.Status.Failed > 0 { + return true + } + return tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) + } err := wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, 45*time.Minute, true, func(context.Context) (bool, error) { j, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, targetJobName, metav1.GetOptions{}) - if err != nil { - // NotFound or transient—keep polling + if apierrors.IsNotFound(err) { + klog.Infof("peer job %s not found yet; still waiting", targetJobName) return false, nil } - return tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil + if err != nil { + klog.Warningf("peer job get error: %v", err) + return false, nil // transient + } + if isJobFailed(j) { + return false, fmt.Errorf("peer validate job %s failed (failed=%d)", targetJobName, j.Status.Failed) + } + return isJobComplete(j), nil }) if err != nil { return fmt.Errorf("timed out waiting for %s (%s) to complete: %w", min, targetJobName, err) } - if err := waitEtcdVoters(clusterCfg.NodeName1, clusterCfg.NodeName2, 10*time.Minute); err != nil { - return fmt.Errorf("etcd members did not start or both not voters yet; refusing second fence: %w", err) + if err := waitEtcdVoters(10 * time.Minute); err != nil { + return fmt.Errorf("etcd members not both voters after first validate: %w", err) } - klog.Infof("validate: %s saw %s complete; proceeding to fence", local, min) + klog.Infof("validate: %s saw %s complete + etcd OK; proceeding to fence", local, min) } + // Preflight on host if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil { return fmt.Errorf("pcs absent on host: %w", err) From 140682fb0cda946de5b93cf59e865775671beb41 Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 15:03:32 +0300 Subject: [PATCH 09/14] adjust knowing who is the local node Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 399 +++++++++++++-------------- 1 file changed, 191 insertions(+), 208 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index a77091739..51f07b5a5 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -5,273 +5,256 @@ import ( "encoding/json" "fmt" "os" - "regexp" "strings" "time" - "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" - "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" - "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" - "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" batchv1 "k8s.io/api/batch/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" + wait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" + + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" + 
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" ) -type etcdMembers struct { - Members []struct { - Name string `json:"name"` - IsLearner bool `json:"isLearner"` - } `json:"members"` -} +const ( + poll = 3 * time.Second + timeoutSetup = tools.SetupJobCompletedTimeout + timeoutFencing = tools.FencingJobCompletedTimeout + timeoutAfter = tools.SetupJobCompletedTimeout + timeoutPeerJob = 45 * time.Minute + timeoutPCSUp = 2 * time.Minute + timeoutPCSDown = 5 * time.Minute + timeoutPCSBackUp = 10 * time.Minute + timeoutEtcdOK = 10 * time.Minute + jobNamePrefix = "tnf-disruptive-validate-job-" +) func RunDisruptiveValidate() error { - klog.Info("Setting up clients for TNF validate job") - ctx, cancel := context.WithCancel(context.Background()) - shutdown := server.SetupSignalHandler() - go func() { - defer cancel() - <-shutdown - klog.Info("Received termination signal, exiting validate job") - }() + defer cancel() + go func() { <-server.SetupSignalHandler(); cancel() }() - // kube client - clientConfig, err := rest.InClusterConfig() + cfg, err := rest.InClusterConfig() if err != nil { return err } - kubeClient, err := kubernetes.NewForConfig(clientConfig) + kc, err := kubernetes.NewForConfig(cfg) if err != nil { return err } - // Wait for SETUP (cluster-wide) to complete - klog.Info("Waiting for completed setup job before validation") - setupDone := func(context.Context) (bool, error) { - jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ - LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeSetup.GetNameLabelValue()), - }) - if err != nil || len(jobs.Items) != 1 { - klog.Warningf("setup job not ready yet, err=%v count=%d", err, len(jobs.Items)) - return false, nil - } - if !tools.IsConditionTrue(jobs.Items[0].Status.Conditions, batchv1.JobComplete) { - return false, nil - } - return true, nil + // 1) Wait orchestrated jobs + if err := waitForLabeledJob(ctx, kc, tools.JobTypeSetup.GetNameLabelValue(), 1, timeoutSetup); err != nil { + return fmt.Errorf("setup not complete: %w", err) } - if err := wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, setupDone); err != nil { - return fmt.Errorf("waiting for setup to complete: %w", err) + if err := waitForLabeledJob(ctx, kc, tools.JobTypeFencing.GetNameLabelValue(), 1, timeoutFencing); err != nil { + return fmt.Errorf("fencing not complete: %w", err) } - // wait for FENCING (cluster-wide) to complete - klog.Info("Waiting for completed fencing job before validation") - fencingDone := func(context.Context) (bool, error) { - jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ - LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeFencing.GetNameLabelValue()), - }) - if err != nil || len(jobs.Items) != 1 { - klog.Warningf("fencing job not ready yet, err=%v count=%d", err, len(jobs.Items)) - return false, nil - } - if !tools.IsConditionTrue(jobs.Items[0].Status.Conditions, batchv1.JobComplete) { - return false, nil + if err := waitForLabeledJob(ctx, kc, tools.JobTypeAfterSetup.GetNameLabelValue(), 2, timeoutAfter); err != nil { + return fmt.Errorf("after-setup not complete: %w", err) + } + + // 2) Local / peer discovery + clusterCfg, err := config.GetClusterConfig(ctx, kc) + if err != nil { + return err + } + local, peer, err := 
detectLocalAndPeer(ctx, kc, clusterCfg.NodeName1, clusterCfg.NodeName2) + if err != nil { + return err + } + klog.Infof("validate: local=%s peer=%s", local, peer) + + // 3) Lexicographic sequencing + if err := waitPeerValidateIfSecond(ctx, kc, local, clusterCfg.NodeName1, clusterCfg.NodeName2); err != nil { + return err + } + + // Pre-fence health + if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil { + return fmt.Errorf("pre-fence etcd not healthy: %w", err) + } + + // 4) PCS preflight + if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil { + return fmt.Errorf("pcs not found: %w", err) + } + if _, _, err := exec.Execute(ctx, `systemctl is-active pacemaker`); err != nil { + return fmt.Errorf("pacemaker not active: %w", err) + } + if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSUp); err != nil { + return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err) + } + + // 5) Fence → OFFLINE → ONLINE → etcd healthy + out, _, ferr := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s`, peer)) + if ferr != nil { + ls := out + if i := strings.LastIndex(ls, "\n"); i >= 0 && i+1 < len(ls) { + ls = ls[i+1:] } - return true, nil + return fmt.Errorf("pcs fence %s failed: %w (last line: %s)", peer, ferr, strings.TrimSpace(ls)) } - _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.FencingJobCompletedTimeout, true, fencingDone) + if err := waitPCSState(ctx, boolp(false), peer, timeoutPCSDown); err != nil { + return fmt.Errorf("peer didn't go OFFLINE: %w", err) + } + if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSBackUp); err != nil { + return fmt.Errorf("peer didn't return ONLINE: %w", err) + } + if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil { + return fmt.Errorf("post-fence etcd not healthy: %w", err) + } + + klog.Infof("validate: success local=%s peer=%s", local, peer) + return nil +} - // wait for BOTH AFTER-SETUP (per-node) jobs to complete - klog.Info("Waiting for completed after-setup jobs before validation") - afterSetupDone := func(context.Context) (bool, error) { - jobs, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ - LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeAfterSetup.GetNameLabelValue()), - }) - if err != nil || len(jobs.Items) != 2 { - klog.Warningf("after-setup jobs not ready yet, err=%v count=%d", err, len(jobs.Items)) +// helpers +func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel string, wantAtLeast int, to time.Duration) error { + sel := fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel) + return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) { + jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel}) + if err != nil || len(jl.Items) < wantAtLeast { return false, nil } - for _, j := range jobs.Items { - if !tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) { - return false, nil + completed := 0 + for i := range jl.Items { + j := &jl.Items[i] + if j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) { + completed++ } } - return true, nil - } - _ = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.SetupJobCompletedTimeout, true, afterSetupDone) - - // Discover cluster config (node names) - clusterCfg, err := config.GetClusterConfig(ctx, kubeClient) - if err != nil { - return err - } + return completed >= wantAtLeast, nil + }) +} - // Determine 
which node this pod is running on - podName, err := os.Hostname() // pod name == container hostname +func detectLocalAndPeer(_ context.Context, _ kubernetes.Interface, n1, n2 string) (string, string, error) { + podName, err := os.Hostname() if err != nil || strings.TrimSpace(podName) == "" { - return fmt.Errorf("get pod hostname/name: %w", err) + return "", "", fmt.Errorf("get pod hostname: %w", err) } - pod, err := kubeClient.CoreV1().Pods(operatorclient.TargetNamespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("get Pod %s/%s: %w", operatorclient.TargetNamespace, podName, err) + // "-" -> "" + i := strings.LastIndex(podName, "-") + if i <= 0 { + return "", "", fmt.Errorf("cannot derive job name from pod %q", podName) } - local := strings.TrimSpace(pod.Spec.NodeName) - if local == "" { - return fmt.Errorf("pod.Spec.NodeName empty") + jobName := podName[:i] + + if !strings.HasPrefix(jobName, jobNamePrefix) { + return "", "", fmt.Errorf("unexpected job name %q (want prefix %q)", jobName, jobNamePrefix) } + local := strings.TrimPrefix(jobName, jobNamePrefix) - var peer string switch local { - case clusterCfg.NodeName1: - peer = clusterCfg.NodeName2 - case clusterCfg.NodeName2: - peer = clusterCfg.NodeName1 + case n1: + return local, n2, nil + case n2: + return local, n1, nil default: - return fmt.Errorf("host %q not in cluster config (%q, %q)", local, clusterCfg.NodeName1, clusterCfg.NodeName2) + return "", "", fmt.Errorf("local %q not in cluster config (%q,%q)", local, n1, n2) } +} - klog.Infof("TNF validate: local=%s peer=%s", local, peer) - - min, max := clusterCfg.NodeName1, clusterCfg.NodeName2 +func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, local, a, b string) error { + min, max := a, b if strings.Compare(min, max) > 0 { min, max = max, min } - waitEtcdVoters := func(timeout time.Duration) error { - check := func(context.Context) (bool, error) { - out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`) - if err != nil || strings.TrimSpace(out) == "" { - return false, nil - } - var ml etcdMembers - if e := json.Unmarshal([]byte(out), &ml); e != nil { - return false, nil - } - total, voters := 0, 0 - for _, m := range ml.Members { - total++ - if !m.IsLearner { - voters++ - } - } - // Require exactly 2 members and both voters - return total == 2 && voters == 2, nil - } - return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) + if local != max { + return nil } - // If "second" node (max), wait for the "first" node's validate Job to Complete. 
- if local == max { - targetJobName := tools.JobTypeDisruptiveValidate.GetJobName(&min) // e.g., tnf-disruptive-validate-job-master-0 - klog.Infof("validate: %s waiting for %s to complete (job=%s)", local, min, targetJobName) - - // Robust job state helpers - isJobComplete := func(j *batchv1.Job) bool { - if j.Status.Succeeded > 0 { - return true + target := tools.JobTypeDisruptiveValidate.GetJobName(&min) + missingSince := time.Time{} + return wait.PollUntilContextTimeout(ctx, poll, timeoutPeerJob, true, func(context.Context) (bool, error) { + j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, target, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + if missingSince.IsZero() { + missingSince = time.Now() + } else if time.Since(missingSince) > 2*time.Minute { + return false, fmt.Errorf("peer job %s not found for >2m; likely GC'd or never created", target) } - return tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) + return false, nil } - isJobFailed := func(j *batchv1.Job) bool { - if j.Status.Failed > 0 { - return true - } - return tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) + if err != nil { + return false, nil } + if j.Status.Failed > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { + return false, fmt.Errorf("peer validate job %s failed", target) + } + return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil + }) +} - err := wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, 45*time.Minute, true, func(context.Context) (bool, error) { - j, err := kubeClient.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, targetJobName, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - klog.Infof("peer job %s not found yet; still waiting", targetJobName) - return false, nil - } - if err != nil { - klog.Warningf("peer job get error: %v", err) - return false, nil // transient - } - if isJobFailed(j) { - return false, fmt.Errorf("peer validate job %s failed (failed=%d)", targetJobName, j.Status.Failed) +func waitEtcdTwoVoters(ctx context.Context, to time.Duration) error { + return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) { + out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`) + out = strings.TrimSpace(out) + if err != nil || len(out) < 2 { + return false, nil + } + var ml struct { + Members []struct { + IsLearner bool `json:"isLearner"` + } `json:"members"` + } + if json.Unmarshal([]byte(out), &ml) != nil { + return false, nil + } + total, voters := 0, 0 + for _, m := range ml.Members { + total++ + if !m.IsLearner { + voters++ } - return isJobComplete(j), nil - }) - if err != nil { - return fmt.Errorf("timed out waiting for %s (%s) to complete: %w", min, targetJobName, err) } + return total == 2 && voters == 2, nil + }) +} - if err := waitEtcdVoters(10 * time.Minute); err != nil { - return fmt.Errorf("etcd members not both voters after first validate: %w", err) +func waitPCSState(ctx context.Context, want *bool, peer string, to time.Duration) error { + return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) { + out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status`) + if err != nil { + return false, nil } - klog.Infof("validate: %s saw %s complete + etcd OK; proceeding to fence", local, min) - } - - // Preflight on host - if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil { - return fmt.Errorf("pcs absent on host: %w", 
err) - } - if _, _, err := exec.Execute(ctx, `systemctl is-active pacemaker`); err != nil { - return fmt.Errorf("pacemaker not active: %w", err) - } - - // waiter for both peer state - waitPCS := func(wantPeer *bool, peer string, timeout time.Duration) error { - // wantPeer: nil = don't check peer, &true = want ONLINE, &false = want OFFLINE - reNodeLine := regexp.MustCompile(`(?mi)^\s*Node\s+(\S+)\s+state:\s+([A-Z]+)`) - reOnline := regexp.MustCompile(`(?mi)^\s*(?:\*\s*)?Online:\s*(.*)$`) - check := func(context.Context) (bool, error) { - out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status`) - if err != nil { - return false, nil // transient - } - // --- peer ONLINE set --- - online := map[string]bool{} - // Format A: per-node lines - for _, m := range reNodeLine.FindAllStringSubmatch(out, -1) { - if len(m) == 3 { - online[m[1]] = (m[2] == "ONLINE") + online := map[string]bool{} + for _, ln := range strings.Split(out, "\n") { + s := strings.TrimSpace(strings.TrimPrefix(ln, "*")) + // Format A: "Node state: ONLINE|OFFLINE" + if strings.HasPrefix(s, "Node ") && strings.Contains(s, " state: ") { + fs := strings.Fields(s) + if len(fs) >= 4 { + state := fs[len(fs)-1] + online[fs[1]] = (state == "ONLINE") } + continue } - // Format B: summary list - if m := reOnline.FindStringSubmatch(out); len(m) == 2 { - for _, n := range strings.Fields(m[1]) { - online[n] = true + // Format B: "Online: n1 n2 ..." / "Offline: nX ..." + if strings.HasPrefix(s, "Online:") { + for _, n := range strings.Fields(s[len("Online:"):]) { + online[strings.TrimSpace(n)] = true } - } - // Conditions - if wantPeer != nil { - if on, ok := online[peer]; !ok || on != *wantPeer { - return false, nil + } else if strings.HasPrefix(s, "Offline:") { + for _, n := range strings.Fields(s[len("Offline:"):]) { + if _, seen := online[strings.TrimSpace(n)]; !seen { + online[strings.TrimSpace(n)] = false + } } } - return true, nil } - return wait.PollUntilContextTimeout(ctx, 3*time.Second, timeout, true, check) - } - - if err := waitPCS(func() *bool { b := true; return &b }(), peer, 2*time.Minute); err != nil { - return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err) - } - - // Fence peer - if out, _, err := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s`, peer)); err != nil { - last := out - if nl := strings.LastIndex(out, "\n"); nl >= 0 && nl+1 < len(out) { - last = out[nl+1:] + if want == nil { + return true, nil } - return fmt.Errorf("pcs fence failed: %w (last: %s)", err, strings.TrimSpace(last)) - } - - // Wait OFFLINE then ONLINE - if err := waitPCS(func() *bool { b := false; return &b }(), peer, 5*time.Minute); err != nil { - return fmt.Errorf("peer didn't go OFFLINE: %w", err) - } - - if err := waitPCS(func() *bool { b := true; return &b }(), peer, 10*time.Minute); err != nil { - return fmt.Errorf("peer didn't become ONLINE with etcd started on both: %w", err) - } - - klog.Infof("TNF validate: success local=%s peer=%s", local, peer) - return nil + on, ok := online[peer] + return ok && on == *want, nil + }) } + +func boolp(b bool) *bool { return &b } From 9525b6f2f6beeff2b972878e2bdeaf417ab6ffb3 Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 16:34:48 +0300 Subject: [PATCH 10/14] dont fail on retries Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 51f07b5a5..93c8bbded 100644 --- a/pkg/tnf/disruptivevalidate/runner.go 
+++ b/pkg/tnf/disruptivevalidate/runner.go @@ -170,24 +170,23 @@ func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, loca if local != max { return nil } + target := tools.JobTypeDisruptiveValidate.GetJobName(&min) - missingSince := time.Time{} return wait.PollUntilContextTimeout(ctx, poll, timeoutPeerJob, true, func(context.Context) (bool, error) { j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, target, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - if missingSince.IsZero() { - missingSince = time.Now() - } else if time.Since(missingSince) > 2*time.Minute { - return false, fmt.Errorf("peer job %s not found for >2m; likely GC'd or never created", target) - } return false, nil } if err != nil { return false, nil } - if j.Status.Failed > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { + klog.V(2).Infof("peer %s status: succeeded=%d failed=%d conditions=%+v", target, j.Status.Succeeded, j.Status.Failed, j.Status.Conditions) + + // Only treat as failed if the JobFailed condition is set + if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { return false, fmt.Errorf("peer validate job %s failed", target) } + // Proceed when the peer is complete return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil }) } From efb27eff1fccc264e2f30f1849e195a5c3dfb2c3 Mon Sep 17 00:00:00 2001 From: nhamza Date: Fri, 19 Sep 2025 18:37:13 +0300 Subject: [PATCH 11/14] update node online logic Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 44 +++++++++++----------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 93c8bbded..1b34777ff 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -219,40 +219,30 @@ func waitEtcdTwoVoters(ctx context.Context, to time.Duration) error { func waitPCSState(ctx context.Context, want *bool, peer string, to time.Duration) error { return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) { - out, _, err := exec.Execute(ctx, `/usr/sbin/pcs status`) + out, _, err := exec.Execute(ctx, `LC_ALL=C /usr/sbin/pcs status nodes`) if err != nil { - return false, nil + return false, nil // treat as transient } - online := map[string]bool{} + + // Find the "Online:" line and build a set of names listed there. + var onlineLine string for _, ln := range strings.Split(out, "\n") { - s := strings.TrimSpace(strings.TrimPrefix(ln, "*")) - // Format A: "Node state: ONLINE|OFFLINE" - if strings.HasPrefix(s, "Node ") && strings.Contains(s, " state: ") { - fs := strings.Fields(s) - if len(fs) >= 4 { - state := fs[len(fs)-1] - online[fs[1]] = (state == "ONLINE") - } - continue - } - // Format B: "Online: n1 n2 ..." / "Offline: nX ..." 
+ s := strings.TrimSpace(ln) if strings.HasPrefix(s, "Online:") { - for _, n := range strings.Fields(s[len("Online:"):]) { - online[strings.TrimSpace(n)] = true - } - } else if strings.HasPrefix(s, "Offline:") { - for _, n := range strings.Fields(s[len("Offline:"):]) { - if _, seen := online[strings.TrimSpace(n)]; !seen { - online[strings.TrimSpace(n)] = false - } - } + onlineLine = strings.TrimSpace(strings.TrimPrefix(s, "Online:")) + break } } - if want == nil { - return true, nil + peerOnline := false + if onlineLine != "" { + for _, tok := range strings.Fields(onlineLine) { + if strings.Trim(tok, "[],") == peer { + peerOnline = true + break + } + } } - on, ok := online[peer] - return ok && on == *want, nil + return peerOnline == *want, nil }) } From fc08add5e09fdd7ef0586d631a731a324e57f9aa Mon Sep 17 00:00:00 2001 From: nhamza Date: Thu, 25 Sep 2025 10:33:11 +0300 Subject: [PATCH 12/14] update per requested changes Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 78 +++++++++++++++++++--------- 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 1b34777ff..5b7d22131 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -9,7 +9,6 @@ import ( "time" batchv1 "k8s.io/api/batch/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" wait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" @@ -21,6 +20,7 @@ import ( "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" + apierrors "k8s.io/apimachinery/pkg/api/errors" ) const ( @@ -66,7 +66,7 @@ func RunDisruptiveValidate() error { if err != nil { return err } - local, peer, err := detectLocalAndPeer(ctx, kc, clusterCfg.NodeName1, clusterCfg.NodeName2) + local, peer, err := detectLocalAndPeer(clusterCfg.NodeName1, clusterCfg.NodeName2) if err != nil { return err } @@ -117,16 +117,39 @@ func RunDisruptiveValidate() error { } // helpers -func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel string, wantAtLeast int, to time.Duration) error { - sel := fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel) +func waitForJobs( + ctx context.Context, + kc kubernetes.Interface, + byName string, + labelSelector string, + wantAtLeast int, + to time.Duration, +) error { return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) { - jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel}) - if err != nil || len(jl.Items) < wantAtLeast { + if byName != "" { + j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, byName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) || err != nil { + return false, nil // keep polling on transient/not-found + } + if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { + return false, fmt.Errorf("job %s failed", byName) + } + return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil + } + // selector path + jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return false, nil // transient + } + if len(jl.Items) < wantAtLeast { return false, nil } completed := 0 for i := range jl.Items { j := &jl.Items[i] + if 
tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { + return false, fmt.Errorf("job %s failed", j.Name) + } if j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) { completed++ } @@ -135,11 +158,29 @@ func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel s }) } -func detectLocalAndPeer(_ context.Context, _ kubernetes.Interface, n1, n2 string) (string, string, error) { +func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel string, wantAtLeast int, to time.Duration) error { + return waitForJobs(ctx, kc, + "", // byName + fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel), + wantAtLeast, to) +} + +func waitForJobName(ctx context.Context, kc kubernetes.Interface, name string, to time.Duration) error { + return waitForJobs(ctx, kc, + name, // byName + "", // labelSelector + 1, to) +} + +func detectLocalAndPeer(n1, n2 string) (string, string, error) { podName, err := os.Hostname() - if err != nil || strings.TrimSpace(podName) == "" { + if err != nil { return "", "", fmt.Errorf("get pod hostname: %w", err) } + podName = strings.TrimSpace(podName) + if podName == "" { + return "", "", fmt.Errorf("get pod hostname: empty string") + } // "-" -> "" i := strings.LastIndex(podName, "-") if i <= 0 { @@ -172,23 +213,10 @@ func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, loca } target := tools.JobTypeDisruptiveValidate.GetJobName(&min) - return wait.PollUntilContextTimeout(ctx, poll, timeoutPeerJob, true, func(context.Context) (bool, error) { - j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, target, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - return false, nil - } - if err != nil { - return false, nil - } - klog.V(2).Infof("peer %s status: succeeded=%d failed=%d conditions=%+v", target, j.Status.Succeeded, j.Status.Failed, j.Status.Conditions) - - // Only treat as failed if the JobFailed condition is set - if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { - return false, fmt.Errorf("peer validate job %s failed", target) - } - // Proceed when the peer is complete - return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil - }) + if err := waitForJobName(ctx, kc, target, timeoutPeerJob); err != nil { + return fmt.Errorf("peer validate job %s not complete: %w", min, err) + } + return nil } func waitEtcdTwoVoters(ctx context.Context, to time.Duration) error { From 6a2b9a4ed8902fb7d8dda71e31caf02c51462117 Mon Sep 17 00:00:00 2001 From: nhamza Date: Thu, 25 Sep 2025 11:06:29 +0300 Subject: [PATCH 13/14] fix TTL-deleted infinite loop Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 48 +++++++++++++++++++++------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 5b7d22131..4c4c4229a 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -54,14 +54,7 @@ func RunDisruptiveValidate() error { if err := waitForLabeledJob(ctx, kc, tools.JobTypeSetup.GetNameLabelValue(), 1, timeoutSetup); err != nil { return fmt.Errorf("setup not complete: %w", err) } - if err := waitForLabeledJob(ctx, kc, tools.JobTypeFencing.GetNameLabelValue(), 1, timeoutFencing); err != nil { - return fmt.Errorf("fencing not complete: %w", err) - } - if err := waitForLabeledJob(ctx, kc, tools.JobTypeAfterSetup.GetNameLabelValue(), 2, timeoutAfter); err != nil { - return 
fmt.Errorf("after-setup not complete: %w", err) - } - // 2) Local / peer discovery clusterCfg, err := config.GetClusterConfig(ctx, kc) if err != nil { return err @@ -72,7 +65,17 @@ func RunDisruptiveValidate() error { } klog.Infof("validate: local=%s peer=%s", local, peer) - // 3) Lexicographic sequencing + if err := waitForLabeledJob(ctx, kc, tools.JobTypeFencing.GetNameLabelValue(), 1, timeoutFencing); err != nil { + return fmt.Errorf("fencing not complete: %w", err) + } + if err := waitForJobName(ctx, kc, tools.JobTypeAfterSetup.GetJobName(&clusterCfg.NodeName1), timeoutAfter); err != nil { + return fmt.Errorf("after-setup not complete on %s: %w", clusterCfg.NodeName1, err) + } + if err := waitForJobName(ctx, kc, tools.JobTypeAfterSetup.GetJobName(&clusterCfg.NodeName2), timeoutAfter); err != nil { + return fmt.Errorf("after-setup not complete on %s: %w", clusterCfg.NodeName2, err) + } + + // 2) Lexicographic sequencing if err := waitPeerValidateIfSecond(ctx, kc, local, clusterCfg.NodeName1, clusterCfg.NodeName2); err != nil { return err } @@ -125,18 +128,41 @@ func waitForJobs( wantAtLeast int, to time.Duration, ) error { + // TTL tolerance state (captured by the poll closure) + const appearanceGrace = 2 * time.Minute + start := time.Now() + seen := false + return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) { if byName != "" { j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, byName, metav1.GetOptions{}) - if apierrors.IsNotFound(err) || err != nil { - return false, nil // keep polling on transient/not-found + if apierrors.IsNotFound(err) { + // If we never saw the job and it stays NotFound past a short grace, + // assume it completed earlier and was TTL-deleted. + if !seen { + if time.Since(start) > appearanceGrace { + klog.V(2).Infof("job %s not found for %s; assuming completed earlier and TTL-deleted", byName, appearanceGrace) + return true, nil + } + return false, nil + } + // We saw it before and now it's gone → assume TTL after completion. 
+ klog.V(2).Infof("job %s disappeared after observation; assuming TTL after completion", byName) + return true, nil } + if err != nil { + return false, nil // transient + } + + seen = true + if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) { return false, fmt.Errorf("job %s failed", byName) } return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil } - // selector path + + // selector path (aggregate waits) jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { return false, nil // transient From 0003e21b655655bd8839bbbc923e99d9c0a69168 Mon Sep 17 00:00:00 2001 From: nhamza Date: Thu, 25 Sep 2025 11:18:50 +0300 Subject: [PATCH 14/14] fix not marking unseen job as completed Signed-off-by: nhamza --- pkg/tnf/disruptivevalidate/runner.go | 35 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/pkg/tnf/disruptivevalidate/runner.go b/pkg/tnf/disruptivevalidate/runner.go index 4c4c4229a..01c8c1633 100644 --- a/pkg/tnf/disruptivevalidate/runner.go +++ b/pkg/tnf/disruptivevalidate/runner.go @@ -127,8 +127,8 @@ func waitForJobs( labelSelector string, wantAtLeast int, to time.Duration, + allowNeverSeenTTL bool, // NEW ) error { - // TTL tolerance state (captured by the poll closure) const appearanceGrace = 2 * time.Minute start := time.Now() seen := false @@ -137,18 +137,15 @@ func waitForJobs( if byName != "" { j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, byName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - // If we never saw the job and it stays NotFound past a short grace, - // assume it completed earlier and was TTL-deleted. - if !seen { - if time.Since(start) > appearanceGrace { - klog.V(2).Infof("job %s not found for %s; assuming completed earlier and TTL-deleted", byName, appearanceGrace) - return true, nil - } - return false, nil + if seen { + klog.V(2).Infof("job %s disappeared after observation; assuming TTL after completion", byName) + return true, nil } - // We saw it before and now it's gone → assume TTL after completion. 
- klog.V(2).Infof("job %s disappeared after observation; assuming TTL after completion", byName) - return true, nil + if allowNeverSeenTTL && time.Since(start) > appearanceGrace { + klog.V(2).Infof("job %s not found for %s; assuming completed earlier and TTL-deleted", byName, appearanceGrace) + return true, nil + } + return false, nil } if err != nil { return false, nil // transient @@ -188,14 +185,18 @@ func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel s return waitForJobs(ctx, kc, "", // byName fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel), - wantAtLeast, to) + wantAtLeast, to, false) // strict } func waitForJobName(ctx context.Context, kc kubernetes.Interface, name string, to time.Duration) error { return waitForJobs(ctx, kc, - name, // byName - "", // labelSelector - 1, to) + name, // byName + "", // labelSelector + 1, to, false) // strict +} + +func waitForJobNamePeerTTL(ctx context.Context, kc kubernetes.Interface, name string, to time.Duration) error { + return waitForJobs(ctx, kc, name, "", 1, to, true) // allowNeverSeenTTL = true } func detectLocalAndPeer(n1, n2 string) (string, string, error) { @@ -239,7 +240,7 @@ func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, loca } target := tools.JobTypeDisruptiveValidate.GetJobName(&min) - if err := waitForJobName(ctx, kc, target, timeoutPeerJob); err != nil { + if err := waitForJobNamePeerTTL(ctx, kc, target, timeoutPeerJob); err != nil { return fmt.Errorf("peer validate job %s not complete: %w", min, err) } return nil