15 changes: 15 additions & 0 deletions cmd/tnf-setup-runner/main.go
@@ -19,6 +19,7 @@ import (
tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth"
tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
tnfrestartetcd "github.com/openshift/cluster-etcd-operator/pkg/tnf/restart-etcd"
tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup"
)

@@ -58,6 +59,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command {
cmd.AddCommand(NewSetupCommand())
cmd.AddCommand(NewAfterSetupCommand())
cmd.AddCommand(NewFencingCommand())
cmd.AddCommand(NewRestartEtcdCommand())

return cmd
}
@@ -113,3 +115,16 @@ func NewFencingCommand() *cobra.Command {
},
}
}

func NewRestartEtcdCommand() *cobra.Command {
return &cobra.Command{
Use: tools.JobTypeRestartEtcd.GetSubCommand(),
Short: "Run etcd restart steps after a cert change",
Run: func(cmd *cobra.Command, args []string) {
err := tnfrestartetcd.RunEtcdRestart()
if err != nil {
klog.Fatal(err)
}
},
}
}
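For readers unfamiliar with the wiring: the job pods presumably run the tnf-setup-runner binary with the subcommand string returned by JobType.GetSubCommand(), and cobra dispatches to the matching command. A minimal standalone sketch of that dispatch, assuming only the command names shown in this diff (the println stands in for the real runner call):

	package main

	import (
		"fmt"
		"os"

		"github.com/spf13/cobra"
	)

	func main() {
		// root command mirrors NewTnfSetupRunnerCommand from the diff
		root := &cobra.Command{Use: "tnf-setup-runner"}
		// subcommand mirrors NewRestartEtcdCommand; Use must match
		// JobTypeRestartEtcd.GetSubCommand(), i.e. "restart-etcd"
		root.AddCommand(&cobra.Command{
			Use:   "restart-etcd",
			Short: "Run etcd restart steps after a cert change",
			Run: func(cmd *cobra.Command, args []string) {
				fmt.Println("would call tnfrestartetcd.RunEtcdRestart() here")
			},
		})
		if err := root.Execute(); err != nil {
			os.Exit(1)
		}
	}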
128 changes: 128 additions & 0 deletions pkg/tnf/operator/starter.go
@@ -36,6 +36,7 @@ import (
"github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers"
"github.com/openshift/cluster-etcd-operator/pkg/operator/externaletcdsupportcontroller"
"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
"github.com/openshift/cluster-etcd-operator/pkg/tlshelpers"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/jobs"
"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools"
)
@@ -45,6 +46,10 @@ var (
fencingUpdateTriggered bool
// fencingUpdateMutex is used to make usage of fencingUpdateTriggered thread safe
fencingUpdateMutex sync.Mutex
// etcdCertUpdateTriggered is set to true when an etcd cert update is already triggered
etcdCertUpdateTriggered bool
// etcdCertUpdateMutex is used to make usage of etcdCertUpdateTriggered thread safe
etcdCertUpdateMutex sync.Mutex
)

// HandleDualReplicaClusters checks feature gate and control plane topology,
@@ -105,6 +110,7 @@ func HandleDualReplicaClusters(
},
UpdateFunc: func(oldObj, newObj interface{}) {
handleFencingSecretChange(ctx, kubeClient, oldObj, newObj)
handleEtcdCertChange(ctx, controllerContext, operatorClient, kubeInformersForNamespaces, controlPlaneNodeLister, kubeClient, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
handleFencingSecretChange(ctx, kubeClient, nil, obj)
@@ -275,6 +281,128 @@ func runJobController(ctx context.Context, jobType tools.JobType, nodeName *stri
go tnfJobController.Run(ctx, 1)
}

func handleEtcdCertChange(ctx context.Context, controllerContext *controllercmd.ControllerContext, operatorClient v1helpers.StaticPodOperatorClient, kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, controlPlaneNodeLister corev1listers.NodeLister, client kubernetes.Interface, oldObj, obj interface{}) {
secret, ok := obj.(*corev1.Secret)
if !ok {
klog.Warningf("failed to convert added / modified / deleted object to Secret %+v", obj)
return
}

if secret.GetName() != tlshelpers.EtcdAllCertsSecretName {
return
}

var oldSecret *corev1.Secret
if oldObj != nil {
oldSecret, ok = oldObj.(*corev1.Secret)
if !ok {
klog.Warningf("failed to convert old object to Secret %+v", oldObj)
return
}
} else {
// Nothing to do, no old cert was found, things should have progressed as normal
// Might need to revisit this in the future if some edge case is found
return
}

certsChanged := false
for key, oldValue := range oldSecret.Data {
newValue, exists := secret.Data[key]
if !exists || !bytes.Equal(oldValue, newValue) {
klog.Infof("etcd certs changed, restarting etcd")
certsChanged = true
break
}
}
if !certsChanged {
klog.Infof("etcd certs did not change, skipping restart of podman-etcd")
return
}
klog.Infof("etcd certs changed, restarting etcd")

etcdCertUpdateMutex.Lock()
if etcdCertUpdateTriggered {
klog.Infof("etcd cert update triggered already, skipping recreation of etcd job for secret %s", secret.GetName())
etcdCertUpdateMutex.Unlock()
return
}
etcdCertUpdateTriggered = true
etcdCertUpdateMutex.Unlock()

defer func() {
etcdCertUpdateMutex.Lock()
etcdCertUpdateTriggered = false
etcdCertUpdateMutex.Unlock()
}()

nodeList, err := controlPlaneNodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list control plane nodes while waiting to create TNF jobs: %v", err)
return
}
if len(nodeList) != 2 {
klog.Info("not starting TNF jobs yet, waiting for 2 control plane nodes to exist")
return
}

for _, node := range nodeList {
runJobController(ctx, tools.JobTypeRestartEtcd, &node.Name, controllerContext, operatorClient, client, kubeInformersForNamespaces)
}
Comment on lines +348 to +350

⚠️ Potential issue | 🟠 Major

Avoid spawning restart job controllers on every secret event

runJobController spins up a long-lived controller (go ...Run(ctx, 1)). Calling it inside the cert-change handler means every secret update (and even re-list events) will start another controller instance per node, leading to unbounded goroutines and duplicated informers fighting over the same tnf-restart-etcd-job-* resources. Please start these controllers once (e.g. alongside the other TNF job controllers) and let the handler only manage job lifecycle triggers.
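A minimal sketch of the once-only start the review suggests, assuming sync.Once fits the controller lifecycle here (names are illustrative, not from this PR): repeated secret events then cannot spawn duplicate controllers, and the handler is left to manage only the job lifecycle.

	package main

	import (
		"fmt"
		"sync"
	)

	// controllerStarter guards long-lived controller startup: only the
	// first call to ensure runs start, all later calls are no-ops.
	type controllerStarter struct {
		once sync.Once
	}

	func (s *controllerStarter) ensure(start func()) {
		s.once.Do(start)
	}

	func main() {
		var s controllerStarter
		// stands in for repeated secret update events
		for i := 0; i < 3; i++ {
			s.ensure(func() { fmt.Println("starting restart-etcd job controllers once") })
		}
	}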


jobsFound := false
jobsDone := map[string]bool{}

// helper func for waiting for a running job
// finished = Complete, Failed, or not found
isRestartJobFinished := func(context.Context) (finished bool, returnErr error) {
var err error
jobsFound = false
jobs, err := client.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeRestartEtcd.GetNameLabelValue()),
})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
klog.Errorf("failed to get fencing job, will retry: %v", err)
return false, nil
}
jobsFound = true
for _, job := range jobs.Items {
if tools.IsConditionTrue(job.Status.Conditions, batchv1.JobComplete) || tools.IsConditionTrue(job.Status.Conditions, batchv1.JobFailed) {
jobsDone[job.Name] = true
}
}
if len(jobsDone) == len(nodeList) {
return true, nil
}
klog.Infof("fencing job still running, skipping recreation for now, will retry")
return false, nil
}

// wait as long as the restart etcd job itself waits, plus some execution time
err = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.RestartEtcdJobCompletedTimeout, true, isRestartJobFinished)
if err != nil {
// if we set timeouts right, this should not happen...
klog.Errorf("timed out waiting for fencing job to complete: %v", err)
return
}

if !jobsFound {
klog.Errorf("fencing job not found, nothing to do")
return
}

klog.Info("deleting fencing job for recreation")
for jobName, _ := range jobsDone {
err = client.BatchV1().Jobs(operatorclient.TargetNamespace).Delete(ctx, jobName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
// TODO how to trigger a retry here...
klog.Errorf("failed to delete fencing job: %v", err)
}
}
}
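A side note on the comparison loop above: it iterates only oldSecret.Data, so a key present solely in the new secret would not register as a change. If added keys should also trigger a restart, a symmetric comparison is compact; a minimal sketch, assuming the module can use Go 1.21's maps package:

	package main

	import (
		"bytes"
		"fmt"
		"maps"
	)

	// secretDataChanged reports whether two secret data maps differ,
	// catching added keys as well as removed or modified ones.
	func secretDataChanged(oldData, newData map[string][]byte) bool {
		return !maps.EqualFunc(oldData, newData, bytes.Equal)
	}

	func main() {
		oldData := map[string][]byte{"tls.crt": []byte("a")}
		newData := map[string][]byte{"tls.crt": []byte("a"), "new.crt": []byte("b")}
		fmt.Println(secretDataChanged(oldData, newData)) // true: a key was added
	}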

func handleFencingSecretChange(ctx context.Context, client kubernetes.Interface, oldObj, obj interface{}) {
secret, ok := obj.(*corev1.Secret)
if !ok {
22 changes: 22 additions & 0 deletions pkg/tnf/pkg/pcs/etcd.go
@@ -33,6 +33,28 @@ func ConfigureEtcd(ctx context.Context, cfg config.ClusterConfig) error {
return nil
}

// RestartEtcd restarts the etcd resource via pcs, if the resource exists
func RestartEtcd(ctx context.Context) error {
klog.Info("Checking pcs resources")

stdOut, stdErr, err := exec.Execute(ctx, "/usr/sbin/pcs resource status")
if err != nil || len(stdErr) > 0 {
klog.ErrorS(err, "Failed to get pcs resource status", "stdout", stdOut, "stderr", stdErr)
// stderr output counts as failure here; make sure we don't return nil
if err == nil {
err = fmt.Errorf("pcs resource status wrote to stderr: %s", stdErr)
}
return err
}
if !strings.Contains(stdOut, "etcd") {
klog.Info("etcd resource not found, nothing to do")
return nil
}

klog.Info("Restarting etcd")
stdOut, stdErr, err = exec.Execute(ctx, "/usr/sbin/pcs resource restart etcd")
if err != nil || len(stdErr) > 0 {
klog.ErrorS(err, "Failed to restart etcd", "stdout", stdOut, "stderr", stdErr)
if err == nil {
err = fmt.Errorf("pcs resource restart etcd wrote to stderr: %s", stdErr)
}
return err
}
return nil
}

// ConfigureConstraints configures the etcd constraints
func ConfigureConstraints(ctx context.Context) (bool, error) {
klog.Info("Checking pcs constraints")
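Worth noting for RestartEtcd above: pcs resource restart typically blocks until the resource has restarted, and exec.Execute only inherits whatever deadline the caller's context carries. A standalone sketch of that contract, using os/exec in place of the repo's helper (the ctx-plus-command-string signature returning stdout, stderr, and error is inferred from the calls above; the timeout value is an assumption):

	package main

	import (
		"context"
		"fmt"
		"os/exec"
		"strings"
		"time"
	)

	// run mimics the assumed exec.Execute contract: execute a command
	// under ctx and return stdout and stderr separately.
	func run(ctx context.Context, command string) (string, string, error) {
		parts := strings.Fields(command)
		cmd := exec.CommandContext(ctx, parts[0], parts[1:]...)
		var out, errOut strings.Builder
		cmd.Stdout, cmd.Stderr = &out, &errOut
		err := cmd.Run()
		return out.String(), errOut.String(), err
	}

	func main() {
		// a bounded context keeps a wedged restart from hanging forever
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()
		stdOut, stdErr, err := run(ctx, "/usr/sbin/pcs resource status")
		fmt.Println(stdOut, stdErr, err)
	}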
12 changes: 8 additions & 4 deletions pkg/tnf/pkg/tools/jobs.go
@@ -11,10 +11,11 @@ import (
// setup job: waits for auth jobs to complete
// after setup jobs: waits for setup job to complete
const (
JobPollIntervall = 15 * time.Second
AuthJobCompletedTimeout = 10 * time.Minute
SetupJobCompletedTimeout = 20 * time.Minute
FencingJobCompletedTimeout = 25 * time.Minute
JobPollIntervall = 15 * time.Second
AuthJobCompletedTimeout = 10 * time.Minute
SetupJobCompletedTimeout = 20 * time.Minute
FencingJobCompletedTimeout = 25 * time.Minute
RestartEtcdJobCompletedTimeout = 10 * time.Minute
)

// JobType represent the different jobs we run, with some methods needed
@@ -26,6 +27,7 @@ const (
JobTypeSetup
JobTypeAfterSetup
JobTypeFencing
JobTypeRestartEtcd
)

func (t JobType) GetSubCommand() string {
@@ -38,6 +40,8 @@ func (t JobType) GetSubCommand() string {
return "after-setup"
case JobTypeFencing:
return "fencing"
case JobTypeRestartEtcd:
return "restart-etcd"
default:
return ""
}
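Since GetSubCommand ties each JobType to the CLI wiring in main.go, a small table test keeps the two from drifting apart; a sketch covering only the mappings visible in this diff (test file placement in the tools package is an assumption):

	package tools

	import "testing"

	// TestGetSubCommand pins the JobType -> subcommand mapping that the
	// tnf-setup-runner command dispatch depends on.
	func TestGetSubCommand(t *testing.T) {
		cases := map[JobType]string{
			JobTypeAfterSetup:  "after-setup",
			JobTypeFencing:     "fencing",
			JobTypeRestartEtcd: "restart-etcd",
		}
		for jobType, want := range cases {
			if got := jobType.GetSubCommand(); got != want {
				t.Errorf("GetSubCommand() = %q, want %q", got, want)
			}
		}
	}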
23 changes: 23 additions & 0 deletions pkg/tnf/restart-etcd/runner.go
@@ -0,0 +1,23 @@
package restartetcd

import (
"context"

"k8s.io/klog/v2"

"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pcs"
)

// RunEtcdRestart restarts the etcd pacemaker resource on the local node via pcs
func RunEtcdRestart() error {
klog.Info("Running TNF etcd restart")
ctx := context.Background()

err := pcs.RestartEtcd(ctx)
if err != nil {
return err
}

klog.Infof("Etcd restart done!")

return nil
}
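RunEtcdRestart runs against context.Background(), so the pcs call carries no deadline of its own and relies entirely on the job-level RestartEtcdJobCompletedTimeout. If a local bound is preferred, the change is small; a hypothetical variant under that assumption (the 5-minute value is illustrative, leaving headroom under the 10-minute job timeout):

	package restartetcd

	import (
		"context"
		"time"

		"k8s.io/klog/v2"

		"github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pcs"
	)

	// RunEtcdRestartWithTimeout is a hypothetical variant of RunEtcdRestart
	// that bounds the restart itself instead of relying on the job timeout.
	func RunEtcdRestartWithTimeout() error {
		klog.Info("Running TNF etcd restart")
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()

		if err := pcs.RestartEtcd(ctx); err != nil {
			return err
		}
		klog.Info("Etcd restart done!")
		return nil
	}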