@@ -18,6 +18,7 @@ package scheduling
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"strings"
 	"sync/atomic"
@@ -30,7 +31,9 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
@@ -40,7 +43,6 @@ import (
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"
-	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -77,8 +79,10 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 			cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
 		}
 		for _, node := range nodeList.Items {
-			delete(node.Status.Capacity, testExtendedResource)
-			cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{})
+			nodeCopy := node.DeepCopy()
+			delete(nodeCopy.Status.Capacity, testExtendedResource)
+			err := patchNode(cs, &node, nodeCopy)
+			framework.ExpectNoError(err)
 		}
 	})
 
@@ -119,8 +123,9 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 		// Now create victim pods on each of the node with lower priority
 		for i, node := range nodeList.Items {
 			// Update each node to advertise 3 available extended resources
-			node.Status.Capacity[testExtendedResource] = resource.MustParse("3")
-			node, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{})
+			nodeCopy := node.DeepCopy()
+			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("3")
+			err := patchNode(cs, &node, nodeCopy)
 			framework.ExpectNoError(err)
 
 			// Request 2 of the available resources for the victim pods
@@ -204,8 +209,9 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 		pods := make([]*v1.Pod, 0, len(nodeList.Items))
 		for i, node := range nodeList.Items {
 			// Update each node to advertise 3 available extended resources
-			node.Status.Capacity[testExtendedResource] = resource.MustParse("3")
-			node, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{})
+			nodeCopy := node.DeepCopy()
+			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("3")
+			err := patchNode(cs, &node, nodeCopy)
 			framework.ExpectNoError(err)
 
 			// Request 2 of the available resources for the victim pods
@@ -241,7 +247,7 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 			framework.Logf("Created pod: %v", pods[i].Name)
 		}
 		if len(pods) < 2 {
-			e2eskipper.Skipf("We need at least two pods to be created but" +
+			framework.Failf("We need at least two pods to be created but" +
 				"all nodes are already heavily utilized, so preemption tests cannot be run")
 		}
 		ginkgo.By("Wait for pods to be scheduled.")
@@ -305,12 +311,10 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 			node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
 			framework.ExpectNoError(err)
 			// update Node API object with a fake resource
-			nodeCopy := node.DeepCopy()
-			// force it to update
-			nodeCopy.ResourceVersion = "0"
 			ginkgo.By(fmt.Sprintf("Apply 10 fake resource to node %v.", node.Name))
+			nodeCopy := node.DeepCopy()
 			nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
-			node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
+			err = patchNode(cs, node, nodeCopy)
 			framework.ExpectNoError(err)
 			nodes = append(nodes, node)
 		}
@@ -321,10 +325,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 		}
 		for _, node := range nodes {
 			nodeCopy := node.DeepCopy()
-			// force it to update
-			nodeCopy.ResourceVersion = "0"
 			delete(nodeCopy.Status.Capacity, fakeRes)
-			_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
+			err := patchNode(cs, node, nodeCopy)
 			framework.ExpectNoError(err)
 		}
 	})
@@ -470,10 +472,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 
 		if node != nil {
 			nodeCopy := node.DeepCopy()
-			// force it to update
-			nodeCopy.ResourceVersion = "0"
 			delete(nodeCopy.Status.Capacity, fakecpu)
-			_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
+			err := patchNode(cs, node, nodeCopy)
 			framework.ExpectNoError(err)
 		}
 		for _, pair := range priorityPairs {
@@ -504,10 +504,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
 
 		// update Node API object with a fake resource
 		nodeCopy := node.DeepCopy()
-		// force it to update
-		nodeCopy.ResourceVersion = "0"
 		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
-		node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
+		err = patchNode(cs, node, nodeCopy)
 		framework.ExpectNoError(err)
 
 		// create four PriorityClass: p1, p2, p3, p4
@@ -737,3 +735,21 @@ func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout t
 	})
 	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
 }
+
+func patchNode(client clientset.Interface, old *v1.Node, new *v1.Node) error {
+	oldData, err := json.Marshal(old)
+	if err != nil {
+		return err
+	}
+
+	newData, err := json.Marshal(new)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
+	if err != nil {
+		return fmt.Errorf("failed to create merge patch for node %q: %v", old.Name, err)
+	}
+	_, err = client.CoreV1().Nodes().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	return err
+}
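
Note (not part of the commit): the new patchNode helper replaces full UpdateStatus writes (and the ResourceVersion = "0" workaround removed above) with a strategic merge patch against the node's status subresource, so only the fields the test actually changed are sent to the API server. A minimal standalone sketch of that mechanism is below; the resource name example.com/fakecpu is illustrative only.

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	// Original node as read from the API (details omitted), and a modified copy
	// that advertises a fake extended resource, mirroring what the tests do.
	old := &v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}}}
	modified := old.DeepCopy()
	modified.Status.Capacity["example.com/fakecpu"] = resource.MustParse("1000")

	oldData, err := json.Marshal(old)
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(modified)
	if err != nil {
		panic(err)
	}

	// Compute the strategic merge patch; only the delta between the two objects
	// appears in the patch body.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		panic(err)
	}
	// Prints roughly: {"status":{"capacity":{"example.com/fakecpu":"1000"}}}
	fmt.Println(string(patch))
}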