Skip to content

Commit d0fbcdf

Browse files
mnencia and NiccoloFei authored
test(e2e): spread pods across nodes with topology spread constraint (cloudnative-pg#10070)
Add a TopologySpreadConstraint to all E2E test clusters so that CNPG-managed pods (including initdb/join jobs) are spread across available nodes. Without this, concurrent test clusters' job pods can co-locate on a single node, causing I/O contention and timeouts. Since PVCs with WaitForFirstConsumer bind to the job's node, this co-location also locks subsequent instance pods there, defeating instance-level anti-affinity.

The constraint uses ScheduleAnyway with maxSkew=1, matching all pods with the app.kubernetes.io/managed-by=cloudnative-pg label, which covers both instance pods and job pods across all clusters.

Closes cloudnative-pg#10068

Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Signed-off-by: Niccolò Fei <niccolo.fei@enterprisedb.com>
Co-authored-by: Niccolò Fei <niccolo.fei@enterprisedb.com>
1 parent 248a8c1 commit d0fbcdf

File tree

6 files changed

+29
-0
lines changed

6 files changed

+29
-0
lines changed

tests/e2e/asserts_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2685,6 +2685,9 @@ func CreateResourcesFromFileWithError(namespace, sampleFilePath string) error {
26852685
return wrapErr(err)
26862686
}
26872687
for _, obj := range objects {
2688+
if cluster, ok := obj.(*apiv1.Cluster); ok {
2689+
clusterutils.AddTopologySpreadConstraint(cluster)
2690+
}
26882691
_, err := objectsutils.Create(env.Ctx, env.Client, obj)
26892692
if err != nil {
26902693
return wrapErr(err)

tests/e2e/cluster_major_upgrade_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ var _ = Describe("Postgres Major Upgrade", Label(tests.LabelPostgresMajorUpgrade
452452
}
453453

454454
cluster := scenario.startingCluster
455+
clusterutils.AddTopologySpreadConstraint(cluster)
455456
err := env.Client.Create(env.Ctx, cluster)
456457
Expect(err).NotTo(HaveOccurred())
457458
AssertClusterIsReady(cluster.Namespace, cluster.Name, testTimeouts[timeouts.ClusterIsReady],

tests/e2e/configuration_update_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ var _ = Describe("Configuration update", Label(tests.LabelClusterMetadata), func
273273
cluster := generateBaseCluster(namespace)
274274
cluster.Spec.ImageName = env.MinimalImageName(targetTag)
275275
cluster.Spec.PrimaryUpdateMethod = apiv1.PrimaryUpdateMethodSwitchover
276+
clusterutils.AddTopologySpreadConstraint(cluster)
276277
err = env.Client.Create(env.Ctx, cluster)
277278
Expect(err).NotTo(HaveOccurred())
278279
AssertClusterIsReady(cluster.Namespace, cluster.Name, testTimeouts[timeouts.ClusterIsReady], env)
@@ -468,6 +469,7 @@ var _ = Describe("Configuration update", Label(tests.LabelClusterMetadata), func
468469
cluster := generateBaseCluster(namespace)
469470
cluster.Spec.ImageName = env.MinimalImageName(targetTag)
470471
cluster.Spec.PrimaryUpdateMethod = apiv1.PrimaryUpdateMethodRestart
472+
clusterutils.AddTopologySpreadConstraint(cluster)
471473
err = env.Client.Create(env.Ctx, cluster)
472474
Expect(err).NotTo(HaveOccurred())
473475
AssertClusterIsReady(cluster.Namespace, cluster.Name, testTimeouts[timeouts.ClusterIsReady], env)

tests/e2e/imagevolume_extensions_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ var _ = Describe("ImageVolume Extensions", Label(tests.LabelImageVolumeExtension
347347
}
348348
err := env.Client.Create(env.Ctx, catalog)
349349
Expect(err).ToNot(HaveOccurred())
350+
clusterutils.AddTopologySpreadConstraint(cluster)
350351
err = env.Client.Create(env.Ctx, cluster)
351352
Expect(err).ToNot(HaveOccurred())
352353
AssertClusterIsReady(namespace, clusterName, testTimeouts[timeouts.ClusterIsReady], env)

tests/e2e/rolling_update_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ var _ = Describe("Rolling updates", Label(tests.LabelPostgresConfiguration), fun
379379
clusterName := cluster.Name
380380
err := env.Client.Create(env.Ctx, catalog)
381381
Expect(err).ToNot(HaveOccurred())
382+
clusterutils.AddTopologySpreadConstraint(cluster)
382383
err = env.Client.Create(env.Ctx, cluster)
383384
Expect(err).ToNot(HaveOccurred())
384385
AssertClusterIsReady(namespace, clusterName, testTimeouts[timeouts.ClusterIsReady], env)

tests/utils/clusterutils/cluster.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"fmt"
2626

2727
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/types"
2930
"sigs.k8s.io/controller-runtime/pkg/client"
3031

@@ -225,6 +226,26 @@ func GetFirstReplica(
225226
return &podList.Items[0], nil
226227
}
227228

229+
// AddTopologySpreadConstraint appends a soft topology spread constraint
230+
// that distributes all CNPG-managed pods (instances and jobs) across
231+
// nodes. Call this on E2E clusters to prevent co-location of concurrent
232+
// test workloads.
233+
func AddTopologySpreadConstraint(cluster *apiv1.Cluster) {
234+
cluster.Spec.TopologySpreadConstraints = append(
235+
cluster.Spec.TopologySpreadConstraints,
236+
corev1.TopologySpreadConstraint{
237+
MaxSkew: 1,
238+
TopologyKey: "kubernetes.io/hostname",
239+
WhenUnsatisfiable: corev1.ScheduleAnyway,
240+
LabelSelector: &metav1.LabelSelector{
241+
MatchLabels: map[string]string{
242+
utils.KubernetesAppManagedByLabelName: utils.ManagerName,
243+
},
244+
},
245+
},
246+
)
247+
}
248+
228249
// ScaleSize scales a cluster to the requested size
229250
func ScaleSize(
230251
ctx context.Context,

0 commit comments

Comments
 (0)