Commit 74853dc

Refactor Volume Auto Grow to support additional volume types
This commit refactors the existing pgData volume auto grow code to better support upcoming enhancements that extend auto grow capability to the pgBackRest repository volume and the pg_wal volume.

Issue: PGO-2606
1 parent: fae895c
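
The updated call sites in the diffs below show the shape of the refactor: both helpers gain a volume-type argument ("pgData" for now), so the same code path can later serve the pgBackRest repository and pg_wal volumes. As a rough illustration only — the relocated helpers live in files not included in this excerpt, so everything here beyond the visible call sites is an assumption — the parameterization looks something like this:

```go
package main

import "fmt"

// Toy sketch of the parameterization visible at the updated call sites:
// the volume type ("pgData" today; the pgBackRest repository and pg_wal
// volumes in the planned follow-on work) is threaded through as an argument
// instead of being hard-coded. The event text mirrors the removed helper's
// message format; the function name and everything else here is hypothetical.
func volumeAutoGrowEvent(volumeType, cluster, instanceSet, size string) string {
	return fmt.Sprintf("%s volume expansion to %s requested for %s/%s.",
		volumeType, size, cluster, instanceSet)
}

func main() {
	for _, volumeType := range []string{"pgData", "repo", "pg_wal"} {
		fmt.Println(volumeAutoGrowEvent(volumeType, "rhino", "red", "2Gi"))
	}
}
```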

File tree: 9 files changed, +878 −592 lines

internal/controller/postgrescluster/instance.go

Lines changed: 1 addition & 63 deletions
```diff
@@ -17,7 +17,6 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/util/intstr"
@@ -360,7 +359,7 @@ func (r *Reconciler) observeInstances(
 		// If autogrow is enabled, get the desired volume size for each instance.
 		if autogrow {
 			for _, instance := range observed.bySet[name] {
-				status.DesiredPGDataVolume[instance.Name] = r.storeDesiredRequest(ctx, cluster,
+				status.DesiredPGDataVolume[instance.Name] = r.storeDesiredRequest(ctx, cluster, "pgData",
 					name, status.DesiredPGDataVolume[instance.Name], previousDesiredRequests[instance.Name])
 			}
 		}
@@ -371,67 +370,6 @@ func (r *Reconciler) observeInstances(
 	return observed, err
 }
 
-// storeDesiredRequest saves the appropriate request value to the PostgresCluster
-// status. If the value has grown, create an Event.
-func (r *Reconciler) storeDesiredRequest(
-	ctx context.Context, cluster *v1beta1.PostgresCluster,
-	instanceSetName, desiredRequest, desiredRequestBackup string,
-) string {
-	var current resource.Quantity
-	var previous resource.Quantity
-	var err error
-	log := logging.FromContext(ctx)
-
-	// Parse the desired request from the cluster's status.
-	if desiredRequest != "" {
-		current, err = resource.ParseQuantity(desiredRequest)
-		if err != nil {
-			log.Error(err, "Unable to parse pgData volume request from status ("+
-				desiredRequest+") for "+cluster.Name+"/"+instanceSetName)
-			// If there was an error parsing the value, treat as unset (equivalent to zero).
-			desiredRequest = ""
-			current, _ = resource.ParseQuantity("")
-
-		}
-	}
-
-	// Parse the desired request from the status backup.
-	if desiredRequestBackup != "" {
-		previous, err = resource.ParseQuantity(desiredRequestBackup)
-		if err != nil {
-			log.Error(err, "Unable to parse pgData volume request from status backup ("+
-				desiredRequestBackup+") for "+cluster.Name+"/"+instanceSetName)
-			// If there was an error parsing the value, treat as unset (equivalent to zero).
-			desiredRequestBackup = ""
-			previous, _ = resource.ParseQuantity("")
-
-		}
-	}
-
-	// Determine if the limit is set for this instance set.
-	var limitSet bool
-	for _, specInstance := range cluster.Spec.InstanceSets {
-		if specInstance.Name == instanceSetName {
-			limitSet = !specInstance.DataVolumeClaimSpec.Resources.Limits.Storage().IsZero()
-		}
-	}
-
-	if limitSet && current.Value() > previous.Value() {
-		r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "VolumeAutoGrow",
-			"pgData volume expansion to %v requested for %s/%s.",
-			current.String(), cluster.Name, instanceSetName)
-	}
-
-	// If the desired size was not observed, update with previously stored value.
-	// This can happen in scenarios where the annotation on the Pod is missing
-	// such as when the cluster is shutdown or a Pod is in the middle of a restart.
-	if desiredRequest == "" {
-		desiredRequest = desiredRequestBackup
-	}
-
-	return desiredRequest
-}
-
 // +kubebuilder:rbac:groups="",resources="pods",verbs={list}
 // +kubebuilder:rbac:groups="apps",resources="statefulsets",verbs={patch}
```
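
The removed storeDesiredRequest decides whether to emit a VolumeAutoGrow event by parsing the stored status strings into resource.Quantity values and comparing them. A small standalone demonstration of that parse-and-compare step (not operator code, just the k8s.io/apimachinery API the helper relies on):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Quantities are parsed from the status strings and compared by their
	// canonical integer value; growth (current > previous) is what triggers
	// the VolumeAutoGrow event in the helper above.
	current, _ := resource.ParseQuantity("2Gi")
	previous, _ := resource.ParseQuantity("1Gi")
	fmt.Println(current.Value() > previous.Value()) // true

	// A malformed value fails to parse and is treated as unset (zero),
	// exactly as the helper's error branch does.
	if _, err := resource.ParseQuantity("woot"); err != nil {
		fmt.Println("unparseable request treated as zero")
	}
}
```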

internal/controller/postgrescluster/instance_test.go

Lines changed: 0 additions & 118 deletions
```diff
@@ -14,7 +14,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/go-logr/logr/funcr"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"gotest.tools/v3/assert"
 	appsv1 "k8s.io/api/apps/v1"
@@ -36,10 +35,8 @@ import (
 	"github.com/crunchydata/postgres-operator/internal/controller/runtime"
 	"github.com/crunchydata/postgres-operator/internal/feature"
 	"github.com/crunchydata/postgres-operator/internal/initialize"
-	"github.com/crunchydata/postgres-operator/internal/logging"
 	"github.com/crunchydata/postgres-operator/internal/naming"
 	"github.com/crunchydata/postgres-operator/internal/testing/cmp"
-	"github.com/crunchydata/postgres-operator/internal/testing/events"
 	"github.com/crunchydata/postgres-operator/internal/testing/require"
 	"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
 )
@@ -260,121 +257,6 @@ func TestNewObservedInstances(t *testing.T) {
 	})
 }
 
-func TestStoreDesiredRequest(t *testing.T) {
-	ctx := context.Background()
-
-	setupLogCapture := func(ctx context.Context) (context.Context, *[]string) {
-		calls := []string{}
-		testlog := funcr.NewJSON(func(object string) {
-			calls = append(calls, object)
-		}, funcr.Options{
-			Verbosity: 1,
-		})
-		return logging.NewContext(ctx, testlog), &calls
-	}
-
-	cluster := v1beta1.PostgresCluster{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      "rhino",
-			Namespace: "test-namespace",
-		},
-		Spec: v1beta1.PostgresClusterSpec{
-			InstanceSets: []v1beta1.PostgresInstanceSetSpec{{
-				Name:     "red",
-				Replicas: initialize.Int32(1),
-				DataVolumeClaimSpec: v1beta1.VolumeClaimSpec{
-					AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
-					Resources: corev1.VolumeResourceRequirements{
-						Limits: map[corev1.ResourceName]resource.Quantity{
-							corev1.ResourceStorage: resource.MustParse("1Gi"),
-						}}},
-			}, {
-				Name:     "blue",
-				Replicas: initialize.Int32(1),
-			}}}}
-
-	t.Run("BadRequestNoBackup", func(t *testing.T) {
-		recorder := events.NewRecorder(t, runtime.Scheme)
-		reconciler := &Reconciler{Recorder: recorder}
-		ctx, logs := setupLogCapture(ctx)
-
-		value := reconciler.storeDesiredRequest(ctx, &cluster, "red", "woot", "")
-
-		assert.Equal(t, value, "")
-		assert.Equal(t, len(recorder.Events), 0)
-		assert.Equal(t, len(*logs), 1)
-		assert.Assert(t, cmp.Contains((*logs)[0], "Unable to parse pgData volume request from status"))
-	})
-
-	t.Run("BadRequestWithBackup", func(t *testing.T) {
-		recorder := events.NewRecorder(t, runtime.Scheme)
-		reconciler := &Reconciler{Recorder: recorder}
-		ctx, logs := setupLogCapture(ctx)
-
-		value := reconciler.storeDesiredRequest(ctx, &cluster, "red", "foo", "1Gi")
-
-		assert.Equal(t, value, "1Gi")
-		assert.Equal(t, len(recorder.Events), 0)
-		assert.Equal(t, len(*logs), 1)
-		assert.Assert(t, cmp.Contains((*logs)[0], "Unable to parse pgData volume request from status (foo) for rhino/red"))
-	})
-
-	t.Run("NoLimitNoEvent", func(t *testing.T) {
-		recorder := events.NewRecorder(t, runtime.Scheme)
-		reconciler := &Reconciler{Recorder: recorder}
-		ctx, logs := setupLogCapture(ctx)
-
-		value := reconciler.storeDesiredRequest(ctx, &cluster, "blue", "1Gi", "")
-
-		assert.Equal(t, value, "1Gi")
-		assert.Equal(t, len(*logs), 0)
-		assert.Equal(t, len(recorder.Events), 0)
-	})
-
-	t.Run("BadBackupRequest", func(t *testing.T) {
-		recorder := events.NewRecorder(t, runtime.Scheme)
-		reconciler := &Reconciler{Recorder: recorder}
-		ctx, logs := setupLogCapture(ctx)
-
-		value := reconciler.storeDesiredRequest(ctx, &cluster, "red", "2Gi", "bar")
-
-		assert.Equal(t, value, "2Gi")
-		assert.Equal(t, len(*logs), 1)
-		assert.Assert(t, cmp.Contains((*logs)[0], "Unable to parse pgData volume request from status backup (bar) for rhino/red"))
-		assert.Equal(t, len(recorder.Events), 1)
-		assert.Equal(t, recorder.Events[0].Regarding.Name, cluster.Name)
-		assert.Equal(t, recorder.Events[0].Reason, "VolumeAutoGrow")
-		assert.Equal(t, recorder.Events[0].Note, "pgData volume expansion to 2Gi requested for rhino/red.")
-	})
-
-	t.Run("ValueUpdateWithEvent", func(t *testing.T) {
-		recorder := events.NewRecorder(t, runtime.Scheme)
-		reconciler := &Reconciler{Recorder: recorder}
-		ctx, logs := setupLogCapture(ctx)
-
-		value := reconciler.storeDesiredRequest(ctx, &cluster, "red", "1Gi", "")
-
-		assert.Equal(t, value, "1Gi")
-		assert.Equal(t, len(*logs), 0)
-		assert.Equal(t, len(recorder.Events), 1)
-		assert.Equal(t, recorder.Events[0].Regarding.Name, cluster.Name)
-		assert.Equal(t, recorder.Events[0].Reason, "VolumeAutoGrow")
-		assert.Equal(t, recorder.Events[0].Note, "pgData volume expansion to 1Gi requested for rhino/red.")
-	})
-
-	t.Run("NoLimitNoEvent", func(t *testing.T) {
-		recorder := events.NewRecorder(t, runtime.Scheme)
-		reconciler := &Reconciler{Recorder: recorder}
-		ctx, logs := setupLogCapture(ctx)
-
-		value := reconciler.storeDesiredRequest(ctx, &cluster, "blue", "1Gi", "")
-
-		assert.Equal(t, value, "1Gi")
-		assert.Equal(t, len(*logs), 0)
-		assert.Equal(t, len(recorder.Events), 0)
-	})
-}
-
 func TestWritablePod(t *testing.T) {
 	container := "container"
```
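
The deleted test's setupLogCapture closure is a reusable pattern: funcr.NewJSON renders every log call to a JSON string and hands it to a callback, so assertions can inspect exactly what was logged. A minimal standalone version of that pattern (illustrative only):

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr/funcr"
)

func main() {
	// Collect each rendered log line, as the removed test did before
	// attaching the logger to the context with logging.NewContext.
	calls := []string{}
	testlog := funcr.NewJSON(func(object string) {
		calls = append(calls, object)
	}, funcr.Options{Verbosity: 1})

	testlog.Info("volume expansion requested", "size", "2Gi")

	fmt.Println(len(calls)) // 1
	fmt.Println(calls[0])   // JSON-rendered log entry including msg and size
}
```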

internal/controller/postgrescluster/postgres.go

Lines changed: 1 addition & 71 deletions
```diff
@@ -19,7 +19,6 @@ import (
 	"github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -775,7 +774,7 @@ func (r *Reconciler) reconcilePostgresDataVolume(
 		}
 	}
 
-	r.setVolumeSize(ctx, cluster, pvc, instanceSpec.Name)
+	r.setVolumeSize(ctx, cluster, pvc, "pgData", instanceSpec.Name)
 
 	// Clear any set limit before applying PVC. This is needed to allow the limit
 	// value to change later.
@@ -789,75 +788,6 @@ func (r *Reconciler) reconcilePostgresDataVolume(
 	return pvc, err
 }
 
-// setVolumeSize compares the potential sizes from the instance spec, status
-// and limit and sets the appropriate current value.
-func (r *Reconciler) setVolumeSize(ctx context.Context, cluster *v1beta1.PostgresCluster,
-	pvc *corev1.PersistentVolumeClaim, instanceSpecName string) {
-	log := logging.FromContext(ctx)
-
-	// Store the limit for this instance set. This value will not change below.
-	volumeLimitFromSpec := pvc.Spec.Resources.Limits.Storage()
-
-	// Capture the largest pgData volume size currently defined for a given instance set.
-	// This value will capture our desired update.
-	volumeRequestSize := pvc.Spec.Resources.Requests.Storage()
-
-	// If the request value is greater than the set limit, use the limit and issue
-	// a warning event. A limit of 0 is ignored.
-	if !volumeLimitFromSpec.IsZero() &&
-		volumeRequestSize.Value() > volumeLimitFromSpec.Value() {
-		r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "VolumeRequestOverLimit",
-			"pgData volume request (%v) for %s/%s is greater than set limit (%v). Limit value will be used.",
-			volumeRequestSize, cluster.Name, instanceSpecName, volumeLimitFromSpec)
-
-		pvc.Spec.Resources.Requests = corev1.ResourceList{
-			corev1.ResourceStorage: *resource.NewQuantity(volumeLimitFromSpec.Value(), resource.BinarySI),
-		}
-		// Otherwise, if the limit is not set or the feature gate is not enabled, do not autogrow.
-	} else if !volumeLimitFromSpec.IsZero() && feature.Enabled(ctx, feature.AutoGrowVolumes) {
-		for i := range cluster.Status.InstanceSets {
-			if instanceSpecName == cluster.Status.InstanceSets[i].Name {
-				for _, dpv := range cluster.Status.InstanceSets[i].DesiredPGDataVolume {
-					if dpv != "" {
-						desiredRequest, err := resource.ParseQuantity(dpv)
-						if err == nil {
-							if desiredRequest.Value() > volumeRequestSize.Value() {
-								volumeRequestSize = &desiredRequest
-							}
-						} else {
-							log.Error(err, "Unable to parse volume request: "+dpv)
-						}
-					}
-				}
-			}
-		}
-
-		// If the volume request size is greater than or equal to the limit and the
-		// limit is not zero, update the request size to the limit value.
-		// If the user manually requests a lower limit that is smaller than the current
-		// or requested volume size, it will be ignored in favor of the limit value.
-		if volumeRequestSize.Value() >= volumeLimitFromSpec.Value() {
-
-			r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "VolumeLimitReached",
-				"pgData volume(s) for %s/%s are at size limit (%v).", cluster.Name,
-				instanceSpecName, volumeLimitFromSpec)
-
-			// If the volume size request is greater than the limit, issue an
-			// additional event warning.
-			if volumeRequestSize.Value() > volumeLimitFromSpec.Value() {
-				r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "DesiredVolumeAboveLimit",
-					"The desired size (%v) for the %s/%s pgData volume(s) is greater than the size limit (%v).",
-					volumeRequestSize, cluster.Name, instanceSpecName, volumeLimitFromSpec)
-			}
-
-			volumeRequestSize = volumeLimitFromSpec
-		}
-		pvc.Spec.Resources.Requests = corev1.ResourceList{
-			corev1.ResourceStorage: *resource.NewQuantity(volumeRequestSize.Value(), resource.BinarySI),
-		}
-	}
-}
-
 // +kubebuilder:rbac:groups="",resources="persistentvolumeclaims",verbs={create,patch}
 
 // reconcileTablespaceVolumes writes the PersistentVolumeClaims for instance's
```
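
Stripped of events and status lookups, the sizing rule the removed setVolumeSize enforces is a clamp: grow the request to the largest desired size recorded in the status, never past the spec limit, and pin it to the limit once it reaches or exceeds it. A distilled sketch of just that rule (not the operator's actual code):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// clampRequest distills the decision in the removed setVolumeSize: take the
// larger of the current request and the desired size, then cap it at the
// limit (a zero limit disables autogrow entirely in the real helper).
func clampRequest(request, desired, limit resource.Quantity) resource.Quantity {
	if desired.Value() > request.Value() {
		request = desired
	}
	if !limit.IsZero() && request.Value() >= limit.Value() {
		// In the operator this is where the VolumeLimitReached (and, if the
		// desired size overshoots, DesiredVolumeAboveLimit) events fire.
		request = limit
	}
	return request
}

func main() {
	got := clampRequest(
		resource.MustParse("1Gi"), // current PVC request
		resource.MustParse("3Gi"), // desired size recorded in status
		resource.MustParse("2Gi"), // limit from the instance spec
	)
	fmt.Println(got.String()) // 2Gi
}
```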
