diff --git a/pkg/operator/staticpod/certsyncpod/certsync_controller.go b/pkg/operator/staticpod/certsyncpod/certsync_controller.go index 111776d994..e2fa7474ed 100644 --- a/pkg/operator/staticpod/certsyncpod/certsync_controller.go +++ b/pkg/operator/staticpod/certsyncpod/certsync_controller.go @@ -2,6 +2,7 @@ package certsyncpod import ( "context" + "fmt" "os" "path/filepath" "reflect" @@ -17,10 +18,13 @@ import ( "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" - "github.com/openshift/library-go/pkg/operator/staticpod" "github.com/openshift/library-go/pkg/operator/staticpod/controller/installer" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/dirutils" ) +const stagingDirUID = "cert-sync" + type CertSyncController struct { destinationDir string namespace string @@ -60,15 +64,32 @@ func NewCertSyncController(targetDir, targetNamespace string, configmaps, secret ) } +func getStagingDir(targetDir string) string { + return filepath.Join(targetDir, "staging", stagingDirUID) +} + func getConfigMapDir(targetDir, configMapName string) string { return filepath.Join(targetDir, "configmaps", configMapName) } +func getConfigMapStagingDir(targetDir, configMapName string) string { + return filepath.Join(getStagingDir(targetDir), "configmaps", configMapName) +} + func getSecretDir(targetDir, secretName string) string { return filepath.Join(targetDir, "secrets", secretName) } +func getSecretStagingDir(targetDir, secretName string) string { + return filepath.Join(getStagingDir(targetDir), "secrets", secretName) +} + func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error { + if err := dirutils.RemoveContent(getStagingDir(c.destinationDir)); err != nil { + c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed to prune staging directory: %v", err) + return err + } + errors := 
[]error{} klog.Infof("Syncing configmaps: %v", c.configMaps) @@ -114,8 +135,9 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte } contentDir := getConfigMapDir(c.destinationDir, cm.Name) + stagingDir := getConfigMapStagingDir(c.destinationDir, cm.Name) - data := map[string]string{} + data := make(map[string]string, len(configMap.Data)) for filename := range configMap.Data { fullFilename := filepath.Join(contentDir, filename) @@ -152,27 +174,11 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte continue } - klog.Infof("Creating directory %q ...", contentDir) - if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) { - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err) - errors = append(errors, err) - continue - } - for filename, content := range configMap.Data { - fullFilename := filepath.Join(contentDir, filename) - // if the existing is the same, do nothing - if reflect.DeepEqual(data[fullFilename], content) { - continue - } - - klog.Infof("Writing configmap manifest %q ...", fullFilename) - if err := staticpod.WriteFileAtomic([]byte(content), 0644, fullFilename); err != nil { - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err) - errors = append(errors, err) - continue - } + files := make(map[string][]byte, len(configMap.Data)) + for k, v := range configMap.Data { + files[k] = []byte(v) } - c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated configmap: %s/%s", configMap.Namespace, configMap.Name) + errors = append(errors, syncDirectory(c.eventRecorder, "configmap", configMap.ObjectMeta, contentDir, 0755, stagingDir, files, 0644)) } klog.Infof("Syncing secrets: %v", c.secrets) @@ -219,8 +225,9 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte } contentDir 
:= getSecretDir(c.destinationDir, s.Name) + stagingDir := getSecretStagingDir(c.destinationDir, s.Name) - data := map[string][]byte{} + data := make(map[string][]byte, len(secret.Data)) for filename := range secret.Data { fullFilename := filepath.Join(contentDir, filename) @@ -257,29 +264,22 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte continue } - klog.Infof("Creating directory %q ...", contentDir) - if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) { - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for secret: %s/%s: %v", secret.Namespace, secret.Name, err) - errors = append(errors, err) - continue - } - for filename, content := range secret.Data { - // TODO fix permissions - fullFilename := filepath.Join(contentDir, filename) - // if the existing is the same, do nothing - if reflect.DeepEqual(data[fullFilename], content) { - continue - } - - klog.Infof("Writing secret manifest %q ...", fullFilename) - if err := staticpod.WriteFileAtomic(content, 0600, fullFilename); err != nil { - c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for secret: %s/%s: %v", secret.Namespace, secret.Name, err) - errors = append(errors, err) - continue - } - } - c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated secret: %s/%s", secret.Namespace, secret.Name) + errors = append(errors, syncDirectory(c.eventRecorder, "secret", secret.ObjectMeta, contentDir, 0700, stagingDir, secret.Data, 0600)) } - return utilerrors.NewAggregate(errors) } + +func syncDirectory( + eventRecorder events.Recorder, + typeName string, o metav1.ObjectMeta, + targetDir string, targetDirPerm os.FileMode, stagingDir string, + files map[string][]byte, filePerm os.FileMode, +) error { + if err := atomicdir.Sync(targetDir, targetDirPerm, stagingDir, files, filePerm); err != nil { + err = fmt.Errorf("failed to sync %s %s/%s (directory %q): %w", typeName, o.Namespace, o.Name, targetDir, err) + 
eventRecorder.Warning("CertificateUpdateFailed", err.Error()) + return err + } + eventRecorder.Eventf("CertificateUpdated", "Wrote updated %s: %s/%s", typeName, o.Namespace, o.Name) + return nil +} diff --git a/pkg/operator/staticpod/certsyncpod/certsync_controller_linux_test.go b/pkg/operator/staticpod/certsyncpod/certsync_controller_linux_test.go new file mode 100644 index 0000000000..ce33433a77 --- /dev/null +++ b/pkg/operator/staticpod/certsyncpod/certsync_controller_linux_test.go @@ -0,0 +1,155 @@ +//go:build linux + +package certsyncpod + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "os" + "path/filepath" + "sync" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server/dynamiccertificates" + + "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir" +) + +// TestDynamicCertificates makes sure the receiving side of certificate synchronization works as expected. +// It reads and watches the certificates being synchronized in the same way as e.g. kube-apiserver, +// the very same libraries are being used. +func TestDynamicCertificates(t *testing.T) { + const typeName = "secret" + om := metav1.ObjectMeta{ + Namespace: "openshift-kube-apiserver", + Name: "s1", + } + + // Generate all necessary keypairs. + tlsCert, tlsKey := generateKeypair(t) + tlsCertUpdated, tlsKeyUpdated := generateKeypair(t) + + // Write the keypair into a secret directory. 
+ secretDir := filepath.Join(t.TempDir(), "secrets", om.Name) + stagingDir := filepath.Join(t.TempDir(), "staging", stagingDirUID, "secrets", om.Name) + certFile := filepath.Join(secretDir, "tls.crt") + keyFile := filepath.Join(secretDir, "tls.key") + + if err := os.MkdirAll(secretDir, 0700); err != nil { + t.Fatalf("Failed to create secret directory %q: %v", secretDir, err) + } + if err := os.WriteFile(certFile, tlsCert, 0600); err != nil { + t.Fatalf("Failed to write TLS certificate into %q: %v", certFile, err) + } + if err := os.WriteFile(keyFile, tlsKey, 0600); err != nil { + t.Fatalf("Failed to write TLS key into %q: %v", keyFile, err) + } + + // Start the watcher. + // This reads the keypair synchronously so the initial state is loaded here. + dc, err := dynamiccertificates.NewDynamicServingContentFromFiles("localhost TLS", certFile, keyFile) + if err != nil { + t.Fatalf("Failed to init dynamic certificate: %v", err) + } + + // Check the initial keypair is loaded. + cert, key := dc.CurrentCertKeyContent() + if !bytes.Equal(cert, tlsCert) || !bytes.Equal(key, tlsKey) { + t.Fatal("Unexpected initial keypair loaded") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + dc.Run(ctx, 1) + }() + defer wg.Wait() + defer cancel() + + // Poll until update detected. + files := map[string][]byte{ + "tls.crt": tlsCertUpdated, + "tls.key": tlsKeyUpdated, + } + err = wait.PollUntilContextCancel(ctx, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) { + // Replace the secret directory. + if err := atomicdir.Sync(secretDir, 0700, stagingDir, files, 0600); err != nil { + t.Errorf("Failed to write files: %v", err) + return false, err + } + + // Check the loaded content matches. + // This is most probably updated based on write in a previous Poll invocation. 
+ cert, key := dc.CurrentCertKeyContent() + return bytes.Equal(cert, tlsCertUpdated) && bytes.Equal(key, tlsKeyUpdated), nil + }) + if err != nil { + t.Fatalf("Failed to wait for dynamic certificate: %v", err) + } +} + +// generateKeypair returns (cert, key). +func generateKeypair(t *testing.T) ([]byte, []byte) { + t.Helper() + + privateKey, err := ecdsa.GenerateKey(elliptic.P224(), rand.Reader) + if err != nil { + t.Fatalf("Failed to generate TLS key: %v", err) + } + + notBefore := time.Now() + notAfter := notBefore.Add(1 * time.Hour) + + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + t.Fatalf("Failed to generate serial number for TLS keypair: %v", err) + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"Example Org"}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + DNSNames: []string{"example.com"}, + } + + publicKeyBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey) + if err != nil { + t.Fatalf("Failed to create TLS certificate: %v", err) + } + + var certOut bytes.Buffer + if err := pem.Encode(&certOut, &pem.Block{Type: "CERTIFICATE", Bytes: publicKeyBytes}); err != nil { + t.Fatalf("Failed to write certificate PEM: %v", err) + } + + privateKeyBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + t.Fatalf("Unable to marshal private key: %v", err) + } + + var keyOut bytes.Buffer + if err := pem.Encode(&keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privateKeyBytes}); err != nil { + t.Fatalf("Failed to write private key PEM: %v", err) + } + + return certOut.Bytes(), keyOut.Bytes() +} diff --git a/pkg/operator/staticpod/installerpod/cmd.go b/pkg/operator/staticpod/installerpod/cmd.go index 
31afadff86..09fe1f5ff3 100644 --- a/pkg/operator/staticpod/installerpod/cmd.go +++ b/pkg/operator/staticpod/installerpod/cmd.go @@ -5,38 +5,40 @@ import ( "fmt" "os" "path" + "path/filepath" "sort" "strconv" "strings" "time" - "k8s.io/utils/clock" - - "k8s.io/apimachinery/pkg/util/wait" - "github.com/blang/semver/v4" "github.com/davecgh/go-spew/spew" "github.com/spf13/cobra" "github.com/spf13/pflag" - "k8s.io/klog/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/klog/v2" + "k8s.io/utils/clock" "github.com/openshift/library-go/pkg/config/client" "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/resource/resourceread" "github.com/openshift/library-go/pkg/operator/resource/retry" - "github.com/openshift/library-go/pkg/operator/staticpod" "github.com/openshift/library-go/pkg/operator/staticpod/internal" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/dirutils" "github.com/openshift/library-go/pkg/operator/staticpod/internal/flock" ) +const stagingDirUID = "installer" + type InstallOptions struct { // TODO replace with genericclioptions KubeConfig string @@ -224,6 +226,12 @@ func (o *InstallOptions) copySecretsAndConfigMaps(ctx context.Context, resourceD return err } + stagingDirBase := getStagingDir(resourceDir) + klog.Infof("Pruning staging directory %q ...", stagingDirBase) + if err := dirutils.RemoveContent(stagingDirBase); err != nil { + return err + } + // Gather secrets. If we get API server error, retry getting until we hit the timeout. // Retrying will prevent temporary API server blips or networking issues. 
// We return when all "required" secrets are gathered, optional secrets are not checked. @@ -258,15 +266,12 @@ func (o *InstallOptions) copySecretsAndConfigMaps(ctx context.Context, resourceD if prefixed { secretBaseName = o.prefixFor(secret.Name) } - contentDir := path.Join(resourceDir, "secrets", secretBaseName) - klog.Infof("Creating directory %q ...", contentDir) - if err := os.MkdirAll(contentDir, 0755); err != nil { - return err - } - for filename, content := range secret.Data { - if err := writeSecret(content, path.Join(contentDir, filename)); err != nil { - return err - } + + contentDir := getSecretDir(resourceDir, secretBaseName) + stagingDir := getSecretStagingDir(resourceDir, secretBaseName) + + if err := atomicdir.Sync(contentDir, 0700, stagingDir, secret.Data, 0600); err != nil { + return fmt.Errorf("failed to sync secret %s/%s (directory %q): %w", secret.Namespace, secret.Name, contentDir, err) } } for _, configmap := range configs { @@ -274,18 +279,19 @@ func (o *InstallOptions) copySecretsAndConfigMaps(ctx context.Context, resourceD if prefixed { configMapBaseName = o.prefixFor(configmap.Name) } - contentDir := path.Join(resourceDir, "configmaps", configMapBaseName) - klog.Infof("Creating directory %q ...", contentDir) - if err := os.MkdirAll(contentDir, 0755); err != nil { - return err + + contentDir := getConfigMapDir(resourceDir, configMapBaseName) + stagingDir := getConfigMapStagingDir(resourceDir, configMapBaseName) + + files := make(map[string][]byte, len(configmap.Data)) + for k, v := range configmap.Data { + files[k] = []byte(v) } - for filename, content := range configmap.Data { - if err := writeConfig([]byte(content), path.Join(contentDir, filename)); err != nil { - return err - } + + if err := atomicdir.Sync(contentDir, 0700, stagingDir, files, 0600); err != nil { + return fmt.Errorf("failed to sync configmap %s/%s (directory %q): %w", configmap.Namespace, configmap.Name, contentDir, err) } } - return nil } @@ -626,22 +632,22 @@ func (o 
*InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource return nil } -func writeConfig(content []byte, fullFilename string) error { - klog.Infof("Writing config file %q ...", fullFilename) +func getStagingDir(targetDir string) string { + return filepath.Join(targetDir, "staging", stagingDirUID) +} - filePerms := os.FileMode(0600) - if strings.HasSuffix(fullFilename, ".sh") { - filePerms = 0755 - } - return staticpod.WriteFileAtomic(content, filePerms, fullFilename) +func getConfigMapDir(targetDir, configMapName string) string { + return filepath.Join(targetDir, "configmaps", configMapName) } -func writeSecret(content []byte, fullFilename string) error { - klog.Infof("Writing secret manifest %q ...", fullFilename) +func getConfigMapStagingDir(targetDir, configMapName string) string { + return filepath.Join(getStagingDir(targetDir), "configmaps", configMapName) +} - filePerms := os.FileMode(0600) - if strings.HasSuffix(fullFilename, ".sh") { - filePerms = 0700 - } - return staticpod.WriteFileAtomic(content, filePerms, fullFilename) +func getSecretDir(targetDir, secretName string) string { + return filepath.Join(targetDir, "secrets", secretName) +} + +func getSecretStagingDir(targetDir, secretName string) string { + return filepath.Join(getStagingDir(targetDir), "secrets", secretName) } diff --git a/pkg/operator/staticpod/internal/dirutils/remove_content.go b/pkg/operator/staticpod/internal/dirutils/remove_content.go new file mode 100644 index 0000000000..76ba507002 --- /dev/null +++ b/pkg/operator/staticpod/internal/dirutils/remove_content.go @@ -0,0 +1,24 @@ +package dirutils + +import ( + "fmt" + "os" + "path/filepath" +) + +// RemoveContent removes all entries in the given directory. 
+func RemoveContent(path string) error { + entries, err := os.ReadDir(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed reading directory %q: %w", path, err) + } + for _, entry := range entries { + if err := os.RemoveAll(filepath.Join(path, entry.Name())); err != nil { + return fmt.Errorf("failed removing entry %q from directory %q: %w", entry.Name(), path, err) + } + } + return nil +}