Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1/objectstore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1

import (
barmanapi "github.com/cloudnative-pg/barman-cloud/pkg/api"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -29,6 +30,10 @@ type InstanceSidecarConfiguration struct {
// +kubebuilder:validation:Maximum=3600
// +kubebuilder:default=180
CacheTTL *int `json:"cacheTTL,omitempty"`

// The environment to be explicitly passed to the sidecar
// +optional
Env []corev1.EnvVar `json:"env,omitempty"`
}

// GetCacheTTL returns the cache TTL value, defaulting to 180 seconds if not set.
Expand Down
123 changes: 123 additions & 0 deletions config/crd/bases/barmancloud.cnpg.io_objectstores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,136 @@ spec:
maximum: 3600
minimum: 0
type: integer
env:
description: The environment to be explicitly passed to the sidecar
items:
description: EnvVar represents an environment variable present
in a Container.
properties:
name:
description: Name of the environment variable. Must be a
C_IDENTIFIER.
type: string
value:
description: |-
Variable references $(VAR_NAME) are expanded
using the previously defined environment variables in the container and
any service environment variables. If a variable cannot be resolved,
the reference in the input string will be unchanged. Double $$ are reduced
to a single $, which allows for escaping the $(VAR_NAME) syntax: i.e.
"$$(VAR_NAME)" will produce the string literal "$(VAR_NAME)".
Escaped references will never be expanded, regardless of whether the variable
exists or not.
Defaults to "".
type: string
valueFrom:
description: Source for the environment variable's value.
Cannot be used if value is not empty.
properties:
configMapKeyRef:
description: Selects a key of a ConfigMap.
properties:
key:
description: The key to select.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
optional:
description: Specify whether the ConfigMap or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
fieldRef:
description: |-
Selects a field of the pod: supports metadata.name, metadata.namespace, `metadata.labels['<KEY>']`, `metadata.annotations['<KEY>']`,
spec.nodeName, spec.serviceAccountName, status.hostIP, status.podIP, status.podIPs.
properties:
apiVersion:
description: Version of the schema the FieldPath
is written in terms of, defaults to "v1".
type: string
fieldPath:
description: Path of the field to select in the
specified API version.
type: string
required:
- fieldPath
type: object
x-kubernetes-map-type: atomic
resourceFieldRef:
description: |-
Selects a resource of the container: only resources limits and requests
(limits.cpu, limits.memory, limits.ephemeral-storage, requests.cpu, requests.memory and requests.ephemeral-storage) are currently supported.
properties:
containerName:
description: 'Container name: required for volumes,
optional for env vars'
type: string
divisor:
anyOf:
- type: integer
- type: string
description: Specifies the output format of the
exposed resources, defaults to "1"
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
resource:
description: 'Required: resource to select'
type: string
required:
- resource
type: object
x-kubernetes-map-type: atomic
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select from. Must
be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
optional:
description: Specify whether the Secret or its key
must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
required:
- name
type: object
type: array
type: object
required:
- configuration
type: object
status:
description: ObjectStoreStatus defines the observed state of ObjectStore.
type: object
required:
- metadata
- spec
type: object
served: true
storage: true
Expand Down
105 changes: 100 additions & 5 deletions internal/cnpgi/operator/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ import (
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/config"
)

// LifecycleImplementation is the implementation of the lifecycle handler
type LifecycleImplementation struct {
lifecycle.UnimplementedOperatorLifecycleServer
Client client.Client
}

// GetCapabilities exposes the lifecycle capabilities
Expand Down Expand Up @@ -94,20 +98,85 @@ func (impl LifecycleImplementation) LifecycleHook(
switch kind {
case "Pod":
contextLogger.Info("Reconciling pod")
return reconcilePod(ctx, &cluster, request, pluginConfiguration)
return impl.reconcilePod(ctx, &cluster, request, pluginConfiguration)
case "Job":
contextLogger.Info("Reconciling job")
return reconcileJob(ctx, &cluster, request, pluginConfiguration)
return impl.reconcileJob(ctx, &cluster, request, pluginConfiguration)
default:
return nil, fmt.Errorf("unsupported kind: %s", kind)
}
}

func (impl LifecycleImplementation) collectAdditionalEnvs(
ctx context.Context,
namespace string,
pluginConfiguration *config.PluginConfiguration,
) ([]corev1.EnvVar, error) {
var result []corev1.EnvVar

if len(pluginConfiguration.BarmanObjectName) > 0 {
envs, err := impl.collectObjectStoreEnvs(
ctx,
types.NamespacedName{
Name: pluginConfiguration.BarmanObjectName,
Namespace: namespace,
},
)
if err != nil {
return nil, err
}
result = append(result, envs...)
}

if len(pluginConfiguration.RecoveryBarmanObjectName) > 0 {
envs, err := impl.collectObjectStoreEnvs(
ctx,
types.NamespacedName{
Name: pluginConfiguration.RecoveryBarmanObjectName,
Namespace: namespace,
},
)
if err != nil {
return nil, err
}
result = append(result, envs...)
}

return result, nil
}

func (impl LifecycleImplementation) collectObjectStoreEnvs(
ctx context.Context,
barmanObjectKey types.NamespacedName,
) ([]corev1.EnvVar, error) {
var objectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, barmanObjectKey, &objectStore); err != nil {
return nil, err
}

return objectStore.Spec.InstanceSidecarConfiguration.Env, nil
}

