14 changes: 14 additions & 0 deletions cmd/tnf-setup-runner/main.go
@@ -17,6 +17,7 @@ import (

tnfaftersetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/after-setup"
tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth"
disruptivevalidate "github.com/openshift/cluster-etcd-operator/pkg/tnf/disruptivevalidate"
tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup"
@@ -58,6 +59,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command {
cmd.AddCommand(NewSetupCommand())
cmd.AddCommand(NewAfterSetupCommand())
cmd.AddCommand(NewFencingCommand())
cmd.AddCommand(NewDisruptiveValidateCommand())

return cmd
}
@@ -113,3 +115,15 @@ func NewFencingCommand() *cobra.Command {
},
}
}

func NewDisruptiveValidateCommand() *cobra.Command {
return &cobra.Command{
Use: tools.JobTypeDisruptiveValidate.GetSubCommand(),
Short: "Run disruptive peer validation from this node",
Run: func(cmd *cobra.Command, args []string) {
if err := disruptivevalidate.RunDisruptiveValidate(); err != nil {
klog.Fatal(err)
}
},
}
}
277 changes: 277 additions & 0 deletions pkg/tnf/disruptivevalidate/runner.go
@@ -0,0 +1,277 @@
package disruptivevalidate

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
wait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

const (
poll = 3 * time.Second
timeoutSetup = tools.SetupJobCompletedTimeout
timeoutFencing = tools.FencingJobCompletedTimeout
timeoutAfter = tools.SetupJobCompletedTimeout
timeoutPeerJob = 45 * time.Minute
timeoutPCSUp = 2 * time.Minute
timeoutPCSDown = 5 * time.Minute
timeoutPCSBackUp = 10 * time.Minute
timeoutEtcdOK = 10 * time.Minute
jobNamePrefix = "tnf-disruptive-validate-job-"
)

// RunDisruptiveValidate waits for the TNF setup, fencing, and after-setup jobs to
// complete, then fences the peer node via pacemaker and verifies that the peer goes
// OFFLINE, comes back ONLINE, and that etcd ends up healthy with two voting members.
func RunDisruptiveValidate() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cancel the work context on SIGTERM/SIGINT.
go func() { <-server.SetupSignalHandler(); cancel() }()

cfg, err := rest.InClusterConfig()
if err != nil {
return err
}
kc, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
}

// 1) Wait for the orchestrated jobs (setup, fencing, after-setup) to complete
if err := waitForLabeledJob(ctx, kc, tools.JobTypeSetup.GetNameLabelValue(), 1, timeoutSetup); err != nil {
return fmt.Errorf("setup not complete: %w", err)
}
if err := waitForLabeledJob(ctx, kc, tools.JobTypeFencing.GetNameLabelValue(), 1, timeoutFencing); err != nil {
return fmt.Errorf("fencing not complete: %w", err)
}
if err := waitForLabeledJob(ctx, kc, tools.JobTypeAfterSetup.GetNameLabelValue(), 2, timeoutAfter); err != nil {
return fmt.Errorf("after-setup not complete: %w", err)
}

// 2) Local / peer discovery
clusterCfg, err := config.GetClusterConfig(ctx, kc)
if err != nil {
return err
}
local, peer, err := detectLocalAndPeer(clusterCfg.NodeName1, clusterCfg.NodeName2)
if err != nil {
return err
}
klog.Infof("validate: local=%s peer=%s", local, peer)

// 3) Lexicographic sequencing: the node whose name sorts second waits for the first node's validate job
if err := waitPeerValidateIfSecond(ctx, kc, local, clusterCfg.NodeName1, clusterCfg.NodeName2); err != nil {
return err
}

// Pre-fence health check: etcd must already report two voting members
if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil {
return fmt.Errorf("pre-fence etcd not healthy: %w", err)
}

// 4) PCS preflight
if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil {
return fmt.Errorf("pcs not found: %w", err)
}
if _, _, err := exec.Execute(ctx, `systemctl is-active pacemaker`); err != nil {
return fmt.Errorf("pacemaker not active: %w", err)
}
if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSUp); err != nil {
return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err)
}

// 5) Fence → OFFLINE → ONLINE → etcd healthy
out, _, ferr := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s`, peer))
if ferr != nil {
// Surface only the last line of the pcs output in the error message.
ls := out
if i := strings.LastIndex(ls, "\n"); i >= 0 && i+1 < len(ls) {
ls = ls[i+1:]
}
return fmt.Errorf("pcs fence %s failed: %w (last line: %s)", peer, ferr, strings.TrimSpace(ls))
}
if err := waitPCSState(ctx, boolp(false), peer, timeoutPCSDown); err != nil {
return fmt.Errorf("peer didn't go OFFLINE: %w", err)
}
if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSBackUp); err != nil {
return fmt.Errorf("peer didn't return ONLINE: %w", err)
}
if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil {
return fmt.Errorf("post-fence etcd not healthy: %w", err)
}

klog.Infof("validate: success local=%s peer=%s", local, peer)
return nil
}

// helpers
// waitForJobs polls until either the job named byName, or at least wantAtLeast jobs
// matching labelSelector, have completed; a failed job aborts the wait with an error.
func waitForJobs(
ctx context.Context,
kc kubernetes.Interface,
byName string,
labelSelector string,
wantAtLeast int,
to time.Duration,
) error {
return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
if byName != "" {
j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, byName, metav1.GetOptions{})
if err != nil {
return false, nil // keep polling: job not created yet (not found) or transient API error
}
if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) {
return false, fmt.Errorf("job %s failed", byName)
}
return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil
}
// selector path
jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
return false, nil // transient
}
if len(jl.Items) < wantAtLeast {
return false, nil
}
completed := 0
for i := range jl.Items {
j := &jl.Items[i]
if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) {
return false, fmt.Errorf("job %s failed", j.Name)
}
if j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) {
completed++
}
}
return completed >= wantAtLeast, nil
})
}

func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel string, wantAtLeast int, to time.Duration) error {
return waitForJobs(ctx, kc,
"", // byName
fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel),
wantAtLeast, to)
}

func waitForJobName(ctx context.Context, kc kubernetes.Interface, name string, to time.Duration) error {
return waitForJobs(ctx, kc,
name, // byName
"", // labelSelector
1, to)
}

// detectLocalAndPeer derives the local node name from the pod's hostname
// (pod "<job-name>-<suffix>", job "<prefix><node>") and returns the other
// configured node as the peer.
func detectLocalAndPeer(n1, n2 string) (string, string, error) {
podName, err := os.Hostname()
if err != nil {
return "", "", fmt.Errorf("get pod hostname: %w", err)
Contributor: If err is nil but strings.TrimSpace(podName) returns an empty string, the resulting error message won't be very helpful.

Author: True! Updated.

}
podName = strings.TrimSpace(podName)
if podName == "" {
return "", "", fmt.Errorf("get pod hostname: empty string")
}
// "<job-name>-<suffix>" -> "<job-name>"
i := strings.LastIndex(podName, "-")
if i <= 0 {
return "", "", fmt.Errorf("cannot derive job name from pod %q", podName)
}
jobName := podName[:i]

if !strings.HasPrefix(jobName, jobNamePrefix) {
return "", "", fmt.Errorf("unexpected job name %q (want prefix %q)", jobName, jobNamePrefix)
}
local := strings.TrimPrefix(jobName, jobNamePrefix)

switch local {
case n1:
return local, n2, nil
case n2:
return local, n1, nil
default:
return "", "", fmt.Errorf("local %q not in cluster config (%q,%q)", local, n1, n2)
}
}
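
As a follow-up to the review thread above about the empty-hostname case, here is a minimal sketch of the same pod-name parsing as a testable helper. parseLocalNode and the sample pod name are illustrative only and not part of this PR; the sketch assumes it sits alongside the runner in package disruptivevalidate and that job pods are named "<job-name>-<random-suffix>", as the code above does.

package disruptivevalidate

import (
    "fmt"
    "strings"
    "testing"
)

// parseLocalNode is a hypothetical extraction of the parsing in detectLocalAndPeer:
// "<prefix><node>-<suffix>" -> "<node>".
func parseLocalNode(podName, prefix string) (string, error) {
    podName = strings.TrimSpace(podName)
    if podName == "" {
        return "", fmt.Errorf("get pod hostname: empty string")
    }
    i := strings.LastIndex(podName, "-")
    if i <= 0 {
        return "", fmt.Errorf("cannot derive job name from pod %q", podName)
    }
    jobName := podName[:i]
    if !strings.HasPrefix(jobName, prefix) {
        return "", fmt.Errorf("unexpected job name %q (want prefix %q)", jobName, prefix)
    }
    return strings.TrimPrefix(jobName, prefix), nil
}

func TestParseLocalNode(t *testing.T) {
    // Node names may contain dashes; only the random pod suffix is trimmed.
    got, err := parseLocalNode("tnf-disruptive-validate-job-master-0-xk9pz", jobNamePrefix)
    if err != nil || got != "master-0" {
        t.Fatalf("got %q, %v", got, err)
    }
    // A whitespace-only hostname must fail with an explicit error.
    if _, err := parseLocalNode("   ", jobNamePrefix); err == nil {
        t.Fatal("expected error for empty pod name")
    }
}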

// waitPeerValidateIfSecond orders the two validate jobs: the node whose name sorts
// second lexicographically waits for the first node's validate job to complete, so
// only one node fences its peer at a time.
func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, local, a, b string) error {
min, max := a, b
if strings.Compare(min, max) > 0 {
min, max = max, min
}
if local != max {
return nil
}

target := tools.JobTypeDisruptiveValidate.GetJobName(&min)
if err := waitForJobName(ctx, kc, target, timeoutPeerJob); err != nil {
return fmt.Errorf("peer validate job %s not complete: %w", min, err)
}
return nil
}

// waitEtcdTwoVoters polls `etcdctl member list` inside the local etcd container until
// it reports exactly two members, both of them voting (non-learner) members.
func waitEtcdTwoVoters(ctx context.Context, to time.Duration) error {
return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`)
out = strings.TrimSpace(out)
if err != nil || len(out) < 2 {
return false, nil
}
var ml struct {
Members []struct {
IsLearner bool `json:"isLearner"`
} `json:"members"`
}
if json.Unmarshal([]byte(out), &ml) != nil {
return false, nil
}
total, voters := 0, 0
for _, m := range ml.Members {
total++
if !m.IsLearner {
voters++
}
}
return total == 2 && voters == 2, nil
})
}
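
For reference, a small runnable sketch of the voter check applied to a trimmed sample of `etcdctl member list -w json` output. The sample payload is an assumption for illustration (real output also carries member IDs, URLs, and a header), but the decoding mirrors waitEtcdTwoVoters above.

package main

import (
    "encoding/json"
    "fmt"
)

// Illustrative sample only; real `etcdctl member list -w json` output has more fields.
const sample = `{"members":[{"name":"master-0"},{"name":"master-1","isLearner":false}]}`

func main() {
    var ml struct {
        Members []struct {
            IsLearner bool `json:"isLearner"`
        } `json:"members"`
    }
    if err := json.Unmarshal([]byte(sample), &ml); err != nil {
        panic(err)
    }
    voters := 0
    for _, m := range ml.Members {
        if !m.IsLearner {
            voters++
        }
    }
    // Healthy per waitEtcdTwoVoters: exactly two members, both of them voters.
    fmt.Println(len(ml.Members) == 2 && voters == 2) // true
}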

// waitPCSState polls `pcs status nodes` until the peer's presence on the "Online:"
// line matches *want (true: peer is ONLINE, false: peer is OFFLINE).
func waitPCSState(ctx context.Context, want *bool, peer string, to time.Duration) error {
return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
out, _, err := exec.Execute(ctx, `LC_ALL=C /usr/sbin/pcs status nodes`)
if err != nil {
return false, nil // treat as transient
}

// Find the "Online:" line and build a set of names listed there.
var onlineLine string
for _, ln := range strings.Split(out, "\n") {
s := strings.TrimSpace(ln)
if strings.HasPrefix(s, "Online:") {
onlineLine = strings.TrimSpace(strings.TrimPrefix(s, "Online:"))
break
}
}
peerOnline := false
if onlineLine != "" {
for _, tok := range strings.Fields(onlineLine) {
if strings.Trim(tok, "[],") == peer {
peerOnline = true
break
}
}
}
return peerOnline == *want, nil
})
}
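
And a similar sketch of the "Online:" scan against an approximate `pcs status nodes` output. The sample text is an assumption (the exact layout varies between pcs versions), which is why the runner only keys off the first "Online:" line.

package main

import (
    "fmt"
    "strings"
)

// Approximate sample of `pcs status nodes`; the exact layout may differ per pcs version.
const sample = `Pacemaker Nodes:
 Online: master-0 master-1
 Standby:
 Offline:
`

func main() {
    peer := "master-1"
    peerOnline := false
    for _, ln := range strings.Split(sample, "\n") {
        s := strings.TrimSpace(ln)
        if !strings.HasPrefix(s, "Online:") {
            continue
        }
        for _, tok := range strings.Fields(strings.TrimPrefix(s, "Online:")) {
            if strings.Trim(tok, "[],") == peer {
                peerOnline = true
            }
        }
        break
    }
    fmt.Println(peerOnline) // true, i.e. the peer is ONLINE in this sample
}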

func boolp(b bool) *bool { return &b }
15 changes: 9 additions & 6 deletions pkg/tnf/operator/starter.go
@@ -4,10 +4,11 @@ import (
"bytes"
"context"
"fmt"
operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1"
"os"
"sync"

operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1"

operatorv1 "github.com/openshift/api/operator/v1"
configv1informers "github.com/openshift/client-go/config/informers/externalversions"
v1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
@@ -59,8 +60,8 @@ func HandleDualReplicaClusters(ctx context.Context,
controlPlaneNodeInformer cache.SharedIndexInformer,
etcdInformer operatorv1informers.EtcdInformer,
kubeClient kubernetes.Interface,
dynamicClient dynamic.Interface) (bool, error) {

dynamicClient dynamic.Interface,
) (bool, error) {
if isDualReplicaTopology, err := isDualReplicaTopoly(ctx, featureGateAccessor, configInformers); err != nil {
return false, err
} else if !isDualReplicaTopology {
@@ -104,6 +105,7 @@ func HandleDualReplicaClusters(ctx context.Context,
for _, node := range nodeList {
runJobController(ctx, tools.JobTypeAuth, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
runJobController(ctx, tools.JobTypeAfterSetup, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
runJobController(ctx, tools.JobTypeDisruptiveValidate, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
}
runJobController(ctx, tools.JobTypeSetup, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
runJobController(ctx, tools.JobTypeFencing, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
@@ -162,8 +164,8 @@ func runExternalEtcdSupportController(ctx context.Context,
networkInformer v1.NetworkInformer,
controlPlaneNodeInformer cache.SharedIndexInformer,
etcdInformer operatorv1informers.EtcdInformer,
kubeClient kubernetes.Interface) {

kubeClient kubernetes.Interface,
) {
klog.Infof("starting external etcd support controller")
externalEtcdSupportController := externaletcdsupportcontroller.NewExternalEtcdEnablerController(
operatorClient,
@@ -225,7 +227,8 @@ func runJobController(ctx context.Context, jobType tools.JobType, nodeName *stri
job.Spec.Template.Spec.Containers[0].Image = os.Getenv("OPERATOR_IMAGE")
job.Spec.Template.Spec.Containers[0].Command[1] = jobType.GetSubCommand()
return nil
}}...,
},
}...,
)
go tnfJobController.Run(ctx, 1)
}
7 changes: 1 addition & 6 deletions pkg/tnf/pkg/pcs/fencing.go
@@ -15,9 +15,7 @@ import (
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
)

var (
addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`)
)
var addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`)

const (
// defaultPcmkDelayBase is the delay applied to the first fence device to prevent simultaneous fencing
@@ -117,11 +115,9 @@ func ConfigureFencing(ctx context.Context, kubeClient kubernetes.Interface, cfg

klog.Info("Fencing configuration succeeded!")
return nil

}

func getFencingConfig(nodeName string, secret *corev1.Secret) (*fencingConfig, error) {

address := string(secret.Data["address"])
if !strings.Contains(address, "redfish") {
klog.Errorf("Secret %s does not contain redfish address", secret.Name)
@@ -184,7 +180,6 @@ func getStatusCommand(fc fencingConfig) string {
}

func getStonithCommand(sc StonithConfig, fc fencingConfig) string {

stonithAction := "create"
// check if device already exists
for _, p := range sc.Primitives {