diff --git a/cmd/tnf-setup-runner/main.go b/cmd/tnf-setup-runner/main.go index 8452dad92..fbf6b2568 100644 --- a/cmd/tnf-setup-runner/main.go +++ b/cmd/tnf-setup-runner/main.go @@ -19,6 +19,7 @@ import ( tnfauth "github.com/openshift/cluster-etcd-operator/pkg/tnf/auth" tnffencing "github.com/openshift/cluster-etcd-operator/pkg/tnf/fencing" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" + tnfrestartetcd "github.com/openshift/cluster-etcd-operator/pkg/tnf/restart-etcd" tnfsetup "github.com/openshift/cluster-etcd-operator/pkg/tnf/setup" ) @@ -58,6 +59,7 @@ func NewTnfSetupRunnerCommand() *cobra.Command { cmd.AddCommand(NewSetupCommand()) cmd.AddCommand(NewAfterSetupCommand()) cmd.AddCommand(NewFencingCommand()) + cmd.AddCommand(NewRestartEtcdCommand()) return cmd } @@ -113,3 +115,16 @@ func NewFencingCommand() *cobra.Command { }, } } + +func NewRestartEtcdCommand() *cobra.Command { + return &cobra.Command{ + Use: tools.JobTypeRestartEtcd.GetSubCommand(), + Short: "Run restart etcd steps for cert change", + Run: func(cmd *cobra.Command, args []string) { + err := tnfrestartetcd.RunEtcdRestart() + if err != nil { + klog.Fatal(err) + } + }, + } +} diff --git a/pkg/tnf/operator/starter.go b/pkg/tnf/operator/starter.go index a40681f88..037d899da 100644 --- a/pkg/tnf/operator/starter.go +++ b/pkg/tnf/operator/starter.go @@ -36,6 +36,7 @@ import ( "github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers" "github.com/openshift/cluster-etcd-operator/pkg/operator/externaletcdsupportcontroller" "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" + "github.com/openshift/cluster-etcd-operator/pkg/tlshelpers" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/jobs" "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/tools" ) @@ -45,6 +46,10 @@ var ( fencingUpdateTriggered bool // fencingUpdateMutex is used to make usage of fencingUpdateTriggered thread safe fencingUpdateMutex sync.Mutex + // etcdCertUpdateTriggered is set to true when a etcd cert update is already triggered + etcdCertUpdateTriggered bool + // etcdCertUpdateMutex is used to make usage of etcdCertUpdateTriggered thread safe + etcdCertUpdateMutex sync.Mutex ) // HandleDualReplicaClusters checks feature gate and control plane topology, @@ -105,6 +110,7 @@ func HandleDualReplicaClusters( }, UpdateFunc: func(oldObj, newObj interface{}) { handleFencingSecretChange(ctx, kubeClient, oldObj, newObj) + handleEtcdCertChange(ctx, controllerContext, operatorClient, kubeInformersForNamespaces, controlPlaneNodeLister, kubeClient, oldObj, newObj) }, DeleteFunc: func(obj interface{}) { handleFencingSecretChange(ctx, kubeClient, nil, obj) @@ -275,6 +281,128 @@ func runJobController(ctx context.Context, jobType tools.JobType, nodeName *stri go tnfJobController.Run(ctx, 1) } +func handleEtcdCertChange(ctx context.Context, controllerContext *controllercmd.ControllerContext, operatorClient v1helpers.StaticPodOperatorClient, kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, controlPlaneNodeLister corev1listers.NodeLister, client kubernetes.Interface, oldObj, obj interface{}) { + secret, ok := obj.(*corev1.Secret) + if !ok { + klog.Warningf("failed to convert added / modified / deleted object to Secret %+v", obj) + return + } + + if secret.GetName() != tlshelpers.EtcdAllCertsSecretName { + return + } + + var oldSecret *corev1.Secret + if oldObj != nil { + oldSecret, ok = oldObj.(*corev1.Secret) + if !ok { + klog.Warningf("failed to convert old object to Secret %+v", oldObj) + return + } + } else { + // Nothing to do, no old cert was found, things should have progressed as normal + // Might need to revisit this in the future if some edge case is found + return + } + + certsChanged := false + for key, oldValue := range oldSecret.Data { + newValue, exists := secret.Data[key] + if !exists || !bytes.Equal(oldValue, newValue) { + klog.Infof("etcd certs changed, restarting etcd") + certsChanged = true + break + } + } + if !certsChanged { + klog.Infof("etcd certs did not change, skipping restart of podman-etcd") + return + } + klog.Infof("etcd certs changed, restarting etcd") + + etcdCertUpdateMutex.Lock() + if etcdCertUpdateTriggered { + klog.Infof("etcd cert update triggered already, skipping recreation of etcd job for secret %s", secret.GetName()) + etcdCertUpdateMutex.Unlock() + return + } + etcdCertUpdateTriggered = true + etcdCertUpdateMutex.Unlock() + + defer func() { + etcdCertUpdateMutex.Lock() + etcdCertUpdateTriggered = false + etcdCertUpdateMutex.Unlock() + }() + + nodeList, err := controlPlaneNodeLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list control plane nodes while waiting to create TNF jobs: %v", err) + return + } + if len(nodeList) != 2 { + klog.Info("not starting TNF jobs yet, waiting for 2 control plane nodes to exist") + return + } + + for _, node := range nodeList { + runJobController(ctx, tools.JobTypeRestartEtcd, &node.Name, controllerContext, operatorClient, client, kubeInformersForNamespaces) + } + + jobsFound := false + jobsDone := map[string]bool{} + + // helper func for waiting for a running job + // finished = Complete, Failed, or not found + isResrtartJobFinished := func(context.Context) (finished bool, returnErr error) { + var err error + jobsFound = false + jobs, err := client.BatchV1().Jobs(operatorclient.TargetNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/name=%s", tools.JobTypeRestartEtcd.GetNameLabelValue()), + }) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + klog.Errorf("failed to get fencing job, will retry: %v", err) + return false, nil + } + jobsFound = true + for _, job := range jobs.Items { + if tools.IsConditionTrue(job.Status.Conditions, batchv1.JobComplete) || tools.IsConditionTrue(job.Status.Conditions, batchv1.JobFailed) { + jobsDone[job.Name] = true + } + } + if len(jobsDone) == len(nodeList) { + return true, nil + } + klog.Infof("fencing job still running, skipping recreation for now, will retry") + return false, nil + } + + // wait as long as the fencing job waits as well, plus some execution time + err = wait.PollUntilContextTimeout(ctx, tools.JobPollIntervall, tools.RestartEtcdJobCompletedTimeout, true, isResrtartJobFinished) + if err != nil { + // if we set timeouts right, this should not happen... + klog.Errorf("timed out waiting for fencing job to complete: %v", err) + return + } + + if !jobsFound { + klog.Errorf("fencing job not found, nothing to do") + return + } + + klog.Info("deleting fencing job for recreation") + for jobName, _ := range jobsDone { + err = client.BatchV1().Jobs(operatorclient.TargetNamespace).Delete(ctx, jobName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + // TODO how to trigger a retry here... + klog.Errorf("failed to delete fencing job: %v", err) + } + } +} + func handleFencingSecretChange(ctx context.Context, client kubernetes.Interface, oldObj, obj interface{}) { secret, ok := obj.(*corev1.Secret) if !ok { diff --git a/pkg/tnf/pkg/pcs/etcd.go b/pkg/tnf/pkg/pcs/etcd.go index 2e2ebe3f8..71a5fd0b6 100644 --- a/pkg/tnf/pkg/pcs/etcd.go +++ b/pkg/tnf/pkg/pcs/etcd.go @@ -33,6 +33,28 @@ func ConfigureEtcd(ctx context.Context, cfg config.ClusterConfig) error { return nil } +func RestartEtcd(ctx context.Context) error { + klog.Info("Checking pcs resources") + + stdOut, stdErr, err := exec.Execute(ctx, "/usr/sbin/pcs resource status") + if err != nil || len(stdErr) > 0 { + klog.Error(err, "Failed to get pcs resource status", "stdout", stdOut, "stderr", stdErr, "err", err) + return err + } + if !strings.Contains(stdOut, "etcd") { + klog.Info("etcd resource not found, nothing to do") + return nil + } + + klog.Info("Restarting etcd") + stdOut, stdErr, err = exec.Execute(ctx, "/usr/sbin/pcs resource restart etcd") + if err != nil || len(stdErr) > 0 { + klog.Error(err, "Failed to restart etcd", "stdout", stdOut, "stderr", stdErr, "err", err) + return err + } + return nil +} + // ConfigureConstraints configures the etcd constraints func ConfigureConstraints(ctx context.Context) (bool, error) { klog.Info("Checking pcs constraints") diff --git a/pkg/tnf/pkg/tools/jobs.go b/pkg/tnf/pkg/tools/jobs.go index 793c51bc9..df3ecaf11 100644 --- a/pkg/tnf/pkg/tools/jobs.go +++ b/pkg/tnf/pkg/tools/jobs.go @@ -11,10 +11,11 @@ import ( // setup job: waits for auth jobs to complete // after setup jobs: waits for setup job to complete const ( - JobPollIntervall = 15 * time.Second - AuthJobCompletedTimeout = 10 * time.Minute - SetupJobCompletedTimeout = 20 * time.Minute - FencingJobCompletedTimeout = 25 * time.Minute + JobPollIntervall = 15 * time.Second + AuthJobCompletedTimeout = 10 * time.Minute + SetupJobCompletedTimeout = 20 * time.Minute + FencingJobCompletedTimeout = 25 * time.Minute + RestartEtcdJobCompletedTimeout = 10 * time.Minute ) // JobType represent the different jobs we run, with some methods needed @@ -26,6 +27,7 @@ const ( JobTypeSetup JobTypeAfterSetup JobTypeFencing + JobTypeRestartEtcd ) func (t JobType) GetSubCommand() string { @@ -38,6 +40,8 @@ func (t JobType) GetSubCommand() string { return "after-setup" case JobTypeFencing: return "fencing" + case JobTypeRestartEtcd: + return "restart-etcd" default: return "" } diff --git a/pkg/tnf/restart-etcd/runner.go b/pkg/tnf/restart-etcd/runner.go new file mode 100644 index 000000000..1146e4c9d --- /dev/null +++ b/pkg/tnf/restart-etcd/runner.go @@ -0,0 +1,23 @@ +package restartetcd + +import ( + "context" + + "k8s.io/klog/v2" + + "github.com/openshift/cluster-etcd-operator/pkg/tnf/pkg/pcs" +) + +func RunEtcdRestart() error { + klog.Info("Running TNF etcd restart") + ctx := context.Background() + + err := pcs.RestartEtcd(ctx) + if err != nil { + return err + } + + klog.Infof("Etcd restart done!") + + return nil +}