14 changes: 14 additions & 0 deletions cmd/tnf-setup-runner/main.go
@@ -17,6 +17,7 @@ import (

tnfaftersetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/after-setup"
tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth"
disruptivevalidate "github.com/openshift/cluster-etcd-operator/pkg/tnf/disruptivevalidate"
tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup"
@@ -58,6 +59,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command {
cmd.AddCommand(NewSetupCommand())
cmd.AddCommand(NewAfterSetupCommand())
cmd.AddCommand(NewFencingCommand())
cmd.AddCommand(NewDisruptiveValidateCommand())

return cmd
}
@@ -113,3 +115,15 @@ func NewFencingCommand() *cobra.Command {
},
}
}

func NewDisruptiveValidateCommand() *cobra.Command {
return &cobra.Command{
Use: tools.JobTypeDisruptiveValidate.GetSubCommand(),
Short: "Run disruptive peer validation from this node",
Run: func(cmd *cobra.Command, args []string) {
if err := disruptivevalidate.RunDisruptiveValidate(); err != nil {
klog.Fatal(err)
}
},
}
}
249 changes: 249 additions & 0 deletions pkg/tnf/disruptivevalidate/runner.go
@@ -0,0 +1,249 @@
package disruptivevalidate

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
wait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/config"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/exec"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
)

const (
poll = 3 * time.Second
timeoutSetup = tools.SetupJobCompletedTimeout
timeoutFencing = tools.FencingJobCompletedTimeout
timeoutAfter = tools.SetupJobCompletedTimeout
timeoutPeerJob = 45 * time.Minute
timeoutPCSUp = 2 * time.Minute
timeoutPCSDown = 5 * time.Minute
timeoutPCSBackUp = 10 * time.Minute
timeoutEtcdOK = 10 * time.Minute
jobNamePrefix = "tnf-disruptive-validate-job-"
)

func RunDisruptiveValidate() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { <-server.SetupSignalHandler(); cancel() }()

cfg, err := rest.InClusterConfig()
if err != nil {
return err
}
kc, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
}

// 1) Wait for the orchestrated jobs to complete
if err := waitForLabeledJob(ctx, kc, tools.JobTypeSetup.GetNameLabelValue(), 1, timeoutSetup); err != nil {
return fmt.Errorf("setup not complete: %w", err)
}
if err := waitForLabeledJob(ctx, kc, tools.JobTypeFencing.GetNameLabelValue(), 1, timeoutFencing); err != nil {
return fmt.Errorf("fencing not complete: %w", err)
}
if err := waitForLabeledJob(ctx, kc, tools.JobTypeAfterSetup.GetNameLabelValue(), 2, timeoutAfter); err != nil {
return fmt.Errorf("after-setup not complete: %w", err)
}

// 2) Local / peer discovery
clusterCfg, err := config.GetClusterConfig(ctx, kc)
if err != nil {
return err
}
local, peer, err := detectLocalAndPeer(ctx, kc, clusterCfg.NodeName1, clusterCfg.NodeName2)
if err != nil {
return err
}
klog.Infof("validate: local=%s peer=%s", local, peer)

// 3) Lexicographic sequencing
if err := waitPeerValidateIfSecond(ctx, kc, local, clusterCfg.NodeName1, clusterCfg.NodeName2); err != nil {
return err
}

// Pre-fence health
if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil {
return fmt.Errorf("pre-fence etcd not healthy: %w", err)
}

// 4) PCS preflight
if _, _, err := exec.Execute(ctx, `command -v pcs`); err != nil {
return fmt.Errorf("pcs not found: %w", err)
}
if _, _, err := exec.Execute(ctx, `systemctl is-active pacemaker`); err != nil {
return fmt.Errorf("pacemaker not active: %w", err)
}
if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSUp); err != nil {
return fmt.Errorf("peer %q not ONLINE pre-fence: %w", peer, err)
}

// 5) Fence → OFFLINE → ONLINE → etcd healthy
out, _, ferr := exec.Execute(ctx, fmt.Sprintf(`/usr/sbin/pcs stonith fence %s`, peer))
if ferr != nil {
ls := out
if i := strings.LastIndex(ls, "\n"); i >= 0 && i+1 < len(ls) {
ls = ls[i+1:]
}
return fmt.Errorf("pcs fence %s failed: %w (last line: %s)", peer, ferr, strings.TrimSpace(ls))
}
if err := waitPCSState(ctx, boolp(false), peer, timeoutPCSDown); err != nil {
return fmt.Errorf("peer didn't go OFFLINE: %w", err)
}
if err := waitPCSState(ctx, boolp(true), peer, timeoutPCSBackUp); err != nil {
return fmt.Errorf("peer didn't return ONLINE: %w", err)
}
if err := waitEtcdTwoVoters(ctx, timeoutEtcdOK); err != nil {
return fmt.Errorf("post-fence etcd not healthy: %w", err)
}

klog.Infof("validate: success local=%s peer=%s", local, peer)
return nil
}

// helpers
func waitForLabeledJob(ctx context.Context, kc kubernetes.Interface, nameLabel string, wantAtLeast int, to time.Duration) error {
sel := fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel)
return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel})
if err != nil || len(jl.Items) < wantAtLeast {
return false, nil
}
completed := 0
for i := range jl.Items {
j := &jl.Items[i]
if j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) {
completed++
}
}
return completed >= wantAtLeast, nil
})
}

func detectLocalAndPeer(_ context.Context, _ kubernetes.Interface, n1, n2 string) (string, string, error) {
Review comment (Contributor): What is the reason to pass context and kubernetes.Interface if they are not used?

Reply (Author): It was for consistency and to stay future-ready for any API usage, but I will remove it for now.

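// For illustration, the trimmed-down signature discussed above could look like
// this (a sketch of the suggested follow-up, not the committed change):
//
//	func detectLocalAndPeer(n1, n2 string) (string, string, error)
//
// with the call site in RunDisruptiveValidate becoming:
//
//	local, peer, err := detectLocalAndPeer(clusterCfg.NodeName1, clusterCfg.NodeName2)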
podName, err := os.Hostname()
if err != nil || strings.TrimSpace(podName) == "" {
return "", "", fmt.Errorf("get pod hostname: %w", err)
Review comment (Contributor): If err is nil but strings.TrimSpace(podName) returns an empty string, the resulting error message won't be very helpful.

Reply (Author): True! Updated.

}
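// For illustration, the updated hostname check discussed above might look
// roughly like this (a sketch of the follow-up, not necessarily the exact
// committed code):
//
//	podName, err := os.Hostname()
//	if err != nil {
//		return "", "", fmt.Errorf("get pod hostname: %w", err)
//	}
//	if strings.TrimSpace(podName) == "" {
//		return "", "", fmt.Errorf("pod hostname is empty")
//	}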
// "<job-name>-<suffix>" -> "<job-name>"
i := strings.LastIndex(podName, "-")
if i <= 0 {
return "", "", fmt.Errorf("cannot derive job name from pod %q", podName)
}
jobName := podName[:i]

if !strings.HasPrefix(jobName, jobNamePrefix) {
return "", "", fmt.Errorf("unexpected job name %q (want prefix %q)", jobName, jobNamePrefix)
}
local := strings.TrimPrefix(jobName, jobNamePrefix)

switch local {
case n1:
return local, n2, nil
case n2:
return local, n1, nil
default:
return "", "", fmt.Errorf("local %q not in cluster config (%q,%q)", local, n1, n2)
}
}

func waitPeerValidateIfSecond(ctx context.Context, kc kubernetes.Interface, local, a, b string) error {
min, max := a, b
if strings.Compare(min, max) > 0 {
min, max = max, min
}
if local != max {
return nil
}

target := tools.JobTypeDisruptiveValidate.GetJobName(&min)
return wait.PollUntilContextTimeout(ctx, poll, timeoutPeerJob, true, func(context.Context) (bool, error) {
j, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).Get(ctx, target, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, nil
}
klog.V(2).Infof("peer %s status: succeeded=%d failed=%d conditions=%+v", target, j.Status.Succeeded, j.Status.Failed, j.Status.Conditions)

// Only treat as failed if the JobFailed condition is set
if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) {
return false, fmt.Errorf("peer validate job %s failed", target)
}
// Proceed when the peer is complete
return j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete), nil
})
Review comment (Contributor): This looks very similar to waitForLabeledJob to me (well, except the logging). Any chance we can label the validation job too, and reuse the same function?

Reply (Author): I already tag the jobs just like we do with the other jobs; however, the label is not unique per node, so I had to find a way to detect this specific job for this node, and for that case I merged the logic into one function. Labeling each node's job would require logic changes in other places, which could be fragile for the other jobs as well, so I would rather not touch them. It should look better now.

}
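// A rough sketch of how the two waits above could share one helper, per the
// review discussion: filter on an optional exact job name in addition to the
// label selector. This is an illustration only; it assumes the same imports,
// package constants, and the tools.IsConditionTrue helper used elsewhere in
// this file, and it is not the code that was actually merged.
func waitForJobs(ctx context.Context, kc kubernetes.Interface, nameLabel, jobName string, wantAtLeast int, to time.Duration) error {
	sel := fmt.Sprintf("app.kubernetes.io/name=%s", nameLabel)
	return wait.PollUntilContextTimeout(ctx, poll, to, true, func(ctx context.Context) (bool, error) {
		jl, err := kc.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel})
		if err != nil {
			return false, nil // treat list errors as transient
		}
		completed := 0
		for i := range jl.Items {
			j := &jl.Items[i]
			// optionally narrow to a single job by exact name
			if jobName != "" && j.Name != jobName {
				continue
			}
			if tools.IsConditionTrue(j.Status.Conditions, batchv1.JobFailed) {
				return false, fmt.Errorf("job %s failed", j.Name)
			}
			if j.Status.Succeeded > 0 || tools.IsConditionTrue(j.Status.Conditions, batchv1.JobComplete) {
				completed++
			}
		}
		return completed >= wantAtLeast, nil
	})
}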

func waitEtcdTwoVoters(ctx context.Context, to time.Duration) error {
return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
out, _, err := exec.Execute(ctx, `podman exec etcd sh -lc 'ETCDCTL_API=3 etcdctl member list -w json'`)
out = strings.TrimSpace(out)
if err != nil || len(out) < 2 {
return false, nil
}
var ml struct {
Members []struct {
IsLearner bool `json:"isLearner"`
} `json:"members"`
}
if json.Unmarshal([]byte(out), &ml) != nil {
return false, nil
}
total, voters := 0, 0
for _, m := range ml.Members {
total++
if !m.IsLearner {
voters++
}
}
return total == 2 && voters == 2, nil
})
}

func waitPCSState(ctx context.Context, want *bool, peer string, to time.Duration) error {
return wait.PollUntilContextTimeout(ctx, poll, to, true, func(context.Context) (bool, error) {
out, _, err := exec.Execute(ctx, `LC_ALL=C /usr/sbin/pcs status nodes`)
if err != nil {
return false, nil // treat as transient
}

// Find the "Online:" line and build a set of names listed there.
var onlineLine string
for _, ln := range strings.Split(out, "\n") {
s := strings.TrimSpace(ln)
if strings.HasPrefix(s, "Online:") {
onlineLine = strings.TrimSpace(strings.TrimPrefix(s, "Online:"))
break
}
}
peerOnline := false
if onlineLine != "" {
for _, tok := range strings.Fields(onlineLine) {
if strings.Trim(tok, "[],") == peer {
peerOnline = true
break
}
}
}
return peerOnline == *want, nil
})
}

func boolp(b bool) *bool { return &b }
15 changes: 9 additions & 6 deletions pkg/tnf/operator/starter.go
@@ -4,10 +4,11 @@ import (
"bytes"
"context"
"fmt"
operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1"
"os"
"sync"

operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions/operator/v1"

operatorv1 "github.com/openshift/api/operator/v1"
configv1informers "github.com/openshift/client-go/config/informers/externalversions"
v1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
@@ -59,8 +60,8 @@ func HandleDualReplicaClusters(ctx context.Context,
controlPlaneNodeInformer cache.SharedIndexInformer,
etcdInformer operatorv1informers.EtcdInformer,
kubeClient kubernetes.Interface,
dynamicClient dynamic.Interface) (bool, error) {

dynamicClient dynamic.Interface,
) (bool, error) {
if isDualReplicaTopology, err := isDualReplicaTopoly(ctx, featureGateAccessor, configInformers); err != nil {
return false, err
} else if !isDualReplicaTopology {
@@ -104,6 +105,7 @@ func HandleDualReplicaClusters(ctx context.Context,
for _, node := range nodeList {
runJobController(ctx, tools.JobTypeAuth, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
runJobController(ctx, tools.JobTypeAfterSetup, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
runJobController(ctx, tools.JobTypeDisruptiveValidate, &node.Name, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
}
runJobController(ctx, tools.JobTypeSetup, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
runJobController(ctx, tools.JobTypeFencing, nil, controllerContext, operatorClient, kubeClient, kubeInformersForNamespaces)
@@ -162,8 +164,8 @@ func runExternalEtcdSupportController(ctx context.Context,
networkInformer v1.NetworkInformer,
controlPlaneNodeInformer cache.SharedIndexInformer,
etcdInformer operatorv1informers.EtcdInformer,
kubeClient kubernetes.Interface) {

kubeClient kubernetes.Interface,
) {
klog.Infof("starting external etcd support controller")
externalEtcdSupportController := externaletcdsupportcontroller.NewExternalEtcdEnablerController(
operatorClient,
@@ -225,7 +227,8 @@ func runJobController(ctx context.Context, jobType tools.JobType, nodeName *stri
job.Spec.Template.Spec.Containers[0].Image = os.Getenv("OPERATOR_IMAGE")
job.Spec.Template.Spec.Containers[0].Command[1] = jobType.GetSubCommand()
return nil
}}...,
},
}...,
)
go tnfJobController.Run(ctx, 1)
}
7 changes: 1 addition & 6 deletions pkg/tnf/pkg/pcs/fencing.go
@@ -15,9 +15,7 @@ import (
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
)

var (
addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`)
)
var addressRegEx = regexp.MustCompile(`.*//(.*):(.*)(/redfish.*)`)

const (
// defaultPcmkDelayBase is the delay applied to the first fence device to prevent simultaneous fencing
@@ -117,11 +115,9 @@ func ConfigureFencing(ctx context.Context, kubeClient kubernetes.Interface, cfg

klog.Info("Fencing configuration succeeded!")
return nil

}

func getFencingConfig(nodeName string, secret *corev1.Secret) (*fencingConfig, error) {

address := string(secret.Data["address"])
if !strings.Contains(address, "redfish") {
klog.Errorf("Secret %s does not contain redfish address", secret.Name)
@@ -184,7 +180,6 @@ func getStatusCommand(sc StonithConfig, fc fencingConfig) string {
}

func getStonithCommand(sc StonithConfig, fc fencingConfig) string {

stonithAction := "create"
// check if device already exists
for _, p := range sc.Primitives {
3 changes: 3 additions & 0 deletions pkg/tnf/pkg/tools/jobs.go
@@ -26,6 +26,7 @@ const (
JobTypeSetup
JobTypeAfterSetup
JobTypeFencing
JobTypeDisruptiveValidate
)

func (t JobType) GetSubCommand() string {
@@ -38,6 +39,8 @@ func (t JobType) GetSubCommand() string {
return "after-setup"
case JobTypeFencing:
return "fencing"
case JobTypeDisruptiveValidate:
return "disruptive-validate"
default:
return ""
}
1 change: 0 additions & 1 deletion pkg/tnf/setup/runner.go
@@ -21,7 +21,6 @@ import (
)

func RunTnfSetup() error {

klog.Info("Setting up clients etc. for TNF setup")

clientConfig, err := rest.InClusterConfig()