func (impl LifecycleImplementation) reconcileJob(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
env, err := impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration)
if err != nil {
return nil, nil
}

return reconcileJob(ctx, cluster, request, pluginConfiguration, env)
}

func reconcileJob(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
env []corev1.EnvVar,
) (*lifecycle.OperatorLifecycleResponse, error) {
contextLogger := log.FromContext(ctx).WithName("lifecycle")
if pluginConfig := cluster.GetRecoverySourcePlugin(); pluginConfig == nil || pluginConfig.Name != metadata.PluginName {
Expand Down Expand Up @@ -144,6 +213,7 @@ func reconcileJob(
corev1.Container{
Args: []string{"restore"},
},
env,
); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for job: %w", err)
}
Expand All @@ -159,11 +229,26 @@ func reconcileJob(
}, nil
}

func (impl LifecycleImplementation) reconcilePod(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
) (*lifecycle.OperatorLifecycleResponse, error) {
env, err := impl.collectAdditionalEnvs(ctx, cluster.Namespace, pluginConfiguration)
if err != nil {
return nil, nil
}

return reconcilePod(ctx, cluster, request, pluginConfiguration, env)
}

func reconcilePod(
ctx context.Context,
cluster *cnpgv1.Cluster,
request *lifecycle.OperatorLifecycleRequest,
pluginConfiguration *config.PluginConfiguration,
env []corev1.EnvVar,
) (*lifecycle.OperatorLifecycleResponse, error) {
pod, err := decoder.DecodePodJSON(request.GetObjectDefinition())
if err != nil {
Expand All @@ -176,9 +261,16 @@ func reconcilePod(
mutatedPod := pod.DeepCopy()

if len(pluginConfiguration.BarmanObjectName) != 0 {
if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{
Args: []string{"instance"},
}); err != nil {
if err := reconcilePodSpec(
pluginConfiguration,
cluster,
&mutatedPod.Spec,
"postgres",
corev1.Container{
Args: []string{"instance"},
},
env,
); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
}
} else {
Expand All @@ -202,6 +294,7 @@ func reconcilePodSpec(
spec *corev1.PodSpec,
mainContainerName string,
sidecarConfig corev1.Container,
additionalEnvs []corev1.EnvVar,
) error {
envs := []corev1.EnvVar{
{
Expand Down Expand Up @@ -246,6 +339,8 @@ func reconcilePodSpec(
)
}

envs = append(envs, additionalEnvs...)

baseProbe := &corev1.Probe{
FailureThreshold: 3,
ProbeHandler: corev1.ProbeHandler{
Expand Down
12 changes: 6 additions & 6 deletions internal/cnpgi/operator/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: jobJSON,
}

response, err := reconcileJob(ctx, cluster, request, pluginConfiguration)
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).NotTo(BeNil())
Expect(response.JsonPatch).NotTo(BeEmpty())
Expand All @@ -128,7 +128,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: jobJSON,
}

response, err := reconcileJob(ctx, cluster, request, pluginConfiguration)
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).To(BeNil())
})
Expand All @@ -138,7 +138,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: []byte("invalid-json"),
}

response, err := reconcileJob(ctx, cluster, request, pluginConfiguration)
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
Expect(err).To(HaveOccurred())
Expect(response).To(BeNil())
})
Expand All @@ -165,7 +165,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: jobJSON,
}

response, err := reconcileJob(ctx, cluster, request, pluginConfiguration)
response, err := reconcileJob(ctx, cluster, request, pluginConfiguration, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).To(BeNil())
})
Expand All @@ -185,7 +185,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: podJSON,
}

response, err := reconcilePod(ctx, cluster, request, pluginConfiguration)
response, err := reconcilePod(ctx, cluster, request, pluginConfiguration, nil)
Expect(err).NotTo(HaveOccurred())
Expect(response).NotTo(BeNil())
Expect(response.JsonPatch).NotTo(BeEmpty())
Expand All @@ -203,7 +203,7 @@ var _ = Describe("LifecycleImplementation", func() {
ObjectDefinition: []byte("invalid-json"),
}

response, err := reconcilePod(ctx, cluster, request, pluginConfiguration)
response, err := reconcilePod(ctx, cluster, request, pluginConfiguration, nil)
Expect(err).To(HaveOccurred())
Expect(response).To(BeNil())
})
Expand Down
Loading
Loading