
Commit 0b75a70

Merge pull request #6474 from zhzhuang-zju/binding
Fix workload scale up bypass FederatedResourceQuota check issue
2 parents b20715e + 7b9bf2f commit 0b75a70

File tree

4 files changed: +118 additions, -20 deletions


pkg/controllers/binding/common.go

Lines changed: 28 additions & 18 deletions

@@ -48,26 +48,30 @@ func ensureWork(
 ) error {
 	bindingSpec := getBindingSpec(binding, scope)
 	targetClusters := mergeTargetClusters(bindingSpec.Clusters, bindingSpec.RequiredBy)
+	var err error
+	var errs []error

 	var jobCompletions []workv1alpha2.TargetCluster
-	var err error
-	if workload.GetKind() == util.JobKind {
+	if workload.GetKind() == util.JobKind && needReviseJobCompletions(bindingSpec.Replicas, bindingSpec.Placement) {
 		jobCompletions, err = divideReplicasByJobCompletions(workload, targetClusters)
 		if err != nil {
 			return err
 		}
 	}

-	var errs []error
 	for i := range targetClusters {
 		targetCluster := targetClusters[i]
 		clonedWorkload := workload.DeepCopy()

 		workNamespace := names.GenerateExecutionSpaceName(targetCluster.Name)

-		// If and only if the resource template has replicas, and the replica scheduling policy is divided,
-		// we need to revise replicas.
-		if needReviseReplicas(bindingSpec.Replicas, bindingSpec.Placement) {
+		// When syncing workloads to member clusters, the controller MUST strictly adhere to the scheduling results
+		// specified in bindingSpec.Clusters for replica allocation, rather than using the replicas declared in the
+		// workload's resource template.
+		// This rule applies regardless of whether the workload distribution mode is "Divided" or "Duplicated".
+		// Failing to do so could allow workloads to bypass the quota checks performed by the scheduler
+		// (especially during scale-up operations) or skip queue validation when scheduling is suspended.
+		if needReviseReplicas(bindingSpec.Replicas) {
 			if resourceInterpreter.HookEnabled(clonedWorkload.GroupVersionKind(), configv1alpha1.InterpreterOperationReviseReplica) {
 				clonedWorkload, err = resourceInterpreter.ReviseReplica(clonedWorkload, int64(targetCluster.Replicas))
 				if err != nil {
@@ -77,18 +81,22 @@ func ensureWork(
 					continue
 				}
 			}
+		}

+		// jobSpec.Completions specifies the desired number of successfully finished pods the job should be run with.
+		// When the replica scheduling policy is set to "divided", jobSpec.Completions should also be divided accordingly.
+		// The weight assigned to each cluster roughly equals that cluster's jobSpec.Parallelism value. This approach helps
+		// balance the execution time of the job across member clusters.
+		if len(jobCompletions) > 0 {
 			// Set allocated completions for Job only when the '.spec.completions' field not omitted from resource template.
 			// For jobs running with a 'work queue' usually leaves '.spec.completions' unset, in that case we skip
 			// setting this field as well.
 			// Refer to: https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs.
-			if len(jobCompletions) > 0 {
-				if err = helper.ApplyReplica(clonedWorkload, int64(jobCompletions[i].Replicas), util.CompletionsField); err != nil {
-					klog.Errorf("Failed to apply Completions for %s/%s/%s in cluster %s, err is: %v",
-						clonedWorkload.GetKind(), clonedWorkload.GetNamespace(), clonedWorkload.GetName(), targetCluster.Name, err)
-					errs = append(errs, err)
-					continue
-				}
+			if err = helper.ApplyReplica(clonedWorkload, int64(jobCompletions[i].Replicas), util.CompletionsField); err != nil {
+				klog.Errorf("Failed to apply Completions for %s/%s/%s in cluster %s, err is: %v",
+					clonedWorkload.GetKind(), clonedWorkload.GetNamespace(), clonedWorkload.GetName(), targetCluster.Name, err)
+				errs = append(errs, err)
+				continue
+			}
 		}
@@ -137,10 +145,8 @@ func ensureWork(
 			continue
 		}
 	}
-	if len(errs) > 0 {
-		return errors.NewAggregate(errs)
-	}
-	return nil
+
+	return errors.NewAggregate(errs)
 }

 func getBindingSpec(binding metav1.Object, scope apiextensionsv1.ResourceScope) workv1alpha2.ResourceBindingSpec {
@@ -312,7 +318,11 @@ func divideReplicasByJobCompletions(workload *unstructured.Unstructured, cluster
 	return targetClusters, nil
 }

-func needReviseReplicas(replicas int32, placement *policyv1alpha1.Placement) bool {
+func needReviseReplicas(replicas int32) bool {
+	return replicas > 0
+}
+
+func needReviseJobCompletions(replicas int32, placement *policyv1alpha1.Placement) bool {
 	return replicas > 0 && placement != nil && placement.ReplicaSchedulingType() == policyv1alpha1.ReplicaSchedulingTypeDivided
 }

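The net effect of the hunks above: the old needReviseReplicas predicate is split in two, so replica revision now depends only on whether the resource template declares replicas, while Job .spec.completions is still divided only under the "Divided" replica scheduling policy. Below is a minimal, self-contained sketch of that decision split; the stand-in placement type is illustrative only and is not the Karmada policyv1alpha1 API.

package main

import "fmt"

// replicaSchedulingType stands in for policyv1alpha1.ReplicaSchedulingType.
type replicaSchedulingType string

const divided replicaSchedulingType = "Divided"

// placement stands in for *policyv1alpha1.Placement; only the field needed here is modeled.
type placement struct {
	replicaSchedulingType replicaSchedulingType
}

// needReviseReplicas mirrors the new signature: the placement argument is gone, so the
// replicas chosen by the scheduler (bindingSpec.Clusters) are applied to the cloned
// workload in both "Divided" and "Duplicated" modes.
func needReviseReplicas(replicas int32) bool {
	return replicas > 0
}

// needReviseJobCompletions keeps the old condition: .spec.completions is divided only
// when the template declares replicas and the replica scheduling policy is "Divided".
func needReviseJobCompletions(replicas int32, p *placement) bool {
	return replicas > 0 && p != nil && p.replicaSchedulingType == divided
}

func main() {
	duplicated := &placement{replicaSchedulingType: "Duplicated"}
	fmt.Println(needReviseReplicas(2))                   // true: replicas are revised even for Duplicated workloads
	fmt.Println(needReviseJobCompletions(2, duplicated)) // false: completions stay untouched for Duplicated
	fmt.Println(needReviseJobCompletions(2, &placement{replicaSchedulingType: divided})) // true
}

Because the "Duplicated" path now also goes through ReviseReplica with the scheduler-approved targetCluster.Replicas, a scale-up rejected by the FederatedResourceQuota check can no longer reach member clusters through the resource template's replica count.
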
pkg/controllers/binding/common_test.go

Lines changed: 30 additions & 2 deletions

@@ -382,6 +382,34 @@ func Test_shouldSuspendDispatching(t *testing.T) {
 }

 func Test_needReviseReplicas(t *testing.T) {
+	tests := []struct {
+		name      string
+		replicas  int32
+		placement *policyv1alpha1.Placement
+		want      bool
+	}{
+		{
+			name:     "replicas is zero",
+			replicas: 0,
+			want:     false,
+		},
+		{
+			name:     "replicas is greater than zero",
+			replicas: 1,
+			want:     true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := needReviseReplicas(tt.replicas); got != tt.want {
+				t.Errorf("needReviseReplicas() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_needReviseJobCompletions(t *testing.T) {
 	tests := []struct {
 		name      string
 		replicas  int32
@@ -428,8 +456,8 @@ func Test_needReviseReplicas(t *testing.T) {

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := needReviseReplicas(tt.replicas, tt.placement); got != tt.want {
-				t.Errorf("needReviseReplicas() = %v, want %v", got, tt.want)
+			if got := needReviseJobCompletions(tt.replicas, tt.placement); got != tt.want {
+				t.Errorf("needReviseJobCompletions() = %v, want %v", got, tt.want)
 			}
 		})
 	}

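A hypothetical extra case (not part of this commit, and relying on the test file's existing imports) that pins down the "Divided" branch of the renamed Test_needReviseJobCompletions explicitly; the ReplicaScheduling field names are assumed from Karmada's public policy/v1alpha1 API.

// Hypothetical additional coverage for the "Divided" branch (field names assumed
// from Karmada's policy/v1alpha1 API; not part of this commit).
func Test_needReviseJobCompletions_dividedPlacement(t *testing.T) {
	placement := &policyv1alpha1.Placement{
		ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
			ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
		},
	}
	if got := needReviseJobCompletions(2, placement); !got {
		t.Errorf("needReviseJobCompletions() = %v, want true", got)
	}
}
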
test/e2e/suites/base/federatedresourcequota_test.go

Lines changed: 55 additions & 0 deletions
@@ -28,6 +28,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -275,6 +276,9 @@ var _ = ginkgo.Describe("FederatedResourceQuota enforcement testing", func() {
 			deployNamespace = fmt.Sprintf("karmadatest-%s", rand.String(RandomStrLength))
 			err := setupTestNamespace(deployNamespace, kubeClient)
 			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			ginkgo.DeferCleanup(func() {
+				framework.RemoveNamespace(kubeClient, deployNamespace)
+			})
 			frqNamespace = deployNamespace
 			frqName = federatedResourceQuotaPrefix + rand.String(RandomStrLength)
 			clusterNames = framework.ClusterNames()[:1]
@@ -417,11 +421,62 @@ var _ = ginkgo.Describe("FederatedResourceQuota enforcement testing", func() {
 				framework.WaitEventFitWith(kubeClient, newDeploymentNamespace, newRB, func(event corev1.Event) bool {
 					return event.Reason == events.EventReasonScheduleBindingFailed && strings.Contains(event.Message, admissionWebhookDenyMsgPrefix)
 				})
+				framework.WaitEventFitWith(kubeClient, newDeploymentNamespace, newDeploymentName, func(event corev1.Event) bool {
+					return event.Reason == events.EventReasonScheduleBindingFailed && strings.Contains(event.Message, admissionWebhookDenyMsgPrefix)
+				})
+
+				gomega.Eventually(func() bool {
+					rb, err := karmadaClient.WorkV1alpha2().ResourceBindings(newDeploymentNamespace).Get(context.TODO(), newRB, metav1.GetOptions{})
+					if err != nil {
+						return false
+					}
+					return rb != nil && meta.IsStatusConditionPresentAndEqual(rb.Status.Conditions, workv1alpha2.Scheduled, metav1.ConditionFalse)
+				}, pollTimeout, pollInterval).Should(gomega.Equal(true))
 				framework.WaitResourceBindingFitWith(karmadaClient, newDeploymentNamespace, newRB, func(resourceBinding *workv1alpha2.ResourceBinding) bool {
 					return resourceBinding.Spec.Clusters == nil
 				})
 				framework.WaitDeploymentDisappearOnClusters(clusterNames, newDeploymentNamespace, newDeploymentName)
 			})
 		})
+
+		ginkgo.It("When the quota is insufficient, scaling up will be blocked.", func() {
+			ginkgo.By("update the replicas of the deployment", func() {
+				mutateFunc := func(deploy *appsv1.Deployment) {
+					deploy.Spec.Replicas = ptr.To[int32](2)
+				}
+
+				framework.UpdateDeploymentWith(kubeClient, deploymentNamespace, deploymentName, mutateFunc)
+			})
+
+			ginkgo.By("the quota has been exceed, so the update request for the spec.clusters in the resourcebinding will be intercepted.", func() {
+				framework.WaitEventFitWith(kubeClient, rbNamespace, rbName, func(event corev1.Event) bool {
+					return event.Reason == events.EventReasonScheduleBindingFailed && strings.Contains(event.Message, admissionWebhookDenyMsgPrefix)
+				})
+				framework.WaitEventFitWith(kubeClient, deploymentNamespace, deploymentName, func(event corev1.Event) bool {
+					return event.Reason == events.EventReasonScheduleBindingFailed && strings.Contains(event.Message, admissionWebhookDenyMsgPrefix)
+				})
+
+				gomega.Eventually(func() bool {
+					rb, err := karmadaClient.WorkV1alpha2().ResourceBindings(rbNamespace).Get(context.TODO(), rbName, metav1.GetOptions{})
+					if err != nil {
+						return false
+					}
+					return rb != nil && meta.IsStatusConditionPresentAndEqual(rb.Status.Conditions, workv1alpha2.Scheduled, metav1.ConditionFalse)
+				}, pollTimeout, pollInterval).Should(gomega.Equal(true))
+			})
+
+			ginkgo.By("The spec.clusters of resourcebinding was not updated.", func() {
+				rb, err := karmadaClient.WorkV1alpha2().ResourceBindings(rbNamespace).Get(context.TODO(), rbName, metav1.GetOptions{})
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+				for i := range rb.Spec.Clusters {
+					gomega.Expect(rb.Spec.Clusters[i].Replicas).Should(gomega.Equal(int32(1)))
+				}
+
+				time.Sleep(waitTimeout)
+				framework.WaitDeploymentPresentOnClustersFitWith(clusterNames, deploymentNamespace, deploymentName, func(deployment *appsv1.Deployment) bool {
+					return *deployment.Spec.Replicas == 1
+				})
+			})
+		})
 	})
 })

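Both new assertions poll the ResourceBinding with the same gomega.Eventually pattern until it reports Scheduled=False. A small helper in that spirit is sketched below; the helper name, package placement, and clientset import path are assumptions, while the condition check itself mirrors the diff.

package framework // hypothetical placement alongside the e2e helpers

import (
	"context"
	"time"

	"github.com/onsi/gomega"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	karmada "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
)

// WaitResourceBindingUnscheduled polls until the named ResourceBinding carries a
// Scheduled condition with status False, i.e. the scheduler (or the quota webhook
// behind it) refused to place the workload.
func WaitResourceBindingUnscheduled(client karmada.Interface, namespace, name string, timeout, interval time.Duration) {
	gomega.Eventually(func() bool {
		rb, err := client.WorkV1alpha2().ResourceBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return false
		}
		return meta.IsStatusConditionPresentAndEqual(rb.Status.Conditions, workv1alpha2.Scheduled, metav1.ConditionFalse)
	}, timeout, interval).Should(gomega.BeTrue())
}

The final ginkgo.By then checks the other side of the contract: spec.clusters keeps one replica per cluster and the member-cluster Deployments never scale to 2 replicas, which is exactly the bypass this commit closes.
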
test/e2e/suites/base/scheduling_test.go

Lines changed: 5 additions & 0 deletions
@@ -262,6 +262,11 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing",
 			jobNamespace = testNamespace
 			jobName = policyName
 			job = helper.NewJob(jobNamespace, jobName)
+			// For fixed completion count Jobs, the actual number of pods running in parallel will not exceed the number of remaining completions.
+			// Higher values of .spec.parallelism are effectively ignored.
+			// Since .spec.parallelism will be updated to updateParallelism in the subsequent testing, .spec.completions is set to updateParallelism here to make the update of .spec.parallelism take effect.
+			// More info: https://kubernetes.io/docs/concepts/workloads/controllers/job/
+			job.Spec.Completions = ptr.To[int32](updateParallelism)
 			maxGroups = rand.Intn(2) + 1
 			minGroups = maxGroups

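The added comment restates the Kubernetes Job rule that effective parallelism is capped by the remaining completions, which is why the test raises .spec.completions up front so that the later .spec.parallelism update becomes observable. A tiny standalone illustration follows (not part of the commit; the concrete numbers are arbitrary, and updateParallelism is the test's own constant).

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/utils/ptr"
)

func main() {
	// Mirrors job.Spec.Completions = ptr.To[int32](updateParallelism) from the test,
	// with illustrative numbers: completions=2, requested parallelism=4.
	job := batchv1.Job{
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](2),
			Parallelism: ptr.To[int32](4),
		},
	}

	// The Job controller never runs more pods in parallel than the remaining
	// completions, so the effective parallelism is min(parallelism, completions).
	effective := min(*job.Spec.Parallelism, *job.Spec.Completions)
	fmt.Printf("requested parallelism=%d, effective parallelism=%d\n",
		*job.Spec.Parallelism, effective)
}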