Skip to content
Merged
4 changes: 4 additions & 0 deletions e2e-tests/pvc-auto-resize/run
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,13 @@ apply_resourcequota 30Gi
echo "Continuing to fill disk to test maxSize enforcement"
fill_disk_to_threshold "${cluster}-rs0-0" 80 4000
wait_for_auto_resize "mongod-data-${cluster}-rs0-0" "7Gi" 60 10
wait_for_auto_resize "mongod-data-${cluster}-rs0-1" "7Gi" 60 10
wait_for_auto_resize "mongod-data-${cluster}-rs0-2" "7Gi" 60 10

fill_disk_to_threshold "${cluster}-rs0-0" 80 5600
wait_for_auto_resize "mongod-data-${cluster}-rs0-0" "9Gi" 60 10
wait_for_auto_resize "mongod-data-${cluster}-rs0-1" "9Gi" 60 10
wait_for_auto_resize "mongod-data-${cluster}-rs0-2" "9Gi" 60 10

fill_disk_to_threshold "${cluster}-rs0-0" 80 7200

Expand Down
27 changes: 25 additions & 2 deletions pkg/controller/perconaservermongodb/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
"context"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/percona/percona-server-mongodb-operator/pkg/naming"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/config"
)

Expand All @@ -25,6 +29,16 @@ func (r *ReconcilePerconaServerMongoDB) getPVCUsageFromMetrics(
pod *corev1.Pod,
pvcName string,
) (*PVCUsage, error) {
if pod == nil {
return nil, errors.New("pod is nil")
}

backoff := wait.Backoff{
Steps: 5,
Duration: 5 * time.Second,
Factor: 2.0,
}

// Execute df command in the mongod container to get disk usage
// df -B1 /data/db outputs in bytes
// Example output:
Expand All @@ -33,9 +47,18 @@ func (r *ReconcilePerconaServerMongoDB) getPVCUsageFromMetrics(
var stdout, stderr bytes.Buffer
command := []string{"df", "-B1", config.MongodContainerDataDir}

err := r.clientcmd.Exec(ctx, pod, "mongod", command, nil, &stdout, &stderr, false)
err := retry.OnError(backoff, func(err error) bool { return true }, func() error {
stdout.Reset()
stderr.Reset()

err := r.clientcmd.Exec(ctx, pod, naming.ComponentMongod, command, nil, &stdout, &stderr, false)
if err != nil {
return errors.Wrapf(err, "failed to execute df in pod %s: %s", pod.Name, stderr.String())
}
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "failed to execute df in pod %s: %s", pod.Name, stderr.String())
return nil, errors.Wrap(err, "wait for df execution")
}

lines := strings.Split(strings.TrimSpace(stdout.String()), "\n")
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller/perconaservermongodb/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ func TestGetPVCUsageFromMetrics(t *testing.T) {
Name: "test-pod-0",
Namespace: "test-namespace",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "mongod",
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "mongod",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{},
},
},
},
},
}

result, err := r.getPVCUsageFromMetrics(ctx, pod, tt.pvcName)
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/perconaservermongodb/mgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -448,6 +449,10 @@ func (r *ReconcilePerconaServerMongoDB) updateConfigMembers(ctx context.Context,

err = cli.WriteConfig(ctx, cnf, false)
if err != nil {
if strings.Contains(err.Error(), "NodeNotFound") {
log.V(1).Info("NodeNotFound error during replset reconfig after removing old members, will retry on next reconcile", "replset", rs.Name)
return rsMembers, 0, nil
}
return rsMembers, 0, errors.Wrap(err, "delete: write mongo config")
}
}
Expand Down Expand Up @@ -1063,6 +1068,9 @@ func (r *ReconcilePerconaServerMongoDB) restoreInProgress(ctx context.Context, c
stsName := cr.Name + "-" + replset.Name
nn := types.NamespacedName{Name: stsName, Namespace: cr.Namespace}
if err := r.client.Get(ctx, nn, &sts); err != nil {
if k8serrors.IsNotFound(err) {
return false, nil
}
return false, errors.Wrapf(err, "get statefulset %s", stsName)
}
_, ok := sts.Annotations[api.AnnotationRestoreInProgress]
Expand Down
21 changes: 13 additions & 8 deletions pkg/controller/perconaservermongodb/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,23 @@ func (r *ReconcilePerconaServerMongoDB) reconcileStatefulSet(ctx context.Context
return sfs, nil
}

if err := r.reconcilePVCs(ctx, cr, sfs, ls, volumeSpec); err != nil {
return nil, errors.Wrapf(err, "reconcile PVCs for %s", sfs.Name)
}

// Storage autoscaling is best-effort (non-blocking): a failure is logged and reconciliation continues.
if err := r.reconcileStorageAutoscaling(ctx, cr, sfs, volumeSpec, ls); err != nil {
log.Error(err, "failed to reconcile storage autoscaling", "statefulset", sfs.Name)
}

if _, ok := sfs.Annotations[api.AnnotationPVCResizeInProgress]; ok {
log.V(1).Info("PVC resize in progress, skipping reconciliation of statefulset", "name", sfs.Name)
return sfs, nil
if err := r.reconcilePVCs(ctx, cr, sfs, ls, volumeSpec); err != nil {
return nil, errors.Wrapf(err, "reconcile PVCs for %s", sfs.Name)
}

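// Re-read the StatefulSet from the cluster to check for the PVC resize
// annotation, as it may have been set during reconcilePVCs and the local
// sfs object would be stale. If the re-read itself fails, the error is
// ignored and reconciliation proceeds with the local sfs object.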
currentSts := new(appsv1.StatefulSet)
if err := r.client.Get(ctx, types.NamespacedName{Name: sfs.Name, Namespace: sfs.Namespace}, currentSts); err == nil {
if _, ok := currentSts.Annotations[api.AnnotationPVCResizeInProgress]; ok {
log.V(1).Info("PVC resize in progress, skipping reconciliation of statefulset", "name", sfs.Name)
return sfs, nil
}
}

err = r.createOrUpdate(ctx, sfs)
Expand Down
26 changes: 15 additions & 11 deletions pkg/controller/perconaservermongodb/volume_autoscaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
"github.com/percona/percona-server-mongodb-operator/pkg/naming"
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/config"
)

Expand Down Expand Up @@ -85,7 +86,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileStorageAutoscaling(

if err := r.checkAndResizePVC(ctx, cr, &pvc, pod, volumeSpec); err != nil {
log.Error(err, "failed to check/resize PVC", "pvc", pvc.Name)
r.updateAutoscalingStatus(cr, pvc.Name, nil, err)
r.updateAutoscalingStatus(ctx, cr, pvc.Name, nil, err)
}
}

Expand All @@ -102,8 +103,8 @@ func (r *ReconcilePerconaServerMongoDB) checkAndResizePVC(
) error {
log := logf.FromContext(ctx).WithName("StorageAutoscaling").WithValues("pvc", pvc.Name)

if pod.Status.Phase != corev1.PodRunning {
log.V(1).Info("skipping PVC check: pod not running", "phase", pod.Status.Phase)
if !isContainerAndPodRunning(*pod, naming.ComponentMongod) {
log.V(1).Info("skipping PVC metrics check: container and pod not running", "phase", pod.Status.Phase)
return nil
}

Expand All @@ -112,7 +113,7 @@ func (r *ReconcilePerconaServerMongoDB) checkAndResizePVC(
return errors.Wrap(err, "get PVC usage from metrics")
}

r.updateAutoscalingStatus(cr, pvc.Name, usage, nil)
r.updateAutoscalingStatus(ctx, cr, pvc.Name, usage, nil)

if !r.shouldTriggerResize(ctx, cr, pvc, usage) {
return nil
Expand Down Expand Up @@ -140,9 +141,6 @@ func (r *ReconcilePerconaServerMongoDB) shouldTriggerResize(
config := cr.Spec.StorageAutoscaling()

if usage.UsagePercent < config.TriggerThresholdPercent {
log.V(1).Info("usage below threshold",
"usage", usage.UsagePercent,
"threshold", config.TriggerThresholdPercent)
return false
}

Expand Down Expand Up @@ -201,12 +199,11 @@ func (r *ReconcilePerconaServerMongoDB) triggerResize(
) error {
log := logf.FromContext(ctx).WithName("StorageAutoscaling").WithValues("pvc", pvc.Name)

patch := client.MergeFrom(cr.DeepCopy())
orig := cr.DeepCopy()

// We are modifying cr directly through the pointer. So the original cr object does get the storage size updated.
volumeSpec.PersistentVolumeClaim.Resources.Requests[corev1.ResourceStorage] = newSize

if err := r.client.Patch(ctx, cr.DeepCopy(), patch); err != nil {
if err := r.client.Patch(ctx, cr.DeepCopy(), client.MergeFrom(orig)); err != nil {
return errors.Wrap(err, "patch CR with new storage size")
}

Expand All @@ -219,11 +216,19 @@ func (r *ReconcilePerconaServerMongoDB) triggerResize(

// updateAutoscalingStatus updates the status for a specific PVC
func (r *ReconcilePerconaServerMongoDB) updateAutoscalingStatus(
ctx context.Context,
cr *api.PerconaServerMongoDB,
pvcName string,
usage *PVCUsage,
err error,
) {
log := logf.FromContext(ctx).WithName("StorageAutoscaling")

if pvcName == "" {
log.V(1).Info("no pvc name specified")
return
}

if cr.Status.StorageAutoscaling == nil {
cr.Status.StorageAutoscaling = make(map[string]api.StorageAutoscalingStatus)
}
Expand All @@ -239,7 +244,6 @@ func (r *ReconcilePerconaServerMongoDB) updateAutoscalingStatus(
status.ResizeCount++
}
}

status.CurrentSize = newSize.String()
status.LastError = ""
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/controller/perconaservermongodb/volume_autoscaling_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package perconaservermongodb

import (
"context"
"errors"
"testing"

Expand All @@ -19,7 +18,6 @@ import (

func TestShouldTriggerResize(t *testing.T) {
r := &ReconcilePerconaServerMongoDB{}
ctx := context.Background()

tests := []struct {
name string
Expand Down Expand Up @@ -139,7 +137,7 @@ func TestShouldTriggerResize(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := r.shouldTriggerResize(ctx, tt.cr, tt.pvc, tt.usage)
result := r.shouldTriggerResize(t.Context(), tt.cr, tt.pvc, tt.usage)
assert.Equal(t, tt.expected, result)
})
}
Expand Down Expand Up @@ -410,7 +408,6 @@ func TestUpdateAutoscalingStatus(t *testing.T) {
TotalBytes: 10 * 1024 * 1024 * 1024,
UsagePercent: 50,
},
err: nil,
expectedStatus: api.StorageAutoscalingStatus{
CurrentSize: "10Gi",
LastError: "",
Expand All @@ -429,7 +426,6 @@ func TestUpdateAutoscalingStatus(t *testing.T) {
},
},
pvcName: "mongod-data-test-rs0-0",
usage: nil,
err: errors.New("connection refused"),
expectedStatus: api.StorageAutoscalingStatus{
CurrentSize: "10Gi",
Expand All @@ -453,7 +449,6 @@ func TestUpdateAutoscalingStatus(t *testing.T) {
TotalBytes: 20 * 1024 * 1024 * 1024,
UsagePercent: 40,
},
err: nil,
expectedStatus: api.StorageAutoscalingStatus{
CurrentSize: "20Gi",
LastError: "",
Expand All @@ -464,7 +459,7 @@ func TestUpdateAutoscalingStatus(t *testing.T) {

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
r.updateAutoscalingStatus(tt.cr, tt.pvcName, tt.usage, tt.err)
r.updateAutoscalingStatus(t.Context(), tt.cr, tt.pvcName, tt.usage, tt.err)

require.NotNil(t, tt.cr.Status.StorageAutoscaling)
status, ok := tt.cr.Status.StorageAutoscaling[tt.pvcName]
Expand All @@ -482,8 +477,6 @@ func TestUpdateAutoscalingStatus(t *testing.T) {
}

func TestTriggerResize(t *testing.T) {
ctx := context.Background()

tests := map[string]struct {
cr *api.PerconaServerMongoDB
pvc *corev1.PersistentVolumeClaim
Expand Down Expand Up @@ -695,7 +688,7 @@ func TestTriggerResize(t *testing.T) {

originalSize := volumeSpec.PersistentVolumeClaim.Resources.Requests[corev1.ResourceStorage]

err = r.triggerResize(ctx, tt.cr, tt.pvc, tt.newSize, volumeSpec)
err = r.triggerResize(t.Context(), tt.cr, tt.pvc, tt.newSize, volumeSpec)
require.NoError(t, err)

updatedSize := volumeSpec.PersistentVolumeClaim.Resources.Requests[corev1.ResourceStorage]
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/perconaservermongodb/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ func (r *ReconcilePerconaServerMongoDB) resizeVolumesIfNeeded(ctx context.Contex
continue
}

// Re-read the PVC to get the latest resourceVersion before updating,
// as it may have been modified since the initial list (e.g. by fixVolumeLabels
// or by the Kubernetes PVC controller).
if err := r.client.Get(ctx, client.ObjectKeyFromObject(&pvc), &pvc); err != nil {
return errors.Wrapf(err, "get persistentvolumeclaim/%s", pvc.Name)
}

if pvc.Status.Capacity.Storage().Cmp(requested) == 0 {
log.Info("PVC already resized", "name", pvc.Name, "actual", pvc.Status.Capacity.Storage(), "requested", requested)
continue
Expand Down
Loading