diff --git a/api/v1/objectstore_types.go b/api/v1/objectstore_types.go index 57a5d764..e2812695 100644 --- a/api/v1/objectstore_types.go +++ b/api/v1/objectstore_types.go @@ -24,26 +24,11 @@ import ( // InstanceSidecarConfiguration defines the configuration for the sidecar that runs in the instance pods. type InstanceSidecarConfiguration struct { - // The expiration time of the cache entries not managed by the informers. Expressed in seconds. - // +optional - // +kubebuilder:validation:Minimum=0 - // +kubebuilder:validation:Maximum=3600 - // +kubebuilder:default=180 - CacheTTL *int `json:"cacheTTL,omitempty"` - // The environment to be explicitly passed to the sidecar // +optional Env []corev1.EnvVar `json:"env,omitempty"` } -// GetCacheTTL returns the cache TTL value, defaulting to 180 seconds if not set. -func (i InstanceSidecarConfiguration) GetCacheTTL() int { - if i.CacheTTL == nil { - return 180 - } - return *i.CacheTTL -} - // ObjectStoreSpec defines the desired state of ObjectStore. type ObjectStoreSpec struct { Configuration barmanapi.BarmanObjectStoreConfiguration `json:"configuration"` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 876c2138..43b0574f 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -28,11 +28,6 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InstanceSidecarConfiguration) DeepCopyInto(out *InstanceSidecarConfiguration) { *out = *in - if in.CacheTTL != nil { - in, out := &in.CacheTTL, &out.CacheTTL - *out = new(int) - **out = **in - } if in.Env != nil { in, out := &in.Env, &out.Env *out = make([]corev1.EnvVar, len(*in)) diff --git a/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml b/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml index 0a190c73..397a4b81 100644 --- a/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml +++ b/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml @@ -382,13 +382,6 @@ spec: description: InstanceSidecarConfiguration defines the configuration for the sidecar that runs in the instance pods. properties: - cacheTTL: - default: 180 - description: The expiration time of the cache entries not managed - by the informers. Expressed in seconds. - maximum: 3600 - minimum: 0 - type: integer env: description: The environment to be explicitly passed to the sidecar items: diff --git a/docs/examples/cluster-example.yaml b/docs/examples/cluster-example.yaml index b80557d5..b367f91b 100644 --- a/docs/examples/cluster-example.yaml +++ b/docs/examples/cluster-example.yaml @@ -4,7 +4,7 @@ metadata: name: cluster-example spec: instances: 3 - imagePullPolicy: IfNotPresent + imagePullPolicy: Always plugins: - name: barman-cloud.cloudnative-pg.io parameters: diff --git a/go.mod b/go.mod index a9b59fd4..ccbab8f9 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/cloudnative-pg/barman-cloud v0.0.0-20241105055149-ae6c2408bd14 github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813 github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734 - github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241129144432-bd94f16685d3 + github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241205093641-958e207b8afe github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c github.com/docker/docker v27.3.1+incompatible github.com/onsi/ginkgo/v2 v2.21.0 diff --git a/go.sum b/go.sum index 9c10fc2e..e5e4e584 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,8 @@ github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813 h github.com/cloudnative-pg/cloudnative-pg v1.24.1-0.20241113134512-8608232c2813/go.mod h1:f4hObdRVoQtMmVtWqZ6VDZBrI6ok9Td/UMhojQ+EPmk= github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734 h1:4jq/FUrlAKxu0Kw9PL5lj5Njq8pAnmUpP/kXKOrJAaE= github.com/cloudnative-pg/cnpg-i v0.0.0-20241109002750-8abd359df734/go.mod h1:3U7miYasKr2rYCQzrn/IvbSQc0OpYF8ieZt2FKG4nv0= -github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241129144432-bd94f16685d3 h1:hKTlmgyOq5ZS7t1eVa4SY1hH361gZ7VIb0an+BH9rJs= -github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241129144432-bd94f16685d3/go.mod h1:X6r1fRuUEIAv4+5SSBY2RmQ201K6GcptOXgnmaX/8tY= +github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241205093641-958e207b8afe h1:gUGqx4eTHreM0QWbszSx6wnbBw9Vavp5uYl4uA9fh1k= +github.com/cloudnative-pg/cnpg-i-machinery v0.0.0-20241205093641-958e207b8afe/go.mod h1:X6r1fRuUEIAv4+5SSBY2RmQ201K6GcptOXgnmaX/8tY= github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c h1:t0RBU2gBiwJQ9XGeXlHPBYpsTscSKHgB5TfcWaiwanc= github.com/cloudnative-pg/machinery v0.0.0-20241105070525-042a028b767c/go.mod h1:uBHGRIk5rt07mO4zjIC1uvGBWTH6PqIiD1PfpvPGZKU= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= diff --git a/internal/cmd/instance/main.go b/internal/cmd/instance/main.go index cbb2b44c..70662901 100644 --- a/internal/cmd/instance/main.go +++ b/internal/cmd/instance/main.go @@ -18,11 +18,8 @@ func NewCmd() *cobra.Command { RunE: func(cmd *cobra.Command, _ []string) error { requiredSettings := []string{ "namespace", - "barman-object-name", - "cluster-name", "pod-name", "spool-directory", - "server-name", } for _, k := range requiredSettings { @@ -36,16 +33,9 @@ func NewCmd() *cobra.Command { } _ = viper.BindEnv("namespace", "NAMESPACE") - _ = viper.BindEnv("cluster-name", "CLUSTER_NAME") _ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") - _ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME") - _ = viper.BindEnv("server-name", "SERVER_NAME") - - _ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME") - _ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME") - return cmd } diff --git a/internal/cmd/restore/main.go b/internal/cmd/restore/main.go index 617caf56..13ac0770 100644 --- a/internal/cmd/restore/main.go +++ b/internal/cmd/restore/main.go @@ -20,14 +20,8 @@ func NewCmd() *cobra.Command { RunE: func(cmd *cobra.Command, _ []string) error { requiredSettings := []string{ "namespace", - "cluster-name", "pod-name", "spool-directory", - - // IMPORTANT: barman-object-name and server-name are not required - // to restore a cluster. - "recovery-barman-object-name", - "recovery-server-name", } for _, k := range requiredSettings { @@ -41,16 +35,9 @@ func NewCmd() *cobra.Command { } _ = viper.BindEnv("namespace", "NAMESPACE") - _ = viper.BindEnv("cluster-name", "CLUSTER_NAME") _ = viper.BindEnv("pod-name", "POD_NAME") _ = viper.BindEnv("pgdata", "PGDATA") _ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY") - _ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME") - _ = viper.BindEnv("server-name", "SERVER_NAME") - - _ = viper.BindEnv("recovery-barman-object-name", "RECOVERY_BARMAN_OBJECT_NAME") - _ = viper.BindEnv("recovery-server-name", "RECOVERY_SERVER_NAME") - return cmd } diff --git a/internal/cnpgi/common/errors.go b/internal/cnpgi/common/errors.go new file mode 100644 index 00000000..48b2cdee --- /dev/null +++ b/internal/cnpgi/common/errors.go @@ -0,0 +1,16 @@ +package common + +// walNotFoundError is raised when a WAL file has not been found in the object store +type walNotFoundError struct{} + +func newWALNotFoundError() *walNotFoundError { return &walNotFoundError{} } + +// ShouldPrintStackTrace tells whether the sidecar log stream should contain the stack trace +func (e walNotFoundError) ShouldPrintStackTrace() bool { + return false +} + +// Error implements the error interface +func (e walNotFoundError) Error() string { + return "WAL file not found" +} diff --git a/internal/cnpgi/common/wal.go b/internal/cnpgi/common/wal.go index 8880ac38..ef321726 100644 --- a/internal/cnpgi/common/wal.go +++ b/internal/cnpgi/common/wal.go @@ -20,23 +20,17 @@ import ( barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config" ) // WALServiceImplementation is the implementation of the WAL Service type WALServiceImplementation struct { wal.UnimplementedWALServer - ClusterObjectKey client.ObjectKey - Client client.Client - InstanceName string - SpoolDirectory string - PGDataPath string - PGWALPath string - - BarmanObjectKey client.ObjectKey - ServerName string - - RecoveryBarmanObjectKey client.ObjectKey - RecoveryServerName string + Client client.Client + InstanceName string + SpoolDirectory string + PGDataPath string + PGWALPath string } // GetCapabilities implements the WALService interface @@ -72,13 +66,13 @@ func (w WALServiceImplementation) Archive( contextLogger := log.FromContext(ctx) contextLogger.Debug("starting wal archive") - var cluster cnpgv1.Cluster - if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil { + configuration, err := config.NewFromClusterJSON(request.ClusterDefinition) + if err != nil { return nil, err } var objectStore barmancloudv1.ObjectStore - if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { + if err := w.Client.Get(ctx, configuration.GetBarmanObjectKey(), &objectStore); err != nil { return nil, err } @@ -106,7 +100,7 @@ func (w WALServiceImplementation) Archive( return nil, err } - options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, w.ServerName) + options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName) if err != nil { return nil, err } @@ -127,12 +121,13 @@ func (w WALServiceImplementation) Restore( ctx context.Context, request *wal.WALRestoreRequest, ) (*wal.WALRestoreResult, error) { - // TODO: build full paths + contextLogger := log.FromContext(ctx) + walName := request.GetSourceWalName() destinationPath := request.GetDestinationFileName() - var cluster cnpgv1.Cluster - if err := w.Client.Get(ctx, w.ClusterObjectKey, &cluster); err != nil { + configuration, err := config.NewFromClusterJSON(request.ClusterDefinition) + if err != nil { return nil, err } @@ -140,30 +135,35 @@ func (w WALServiceImplementation) Restore( var serverName string switch { - case cluster.IsReplica() && cluster.Status.CurrentPrimary == w.InstanceName: - // Designated primary on replica cluster, using recovery object store - serverName = w.RecoveryServerName - if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil { + case configuration.Cluster.IsReplica() && configuration.Cluster.Status.CurrentPrimary == w.InstanceName: + // Designated primary on replica cluster, using replica source object store + serverName = configuration.ReplicaSourceServerName + if err := w.Client.Get(ctx, configuration.GetReplicaSourceBarmanObjectKey(), &objectStore); err != nil { return nil, err } - case cluster.Status.CurrentPrimary == "": + case configuration.Cluster.Status.CurrentPrimary == "": // Recovery from object store, using recovery object store - serverName = w.RecoveryServerName - if err := w.Client.Get(ctx, w.RecoveryBarmanObjectKey, &objectStore); err != nil { + serverName = configuration.RecoveryServerName + if err := w.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &objectStore); err != nil { return nil, err } default: // Using cluster object store - serverName = w.ServerName - if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil { + serverName = configuration.ServerName + if err := w.Client.Get(ctx, configuration.GetBarmanObjectKey(), &objectStore); err != nil { return nil, err } } + contextLogger.Info( + "Restoring WAL file", + "objectStore", objectStore.Name, + "serverName", serverName, + "walName", walName) return &wal.WALRestoreResult{}, w.restoreFromBarmanObjectStore( - ctx, &cluster, &objectStore, serverName, walName, destinationPath) + ctx, configuration.Cluster, &objectStore, serverName, walName, destinationPath) } func (w WALServiceImplementation) restoreFromBarmanObjectStore( @@ -246,6 +246,10 @@ func (w WALServiceImplementation) restoreFromBarmanObjectStore( // is the one that PostgreSQL has requested to restore. // The failure has already been logged in walRestorer.RestoreList method if walStatus[0].Err != nil { + if errors.Is(walStatus[0].Err, barmanRestorer.ErrWALNotFound) { + return newWALNotFoundError() + } + return walStatus[0].Err } diff --git a/internal/cnpgi/instance/backup.go b/internal/cnpgi/instance/backup.go index 5e1e042e..207550ad 100644 --- a/internal/cnpgi/instance/backup.go +++ b/internal/cnpgi/instance/backup.go @@ -10,7 +10,6 @@ import ( barmanBackup "github.com/cloudnative-pg/barman-cloud/pkg/backup" barmanCapabilities "github.com/cloudnative-pg/barman-cloud/pkg/capabilities" barmanCredentials "github.com/cloudnative-pg/barman-cloud/pkg/credentials" - cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/cloudnative-pg/pkg/postgres" "github.com/cloudnative-pg/cnpg-i/pkg/backup" "github.com/cloudnative-pg/machinery/pkg/fileutils" @@ -22,16 +21,14 @@ import ( barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config" ) // BackupServiceImplementation is the implementation // of the Backup CNPG capability type BackupServiceImplementation struct { - BarmanObjectKey client.ObjectKey - ClusterObjectKey client.ObjectKey - Client client.Client - InstanceName string - ServerName string + Client client.Client + InstanceName string backup.UnimplementedBackupServer } @@ -65,20 +62,20 @@ func (b BackupServiceImplementation) GetCapabilities( // Backup implements the Backup interface func (b BackupServiceImplementation) Backup( ctx context.Context, - _ *backup.BackupRequest, + request *backup.BackupRequest, ) (*backup.BackupResult, error) { contextLogger := log.FromContext(ctx) contextLogger.Info("Starting backup") - var cluster cnpgv1.Cluster - if err := b.Client.Get(ctx, b.ClusterObjectKey, &cluster); err != nil { + configuration, err := config.NewFromClusterJSON(request.ClusterDefinition) + if err != nil { return nil, err } var objectStore barmancloudv1.ObjectStore - if err := b.Client.Get(ctx, b.BarmanObjectKey, &objectStore); err != nil { - contextLogger.Error(err, "while getting object store", "key", b.BarmanObjectKey) + if err := b.Client.Get(ctx, configuration.GetBarmanObjectKey(), &objectStore); err != nil { + contextLogger.Error(err, "while getting object store", "key", configuration.GetRecoveryBarmanObjectKey()) return nil, err } @@ -117,7 +114,7 @@ func (b BackupServiceImplementation) Backup( if err = backupCmd.Take( ctx, backupName, - b.ServerName, + configuration.ServerName, env, barmanCloudExecutor{}, postgres.BackupTemporaryDirectory, @@ -129,7 +126,7 @@ func (b BackupServiceImplementation) Backup( executedBackupInfo, err := backupCmd.GetExecutedBackupInfo( ctx, backupName, - b.ServerName, + configuration.ServerName, barmanCloudExecutor{}, env) if err != nil { diff --git a/internal/cnpgi/instance/identity.go b/internal/cnpgi/instance/identity.go index 1e29d6b7..ef9c8458 100644 --- a/internal/cnpgi/instance/identity.go +++ b/internal/cnpgi/instance/identity.go @@ -2,20 +2,17 @@ package instance import ( "context" - "fmt" "github.com/cloudnative-pg/cnpg-i/pkg/identity" "sigs.k8s.io/controller-runtime/pkg/client" - barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" ) // IdentityImplementation implements IdentityServer type IdentityImplementation struct { identity.UnimplementedIdentityServer - BarmanObjectKey client.ObjectKey - Client client.Client + Client client.Client } // GetPluginMetadata implements IdentityServer @@ -53,14 +50,9 @@ func (i IdentityImplementation) GetPluginCapabilities( // Probe implements IdentityServer func (i IdentityImplementation) Probe( - ctx context.Context, + _ context.Context, _ *identity.ProbeRequest, ) (*identity.ProbeResponse, error) { - var obj barmancloudv1.ObjectStore - if err := i.Client.Get(ctx, i.BarmanObjectKey, &obj); err != nil { - return nil, fmt.Errorf("while fetching object store %s: %w", i.BarmanObjectKey.Name, err) - } - return &identity.ProbeResponse{ Ready: true, }, nil diff --git a/internal/cnpgi/instance/internal/client/client.go b/internal/cnpgi/instance/internal/client/client.go index cab7ebcd..6b5dbd35 100644 --- a/internal/cnpgi/instance/internal/client/client.go +++ b/internal/cnpgi/instance/internal/client/client.go @@ -3,61 +3,56 @@ package client import ( "context" "fmt" - "math" + "reflect" "sync" "time" "github.com/cloudnative-pg/machinery/pkg/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" - - v1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" ) -type cachedSecret struct { - secret *corev1.Secret +// DefaultTTLSeconds is the default TTL in seconds of cache entries +const DefaultTTLSeconds = 10 + +type cachedEntry struct { + entry client.Object fetchUnixTime int64 + ttl time.Duration +} + +func (e *cachedEntry) isExpired() bool { + return time.Now().Unix()-e.fetchUnixTime > int64(e.ttl) } // ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers type ExtendedClient struct { client.Client - barmanObjectKeys []client.ObjectKey - cachedSecrets []*cachedSecret - mux *sync.Mutex - ttl int + cachedObjects []cachedEntry + mux *sync.Mutex } // NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation func NewExtendedClient( baseClient client.Client, - objectStoreKeys []client.ObjectKey, ) client.Client { return &ExtendedClient{ - Client: baseClient, - barmanObjectKeys: objectStoreKeys, - mux: &sync.Mutex{}, + Client: baseClient, + mux: &sync.Mutex{}, } } -func (e *ExtendedClient) refreshTTL(ctx context.Context) error { - minTTL := math.MaxInt - - for _, key := range e.barmanObjectKeys { - var object v1.ObjectStore - - if err := e.Get(ctx, key, &object); err != nil { - return fmt.Errorf("failed to get the object store while refreshing the TTL parameter: %w", err) - } +func (e *ExtendedClient) isObjectCached(obj client.Object) bool { + if _, isSecret := obj.(*corev1.Secret); isSecret { + return true + } - currentTTL := object.Spec.InstanceSidecarConfiguration.GetCacheTTL() - if currentTTL < minTTL { - minTTL = currentTTL - } + if _, isObjectStore := obj.(*corev1.Secret); isObjectStore { + return true } - e.ttl = minTTL - return nil + return false } // Get behaves like the original Get method, but uses a cache for secrets @@ -67,84 +62,87 @@ func (e *ExtendedClient) Get( obj client.Object, opts ...client.GetOption, ) error { - contextLogger := log.FromContext(ctx). - WithName("extended_client"). - WithValues("name", key.Name, "namespace", key.Namespace) - - if _, ok := obj.(*corev1.Secret); !ok { - contextLogger.Trace("not a secret, skipping") + if !e.isObjectCached(obj) { return e.Client.Get(ctx, key, obj, opts...) } - if err := e.refreshTTL(ctx); err != nil { - return err - } + return e.getCachedObject(ctx, key, obj, opts...) +} - if e.isCacheDisabled() { - contextLogger.Trace("cache is disabled") - return e.Client.Get(ctx, key, obj, opts...) - } +func (e *ExtendedClient) getCachedObject( + ctx context.Context, + key client.ObjectKey, + obj client.Object, + opts ...client.GetOption, +) error { + contextLogger := log.FromContext(ctx). + WithName("extended_client"). + WithValues("name", key.Name, "namespace", key.Namespace) contextLogger.Trace("locking the cache") e.mux.Lock() defer e.mux.Unlock() - expiredSecretIndex := -1 // check if in cache - for idx, cache := range e.cachedSecrets { - if cache.secret.Namespace != key.Namespace || cache.secret.Name != key.Name { + expiredObjectIndex := -1 + for idx, cacheEntry := range e.cachedObjects { + if cacheEntry.entry.GetNamespace() != key.Namespace || cacheEntry.entry.GetName() != key.Name { continue } - if e.isExpired(cache.fetchUnixTime) { - contextLogger.Trace("secret found, but it is expired") - expiredSecretIndex = idx + if cacheEntry.entry.GetObjectKind().GroupVersionKind() != obj.GetObjectKind().GroupVersionKind() { + continue + } + if cacheEntry.isExpired() { + contextLogger.Trace("expired object found") + expiredObjectIndex = idx break } - contextLogger.Debug("secret found, loading it from cache") - cache.secret.DeepCopyInto(obj.(*corev1.Secret)) + + contextLogger.Debug("object found, loading it from cache") + + // Yes, this is a terrible hack, but that's exactly the way + // controller-runtime works. + // https://github.com/kubernetes-sigs/controller-runtime/blob/ + // 717b32aede14c921d239cf1b974a11e718949865/pkg/cache/internal/cache_reader.go#L92 + outVal := reflect.ValueOf(obj) + objVal := reflect.ValueOf(cacheEntry.entry) + if !objVal.Type().AssignableTo(outVal.Type()) { + return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type()) + } + + reflect.Indirect(outVal).Set(reflect.Indirect(objVal)) return nil } - if err := e.Client.Get(ctx, key, obj); err != nil { + if err := e.Client.Get(ctx, key, obj, opts...); err != nil { return err } - cs := &cachedSecret{ - secret: obj.(*corev1.Secret).DeepCopy(), + cs := cachedEntry{ + entry: obj.(runtime.Object).DeepCopyObject().(client.Object), fetchUnixTime: time.Now().Unix(), } - contextLogger.Debug("setting secret in the cache") - if expiredSecretIndex != -1 { - e.cachedSecrets[expiredSecretIndex] = cs + contextLogger.Debug("setting object in the cache") + if expiredObjectIndex != -1 { + e.cachedObjects[expiredObjectIndex] = cs } else { - e.cachedSecrets = append(e.cachedSecrets, cs) + e.cachedObjects = append(e.cachedObjects, cs) } return nil } -func (e *ExtendedClient) isExpired(unixTime int64) bool { - return time.Now().Unix()-unixTime > int64(e.ttl) -} - -func (e *ExtendedClient) isCacheDisabled() bool { - const noCache = 0 - return e.ttl == noCache -} - -// RemoveSecret ensures that a secret is not present in the cache -func (e *ExtendedClient) RemoveSecret(key client.ObjectKey) { - if e.isCacheDisabled() { - return - } - +// removeObject ensures that a object is not present in the cache +func (e *ExtendedClient) removeObject(object client.Object) { e.mux.Lock() defer e.mux.Unlock() - for i, cache := range e.cachedSecrets { - if cache.secret.Namespace == key.Namespace && cache.secret.Name == key.Name { - e.cachedSecrets = append(e.cachedSecrets[:i], e.cachedSecrets[i+1:]...) + for i, cache := range e.cachedObjects { + if cache.entry.GetNamespace() == object.GetNamespace() && + cache.entry.GetName() == object.GetName() && + cache.entry.GetObjectKind().GroupVersionKind() != object.GetObjectKind().GroupVersionKind() { + e.cachedObjects = append(e.cachedObjects[:i], e.cachedObjects[i+1:]...) return } } @@ -156,16 +154,10 @@ func (e *ExtendedClient) Update( obj client.Object, opts ...client.UpdateOption, ) error { - if e.isCacheDisabled() { - return e.Client.Update(ctx, obj, opts...) + if e.isObjectCached(obj) { + e.removeObject(obj) } - if _, ok := obj.(*corev1.Secret); !ok { - return e.Client.Update(ctx, obj, opts...) - } - - e.RemoveSecret(client.ObjectKeyFromObject(obj)) - return e.Client.Update(ctx, obj, opts...) } @@ -175,16 +167,10 @@ func (e *ExtendedClient) Delete( obj client.Object, opts ...client.DeleteOption, ) error { - if e.isCacheDisabled() { - return e.Client.Delete(ctx, obj, opts...) + if e.isObjectCached(obj) { + e.removeObject(obj) } - if _, ok := obj.(*corev1.Secret); !ok { - return e.Client.Delete(ctx, obj, opts...) - } - - e.RemoveSecret(client.ObjectKeyFromObject(obj)) - return e.Client.Delete(ctx, obj, opts...) } @@ -195,15 +181,9 @@ func (e *ExtendedClient) Patch( patch client.Patch, opts ...client.PatchOption, ) error { - if e.isCacheDisabled() { - return e.Client.Patch(ctx, obj, patch, opts...) + if e.isObjectCached(obj) { + e.removeObject(obj) } - if _, ok := obj.(*corev1.Secret); !ok { - return e.Client.Patch(ctx, obj, patch, opts...) - } - - e.RemoveSecret(client.ObjectKeyFromObject(obj)) - return e.Client.Patch(ctx, obj, patch, opts...) } diff --git a/internal/cnpgi/instance/internal/client/client_test.go b/internal/cnpgi/instance/internal/client/client_test.go index be517497..70568f45 100644 --- a/internal/cnpgi/instance/internal/client/client_test.go +++ b/internal/cnpgi/instance/internal/client/client_test.go @@ -6,7 +6,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -45,19 +44,13 @@ var _ = Describe("ExtendedClient Get", func() { Namespace: "default", Name: "test-object-store", }, - Spec: v1.ObjectStoreSpec{ - InstanceSidecarConfiguration: v1.InstanceSidecarConfiguration{ - CacheTTL: ptr.To(60), - }, - }, + Spec: v1.ObjectStoreSpec{}, } baseClient := fake.NewClientBuilder(). WithScheme(scheme). WithObjects(secretInClient, objectStore).Build() - extendedClient = NewExtendedClient(baseClient, []client.ObjectKey{ - client.ObjectKeyFromObject(objectStore), - }).(*ExtendedClient) + extendedClient = NewExtendedClient(baseClient).(*ExtendedClient) }) It("returns secret from cache if not expired", func(ctx SpecContext) { @@ -70,22 +63,22 @@ var _ = Describe("ExtendedClient Get", func() { // manually add the secret to the cache, this is not present in the fake client so we are sure it is from the // cache - extendedClient.cachedSecrets = []*cachedSecret{ + extendedClient.cachedObjects = []cachedEntry{ { - secret: secretNotInClient, + entry: secretNotInClient, fetchUnixTime: time.Now().Unix(), }, } err := extendedClient.Get(ctx, client.ObjectKeyFromObject(secretNotInClient), secretInClient) Expect(err).NotTo(HaveOccurred()) - Expect(secretNotInClient).To(Equal(extendedClient.cachedSecrets[0].secret)) + Expect(secretNotInClient).To(Equal(extendedClient.cachedObjects[0].entry)) }) It("fetches secret from base client if cache is expired", func(ctx SpecContext) { - extendedClient.cachedSecrets = []*cachedSecret{ + extendedClient.cachedObjects = []cachedEntry{ { - secret: secretInClient.DeepCopy(), + entry: secretInClient.DeepCopy(), fetchUnixTime: time.Now().Add(-2 * time.Minute).Unix(), }, } @@ -111,6 +104,6 @@ var _ = Describe("ExtendedClient Get", func() { err = extendedClient.Get(ctx, client.ObjectKeyFromObject(configMap), configMap) Expect(err).NotTo(HaveOccurred()) - Expect(extendedClient.cachedSecrets).To(BeEmpty()) + Expect(extendedClient.cachedObjects).To(BeEmpty()) }) }) diff --git a/internal/cnpgi/instance/manager.go b/internal/cnpgi/instance/manager.go index 7bc8e830..757c937f 100644 --- a/internal/cnpgi/instance/manager.go +++ b/internal/cnpgi/instance/manager.go @@ -7,13 +7,10 @@ import ( cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/spf13/viper" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -33,89 +30,35 @@ func init() { func Start(ctx context.Context) error { setupLog := log.FromContext(ctx) setupLog.Info("Starting barman cloud instance plugin") - namespace := viper.GetString("namespace") - clusterName := viper.GetString("cluster-name") podName := viper.GetString("pod-name") - barmanObjectName := viper.GetString("barman-object-name") - recoveryBarmanObjectName := viper.GetString("recovery-barman-object-name") - controllerOptions := ctrl.Options{ Scheme: scheme, - Cache: cache.Options{ - ByObject: map[client.Object]cache.ByObject{ - &cnpgv1.Cluster{}: { - Field: fields.OneTermEqualSelector("metadata.name", clusterName), - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - }, - }, Client: client.Options{ Cache: &client.CacheOptions{ DisableFor: []client.Object{ &corev1.Secret{}, + &barmancloudv1.ObjectStore{}, + &cnpgv1.Cluster{}, }, }, }, } - if len(recoveryBarmanObjectName) == 0 { - controllerOptions.Cache.ByObject[&barmancloudv1.ObjectStore{}] = cache.ByObject{ - Field: fields.OneTermEqualSelector("metadata.name", barmanObjectName), - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - } - } else { - controllerOptions.Client.Cache.DisableFor = append( - controllerOptions.Client.Cache.DisableFor, - &barmancloudv1.ObjectStore{}, - ) - } - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), controllerOptions) if err != nil { setupLog.Error(err, "unable to start manager") return err } - barmanObjectKey := client.ObjectKey{ - Namespace: namespace, - Name: barmanObjectName, - } - recoveryBarmanObjectKey := client.ObjectKey{ - Namespace: namespace, - Name: recoveryBarmanObjectName, - } - - involvedObjectStores := make([]types.NamespacedName, 0, 2) - if len(barmanObjectName) > 0 { - involvedObjectStores = append(involvedObjectStores, barmanObjectKey) - } - if len(recoveryBarmanObjectName) > 0 { - involvedObjectStores = append(involvedObjectStores, recoveryBarmanObjectKey) - } - if err := mgr.Add(&CNPGI{ - Client: extendedclient.NewExtendedClient(mgr.GetClient(), involvedObjectStores), - ClusterObjectKey: client.ObjectKey{ - Namespace: namespace, - Name: clusterName, - }, + Client: extendedclient.NewExtendedClient(mgr.GetClient()), InstanceName: podName, // TODO: improve PGDataPath: viper.GetString("pgdata"), PGWALPath: path.Join(viper.GetString("pgdata"), "pg_wal"), SpoolDirectory: viper.GetString("spool-directory"), PluginPath: viper.GetString("plugin-path"), - - BarmanObjectKey: barmanObjectKey, - ServerName: viper.GetString("server-name"), - - RecoveryBarmanObjectKey: recoveryBarmanObjectKey, - RecoveryServerName: viper.GetString("recovery-server-name"), }); err != nil { setupLog.Error(err, "unable to create CNPGI runnable") return err diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index 6e7dedc9..5eefa793 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -14,45 +14,28 @@ import ( // CNPGI is the implementation of the PostgreSQL sidecar type CNPGI struct { - Client client.Client - ClusterObjectKey client.ObjectKey - PGDataPath string - PGWALPath string - SpoolDirectory string + Client client.Client + PGDataPath string + PGWALPath string + SpoolDirectory string // mutually exclusive with serverAddress PluginPath string InstanceName string - - BarmanObjectKey client.ObjectKey - ServerName string - - RecoveryBarmanObjectKey client.ObjectKey - RecoveryServerName string } // Start starts the GRPC service func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - ClusterObjectKey: c.ClusterObjectKey, - InstanceName: c.InstanceName, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: c.PGWALPath, - - BarmanObjectKey: c.BarmanObjectKey, - ServerName: c.ServerName, - - RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey, - RecoveryServerName: c.RecoveryServerName, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: c.PGWALPath, }) backup.RegisterBackupServer(server, BackupServiceImplementation{ - Client: c.Client, - BarmanObjectKey: c.BarmanObjectKey, - ServerName: c.ServerName, - ClusterObjectKey: c.ClusterObjectKey, - InstanceName: c.InstanceName, + Client: c.Client, + InstanceName: c.InstanceName, }) common.AddHealthCheck(server) return nil @@ -60,8 +43,7 @@ func (c *CNPGI) Start(ctx context.Context) error { srv := http.Server{ IdentityImpl: IdentityImplementation{ - Client: c.Client, - BarmanObjectKey: c.BarmanObjectKey, + Client: c.Client, }, Enrichers: []http.ServerEnricher{enrich}, PluginPath: c.PluginPath, diff --git a/internal/cnpgi/operator/config/config.go b/internal/cnpgi/operator/config/config.go index 7c6e231d..0d66f3b8 100644 --- a/internal/cnpgi/operator/config/config.go +++ b/internal/cnpgi/operator/config/config.go @@ -4,6 +4,9 @@ import ( "strings" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" ) @@ -44,10 +47,77 @@ func (e *ConfigurationError) IsEmpty() bool { // PluginConfiguration is the configuration of the plugin type PluginConfiguration struct { - BarmanObjectName string - ServerName string + Cluster *cnpgv1.Cluster + + BarmanObjectName string + ServerName string + RecoveryBarmanObjectName string RecoveryServerName string + + ReplicaSourceBarmanObjectName string + ReplicaSourceServerName string +} + +// GetBarmanObjectKey gets the namespaced name of the barman object +func (config *PluginConfiguration) GetBarmanObjectKey() types.NamespacedName { + return types.NamespacedName{ + Namespace: config.Cluster.Namespace, + Name: config.BarmanObjectName, + } +} + +// GetRecoveryBarmanObjectKey gets the namespaced name of the recovery barman object +func (config *PluginConfiguration) GetRecoveryBarmanObjectKey() types.NamespacedName { + return types.NamespacedName{ + Namespace: config.Cluster.Namespace, + Name: config.RecoveryBarmanObjectName, + } +} + +// GetReplicaSourceBarmanObjectKey gets the namespaced name of the replica source barman object +func (config *PluginConfiguration) GetReplicaSourceBarmanObjectKey() types.NamespacedName { + return types.NamespacedName{ + Namespace: config.Cluster.Namespace, + Name: config.ReplicaSourceBarmanObjectName, + } +} + +// GetReferredBarmanObjectsKey gets the list of barman objects referred by this +// plugin configuration +func (config *PluginConfiguration) GetReferredBarmanObjectsKey() []types.NamespacedName { + result := make([]types.NamespacedName, 0, 3) + + if len(config.BarmanObjectName) > 0 { + result = append(result, config.GetBarmanObjectKey()) + } + if len(config.RecoveryBarmanObjectName) > 0 { + result = append(result, config.GetRecoveryBarmanObjectKey()) + } + if len(config.ReplicaSourceBarmanObjectName) > 0 { + result = append(result, config.GetReplicaSourceBarmanObjectKey()) + } + + return result +} + +func getClusterGVK() schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: cnpgv1.GroupVersion.Group, + Version: cnpgv1.GroupVersion.Version, + Kind: cnpgv1.ClusterKind, + } +} + +// NewFromClusterJSON decodes a JSON representation of a cluster. +func NewFromClusterJSON(clusterJSON []byte) (*PluginConfiguration, error) { + var result cnpgv1.Cluster + + if err := decoder.DecodeObject(clusterJSON, &result, getClusterGVK()); err != nil { + return nil, err + } + + return NewFromCluster(&result), nil } // NewFromCluster extracts the configuration from the cluster @@ -68,7 +138,6 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration { recoveryServerName := "" recoveryBarmanObjectName := "" - if recoveryParameters := getRecoveryParameters(cluster); recoveryParameters != nil { recoveryBarmanObjectName = recoveryParameters["barmanObjectName"] recoveryServerName = recoveryParameters["serverName"] @@ -77,20 +146,34 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration { } } + replicaSourceServerName := "" + replicaSourceBarmanObjectName := "" + if replicaSourceParameters := getReplicaSourceParameters(cluster); replicaSourceParameters != nil { + replicaSourceBarmanObjectName = replicaSourceParameters["barmanObjectName"] + replicaSourceServerName = replicaSourceParameters["serverName"] + if len(replicaSourceServerName) == 0 { + replicaSourceServerName = cluster.Name + } + } + result := &PluginConfiguration{ + Cluster: cluster, // used for the backup/archive BarmanObjectName: helper.Parameters["barmanObjectName"], ServerName: serverName, - // used for restore/wal_restore + // used for restore and wal_restore during backup recovery RecoveryServerName: recoveryServerName, RecoveryBarmanObjectName: recoveryBarmanObjectName, + // used for wal_restore in the designed primary of a replica cluster + ReplicaSourceServerName: replicaSourceServerName, + ReplicaSourceBarmanObjectName: replicaSourceBarmanObjectName, } return result } func getRecoveryParameters(cluster *cnpgv1.Cluster) map[string]string { - recoveryPluginConfiguration := cluster.GetRecoverySourcePlugin() + recoveryPluginConfiguration := getRecoverySourcePlugin(cluster) if recoveryPluginConfiguration == nil { return nil } @@ -102,11 +185,67 @@ func getRecoveryParameters(cluster *cnpgv1.Cluster) map[string]string { return recoveryPluginConfiguration.Parameters } +func getReplicaSourceParameters(cluster *cnpgv1.Cluster) map[string]string { + replicaSourcePluginConfiguration := getReplicaSourcePlugin(cluster) + if replicaSourcePluginConfiguration == nil { + return nil + } + + if replicaSourcePluginConfiguration.Name != metadata.PluginName { + return nil + } + + return replicaSourcePluginConfiguration.Parameters +} + +// getRecoverySourcePlugin returns the configuration of the plugin being +// the recovery source of the cluster. If no such plugin have been configured, +// nil is returned +func getRecoverySourcePlugin(cluster *cnpgv1.Cluster) *cnpgv1.PluginConfiguration { + if cluster.Spec.Bootstrap == nil || cluster.Spec.Bootstrap.Recovery == nil { + return nil + } + + recoveryConfig := cluster.Spec.Bootstrap.Recovery + if len(recoveryConfig.Source) == 0 { + // Plugin-based recovery is supported only with + // An external cluster definition + return nil + } + + recoveryExternalCluster, found := cluster.ExternalCluster(recoveryConfig.Source) + if !found { + // This error should have already been detected + // by the validating webhook. + return nil + } + + return recoveryExternalCluster.PluginConfiguration +} + +// getRecoverySourcePlugin returns the configuration of the plugin being +// the recovery source of the cluster. If no such plugin have been configured, +// nil is returned +func getReplicaSourcePlugin(cluster *cnpgv1.Cluster) *cnpgv1.PluginConfiguration { + if cluster.Spec.ReplicaCluster == nil || len(cluster.Spec.ReplicaCluster.Source) == 0 { + return nil + } + + recoveryExternalCluster, found := cluster.ExternalCluster(cluster.Spec.ReplicaCluster.Source) + if !found { + // This error should have already been detected + // by the validating webhook. + return nil + } + + return recoveryExternalCluster.PluginConfiguration +} + // Validate checks if the barmanObjectName is set -func (p *PluginConfiguration) Validate() error { +func (config *PluginConfiguration) Validate() error { err := NewConfigurationError() - if len(p.BarmanObjectName) == 0 && len(p.RecoveryBarmanObjectName) == 0 { + if len(config.BarmanObjectName) == 0 && len(config.RecoveryBarmanObjectName) == 0 { return err.WithMessage("no reference to barmanObjectName have been included") } diff --git a/internal/cnpgi/operator/lifecycle.go b/internal/cnpgi/operator/lifecycle.go index 61a1e5e2..736b1d76 100644 --- a/internal/cnpgi/operator/lifecycle.go +++ b/internal/cnpgi/operator/lifecycle.go @@ -168,14 +168,13 @@ func (impl LifecycleImplementation) reconcileJob( return nil, nil } - return reconcileJob(ctx, cluster, request, pluginConfiguration, env) + return reconcileJob(ctx, cluster, request, env) } func reconcileJob( ctx context.Context, cluster *cnpgv1.Cluster, request *lifecycle.OperatorLifecycleRequest, - pluginConfiguration *config.PluginConfiguration, env []corev1.EnvVar, ) (*lifecycle.OperatorLifecycleResponse, error) { contextLogger := log.FromContext(ctx).WithName("lifecycle") @@ -206,7 +205,6 @@ func reconcileJob( mutatedJob := job.DeepCopy() if err := reconcilePodSpec( - pluginConfiguration, cluster, &mutatedJob.Spec.Template.Spec, "full-recovery", @@ -262,7 +260,6 @@ func reconcilePod( if len(pluginConfiguration.BarmanObjectName) != 0 { if err := reconcilePodSpec( - pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", @@ -289,7 +286,6 @@ func reconcilePod( } func reconcilePodSpec( - cfg *config.PluginConfiguration, cluster *cnpgv1.Cluster, spec *corev1.PodSpec, mainContainerName string, @@ -313,32 +309,6 @@ func reconcilePodSpec( }, } - if len(cfg.BarmanObjectName) > 0 { - envs = append(envs, - corev1.EnvVar{ - Name: "BARMAN_OBJECT_NAME", - Value: cfg.BarmanObjectName, - }, - corev1.EnvVar{ - Name: "SERVER_NAME", - Value: cfg.ServerName, - }, - ) - } - - if len(cfg.RecoveryBarmanObjectName) > 0 { - envs = append(envs, - corev1.EnvVar{ - Name: "RECOVERY_BARMAN_OBJECT_NAME", - Value: cfg.RecoveryBarmanObjectName, - }, - corev1.EnvVar{ - Name: "RECOVERY_SERVER_NAME", - Value: cfg.RecoveryServerName, - }, - ) - } - envs = append(envs, additionalEnvs...) baseProbe := &corev1.Probe{ diff --git a/internal/cnpgi/operator/lifecycle_test.go b/internal/cnpgi/operator/lifecycle_test.go index 29285ebf..a8b964b9 100644 --- a/internal/cnpgi/operator/lifecycle_test.go +++ b/internal/cnpgi/operator/lifecycle_test.go @@ -107,7 +107,7 @@ var _ = Describe("LifecycleImplementation", func() { ObjectDefinition: jobJSON, } - response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil) + response, err := reconcileJob(ctx, cluster, request, nil) Expect(err).NotTo(HaveOccurred()) Expect(response).NotTo(BeNil()) Expect(response.JsonPatch).NotTo(BeEmpty()) @@ -128,7 +128,7 @@ var _ = Describe("LifecycleImplementation", func() { ObjectDefinition: jobJSON, } - response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil) + response, err := reconcileJob(ctx, cluster, request, nil) Expect(err).NotTo(HaveOccurred()) Expect(response).To(BeNil()) }) @@ -138,7 +138,7 @@ var _ = Describe("LifecycleImplementation", func() { ObjectDefinition: []byte("invalid-json"), } - response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil) + response, err := reconcileJob(ctx, cluster, request, nil) Expect(err).To(HaveOccurred()) Expect(response).To(BeNil()) }) @@ -165,7 +165,7 @@ var _ = Describe("LifecycleImplementation", func() { ObjectDefinition: jobJSON, } - response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil) + response, err := reconcileJob(ctx, cluster, request, nil) Expect(err).NotTo(HaveOccurred()) Expect(response).To(BeNil()) }) diff --git a/internal/cnpgi/operator/reconciler.go b/internal/cnpgi/operator/reconciler.go index c00bbea8..67a96c42 100644 --- a/internal/cnpgi/operator/reconciler.go +++ b/internal/cnpgi/operator/reconciler.go @@ -75,14 +75,10 @@ func (r ReconcilerImplementation) Pre( contextLogger.Debug("parsing barman object configuration") - var barmanObjects []barmancloudv1.ObjectStore - - if pluginConfiguration.BarmanObjectName != "" { + barmanObjects := make([]barmancloudv1.ObjectStore, 0, len(pluginConfiguration.GetReferredBarmanObjectsKey())) + for _, barmanObjectKey := range pluginConfiguration.GetReferredBarmanObjectsKey() { var barmanObject barmancloudv1.ObjectStore - if err := r.Client.Get(ctx, client.ObjectKey{ - Namespace: cluster.Namespace, - Name: pluginConfiguration.BarmanObjectName, - }, &barmanObject); err != nil { + if err := r.Client.Get(ctx, barmanObjectKey, &barmanObject); err != nil { if apierrs.IsNotFound(err) { contextLogger.Info( "barman object configuration not found, requeuing", @@ -99,30 +95,7 @@ func (r ReconcilerImplementation) Pre( barmanObjects = append(barmanObjects, barmanObject) } - if pluginConfiguration.RecoveryBarmanObjectName != "" { - var barmanObject barmancloudv1.ObjectStore - if err := r.Client.Get(ctx, client.ObjectKey{ - Namespace: cluster.Namespace, - Name: pluginConfiguration.RecoveryBarmanObjectName, - }, &barmanObject); err != nil { - if apierrs.IsNotFound(err) { - contextLogger.Info( - "barman recovery object configuration not found, requeuing", - "name", pluginConfiguration.RecoveryBarmanObjectName, - "namespace", cluster.Namespace, - ) - return &reconciler.ReconcilerHooksResult{ - Behavior: reconciler.ReconcilerHooksResult_BEHAVIOR_REQUEUE, - }, nil - } - return nil, err - } - - barmanObjects = append(barmanObjects, barmanObject) - } - - var additionalSecretNames []string - if err := r.ensureRole(ctx, &cluster, barmanObjects, additionalSecretNames); err != nil { + if err := r.ensureRole(ctx, &cluster, barmanObjects); err != nil { return nil, err } @@ -150,10 +123,9 @@ func (r ReconcilerImplementation) ensureRole( ctx context.Context, cluster *cnpgv1.Cluster, barmanObjects []barmancloudv1.ObjectStore, - additionalSecretNames []string, ) error { contextLogger := log.FromContext(ctx) - newRole := specs.BuildRole(cluster, barmanObjects, additionalSecretNames) + newRole := specs.BuildRole(cluster, barmanObjects) var role rbacv1.Role if err := r.Client.Get(ctx, client.ObjectKey{ diff --git a/internal/cnpgi/operator/specs/role.go b/internal/cnpgi/operator/specs/role.go index 0b1cdc78..22a72da3 100644 --- a/internal/cnpgi/operator/specs/role.go +++ b/internal/cnpgi/operator/specs/role.go @@ -15,7 +15,6 @@ import ( func BuildRole( cluster *cnpgv1.Cluster, barmanObjects []barmancloudv1.ObjectStore, - additionalSecretNames []string, ) *rbacv1.Role { role := &rbacv1.Role{ ObjectMeta: metav1.ObjectMeta{ @@ -36,10 +35,6 @@ func BuildRole( } } - for _, secret := range additionalSecretNames { - secretsSet.Put(secret) - } - role.Rules = append(role.Rules, rbacv1.PolicyRule{ APIGroups: []string{ "barmancloud.cnpg.io", diff --git a/internal/cnpgi/restore/manager.go b/internal/cnpgi/restore/manager.go index 8926d156..9503aee5 100644 --- a/internal/cnpgi/restore/manager.go +++ b/internal/cnpgi/restore/manager.go @@ -33,12 +33,6 @@ func Start(ctx context.Context) error { namespace := viper.GetString("namespace") clusterName := viper.GetString("cluster-name") - recoveryBarmanObjectName := viper.GetString("recovery-barman-object-name") - recoveryServerName := viper.GetString("recovery-server-name") - - barmanObjectName := viper.GetString("barman-object-name") - serverName := viper.GetString("server-name") - objs := map[client.Object]cache.ByObject{ &cnpgv1.Cluster{}: { Field: fields.OneTermEqualSelector("metadata.name", clusterName), @@ -48,15 +42,6 @@ func Start(ctx context.Context) error { }, } - if recoveryBarmanObjectName != "" { - objs[&barmancloudv1.ObjectStore{}] = cache.ByObject{ - Field: fields.OneTermEqualSelector("metadata.name", recoveryBarmanObjectName), - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - } - } - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -79,25 +64,9 @@ func Start(ctx context.Context) error { if err := mgr.Add(&CNPGI{ PluginPath: viper.GetString("plugin-path"), SpoolDirectory: viper.GetString("spool-directory"), - ClusterObjectKey: client.ObjectKey{ - Namespace: namespace, - Name: clusterName, - }, - Client: mgr.GetClient(), - PGDataPath: viper.GetString("pgdata"), - InstanceName: viper.GetString("pod-name"), - - ServerName: serverName, - BarmanObjectKey: client.ObjectKey{ - Namespace: namespace, - Name: barmanObjectName, - }, - - RecoveryServerName: recoveryServerName, - RecoveryBarmanObjectKey: client.ObjectKey{ - Namespace: namespace, - Name: recoveryBarmanObjectName, - }, + Client: mgr.GetClient(), + PGDataPath: viper.GetString("pgdata"), + InstanceName: viper.GetString("pod-name"), }); err != nil { setupLog.Error(err, "unable to create CNPGI runnable") return err diff --git a/internal/cnpgi/restore/restore.go b/internal/cnpgi/restore/restore.go index 2837053f..5ecf4637 100644 --- a/internal/cnpgi/restore/restore.go +++ b/internal/cnpgi/restore/restore.go @@ -18,17 +18,16 @@ import ( cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/cloudnative-pg/pkg/postgres" "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" - "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" "github.com/cloudnative-pg/machinery/pkg/execlog" "github.com/cloudnative-pg/machinery/pkg/fileutils" "github.com/cloudnative-pg/machinery/pkg/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config" ) const ( @@ -44,14 +43,7 @@ const ( type JobHookImpl struct { restore.UnimplementedRestoreJobHooksServer - Client client.Client - ClusterObjectKey client.ObjectKey - - BarmanObjectKey types.NamespacedName - ServerName string - - RecoveryBarmanObjectKey types.NamespacedName - RecoveryServerName string + Client client.Client SpoolDirectory string PgDataPath string @@ -78,27 +70,24 @@ func (impl JobHookImpl) Restore( req *restore.RestoreRequest, ) (*restore.RestoreResponse, error) { contextLogger := log.FromContext(ctx) - var cluster cnpgv1.Cluster - if err := decoder.DecodeObject( - req.GetClusterDefinition(), - &cluster, - cnpgv1.GroupVersion.WithKind("Cluster"), - ); err != nil { + + configuration, err := config.NewFromClusterJSON(req.ClusterDefinition) + if err != nil { return nil, err } var recoveryObjectStore barmancloudv1.ObjectStore - if err := impl.Client.Get(ctx, impl.RecoveryBarmanObjectKey, &recoveryObjectStore); err != nil { + if err := impl.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &recoveryObjectStore); err != nil { return nil, err } - if impl.BarmanObjectKey.Name != "" { + if configuration.BarmanObjectName != "" { var targetObjectStore barmancloudv1.ObjectStore - if err := impl.Client.Get(ctx, impl.BarmanObjectKey, &targetObjectStore); err != nil { + if err := impl.Client.Get(ctx, configuration.GetBarmanObjectKey(), &targetObjectStore); err != nil { return nil, err } - if err := impl.checkBackupDestination(ctx, &cluster, &targetObjectStore.Spec.Configuration); err != nil { + if err := impl.checkBackupDestination(ctx, configuration.Cluster, &targetObjectStore.Spec.Configuration); err != nil { return nil, err } } @@ -107,9 +96,9 @@ func (impl JobHookImpl) Restore( backup, env, err := loadBackupObjectFromExternalCluster( ctx, impl.Client, - &cluster, + configuration.Cluster, &recoveryObjectStore.Spec.Configuration, - impl.RecoveryServerName, + configuration.RecoveryServerName, ) if err != nil { return nil, err @@ -133,7 +122,7 @@ func (impl JobHookImpl) Restore( return nil, err } - if cluster.Spec.WalStorage != nil { + if configuration.Cluster.Spec.WalStorage != nil { if _, err := impl.restoreCustomWalDir(ctx); err != nil { return nil, err } diff --git a/internal/cnpgi/restore/start.go b/internal/cnpgi/restore/start.go index 0e0f0cc2..6f216039 100644 --- a/internal/cnpgi/restore/start.go +++ b/internal/cnpgi/restore/start.go @@ -18,14 +18,6 @@ type CNPGI struct { PluginPath string SpoolDirectory string - BarmanObjectKey client.ObjectKey - ServerName string - - RecoveryBarmanObjectKey client.ObjectKey - RecoveryServerName string - - ClusterObjectKey client.ObjectKey - Client client.Client PGDataPath string InstanceName string @@ -38,32 +30,18 @@ func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - ClusterObjectKey: c.ClusterObjectKey, - InstanceName: c.InstanceName, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: path.Join(c.PGDataPath, "pg_wal"), - - BarmanObjectKey: c.BarmanObjectKey, - ServerName: c.ServerName, - - RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey, - RecoveryServerName: c.RecoveryServerName, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: path.Join(c.PGDataPath, "pg_wal"), }) restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{ Client: c.Client, - ClusterObjectKey: c.ClusterObjectKey, SpoolDirectory: c.SpoolDirectory, PgDataPath: c.PGDataPath, PgWalFolderToSymlink: PgWalVolumePgWalPath, - - BarmanObjectKey: c.BarmanObjectKey, - ServerName: c.ServerName, - - RecoveryBarmanObjectKey: c.RecoveryBarmanObjectKey, - RecoveryServerName: c.RecoveryServerName, }) common.AddHealthCheck(server)