Skip to content

Commit c2c3c47

Browse files
committed
test/e2e/framework:move functions to test/e2e/scheduling/
1 parent acd97b4 commit c2c3c47

File tree

6 files changed

+137
-142
lines changed

6 files changed

+137
-142
lines changed

test/e2e/framework/events/BUILD

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,9 @@ go_library(
66
importpath = "k8s.io/kubernetes/test/e2e/framework/events",
77
visibility = ["//visibility:public"],
88
deps = [
9-
"//staging/src/k8s.io/api/core/v1:go_default_library",
109
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
11-
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
12-
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
1310
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
14-
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
1511
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
16-
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
17-
"//test/e2e/framework:go_default_library",
18-
"//vendor/github.com/onsi/ginkgo:go_default_library",
1912
],
2013
)
2114

test/e2e/framework/events/events.go

Lines changed: 0 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -20,140 +20,13 @@ import (
2020
"context"
2121
"fmt"
2222
"strings"
23-
"sync"
2423
"time"
2524

26-
"k8s.io/api/core/v1"
2725
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28-
"k8s.io/apimachinery/pkg/fields"
29-
"k8s.io/apimachinery/pkg/runtime"
3026
"k8s.io/apimachinery/pkg/util/wait"
31-
"k8s.io/apimachinery/pkg/watch"
3227
clientset "k8s.io/client-go/kubernetes"
33-
"k8s.io/client-go/tools/cache"
34-
"k8s.io/kubernetes/test/e2e/framework"
35-
36-
"github.com/onsi/ginkgo"
3728
)
3829

39-
// Action is a function to be performed by the system.
40-
type Action func() error
41-
42-
// ObserveNodeUpdateAfterAction returns true if a node update matching the predicate was emitted
43-
// from the system after performing the supplied action.
44-
func ObserveNodeUpdateAfterAction(c clientset.Interface, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) {
45-
observedMatchingNode := false
46-
nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
47-
informerStartedChan := make(chan struct{})
48-
var informerStartedGuard sync.Once
49-
50-
_, controller := cache.NewInformer(
51-
&cache.ListWatch{
52-
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
53-
options.FieldSelector = nodeSelector.String()
54-
ls, err := c.CoreV1().Nodes().List(context.TODO(), options)
55-
return ls, err
56-
},
57-
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
58-
// Signal parent goroutine that watching has begun.
59-
defer informerStartedGuard.Do(func() { close(informerStartedChan) })
60-
options.FieldSelector = nodeSelector.String()
61-
w, err := c.CoreV1().Nodes().Watch(context.TODO(), options)
62-
return w, err
63-
},
64-
},
65-
&v1.Node{},
66-
0,
67-
cache.ResourceEventHandlerFuncs{
68-
UpdateFunc: func(oldObj, newObj interface{}) {
69-
n, ok := newObj.(*v1.Node)
70-
framework.ExpectEqual(ok, true)
71-
if nodePredicate(n) {
72-
observedMatchingNode = true
73-
}
74-
},
75-
},
76-
)
77-
78-
// Start the informer and block this goroutine waiting for the started signal.
79-
informerStopChan := make(chan struct{})
80-
defer func() { close(informerStopChan) }()
81-
go controller.Run(informerStopChan)
82-
<-informerStartedChan
83-
84-
// Invoke the action function.
85-
err := action()
86-
if err != nil {
87-
return false, err
88-
}
89-
90-
// Poll whether the informer has found a matching node update with a timeout.
91-
// Wait up 2 minutes polling every second.
92-
timeout := 2 * time.Minute
93-
interval := 1 * time.Second
94-
err = wait.Poll(interval, timeout, func() (bool, error) {
95-
return observedMatchingNode, nil
96-
})
97-
return err == nil, err
98-
}
99-
100-
// ObserveEventAfterAction returns true if an event matching the predicate was emitted
101-
// from the system after performing the supplied action.
102-
func ObserveEventAfterAction(c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
103-
observedMatchingEvent := false
104-
informerStartedChan := make(chan struct{})
105-
var informerStartedGuard sync.Once
106-
107-
// Create an informer to list/watch events from the test framework namespace.
108-
_, controller := cache.NewInformer(
109-
&cache.ListWatch{
110-
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
111-
ls, err := c.CoreV1().Events(ns).List(context.TODO(), options)
112-
return ls, err
113-
},
114-
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
115-
// Signal parent goroutine that watching has begun.
116-
defer informerStartedGuard.Do(func() { close(informerStartedChan) })
117-
w, err := c.CoreV1().Events(ns).Watch(context.TODO(), options)
118-
return w, err
119-
},
120-
},
121-
&v1.Event{},
122-
0,
123-
cache.ResourceEventHandlerFuncs{
124-
AddFunc: func(obj interface{}) {
125-
e, ok := obj.(*v1.Event)
126-
ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
127-
framework.ExpectEqual(ok, true)
128-
if eventPredicate(e) {
129-
observedMatchingEvent = true
130-
}
131-
},
132-
},
133-
)
134-
135-
// Start the informer and block this goroutine waiting for the started signal.
136-
informerStopChan := make(chan struct{})
137-
defer func() { close(informerStopChan) }()
138-
go controller.Run(informerStopChan)
139-
<-informerStartedChan
140-
141-
// Invoke the action function.
142-
err := action()
143-
if err != nil {
144-
return false, err
145-
}
146-
147-
// Poll whether the informer has found a matching event with a timeout.
148-
// Wait up 2 minutes polling every second.
149-
timeout := 2 * time.Minute
150-
interval := 1 * time.Second
151-
err = wait.Poll(interval, timeout, func() (bool, error) {
152-
return observedMatchingEvent, nil
153-
})
154-
return err == nil, err
155-
}
156-
15730
// WaitTimeoutForEvent waits the given timeout duration for an event to occur.
15831
func WaitTimeoutForEvent(c clientset.Interface, namespace, eventSelector, msg string, timeout time.Duration) error {
15932
interval := 2 * time.Second

test/e2e/scheduling/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go_library(
2828
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
2929
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
3030
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
31+
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
3132
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
3233
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
3334
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
@@ -40,7 +41,6 @@ go_library(
4041
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
4142
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
4243
"//test/e2e/framework:go_default_library",
43-
"//test/e2e/framework/events:go_default_library",
4444
"//test/e2e/framework/gpu:go_default_library",
4545
"//test/e2e/framework/job:go_default_library",
4646
"//test/e2e/framework/kubelet:go_default_library",

test/e2e/scheduling/events.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,23 @@ limitations under the License.
1717
package scheduling
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"strings"
23+
"sync"
24+
"time"
2225

2326
"k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/fields"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
"k8s.io/apimachinery/pkg/util/wait"
31+
"k8s.io/apimachinery/pkg/watch"
32+
clientset "k8s.io/client-go/kubernetes"
33+
"k8s.io/client-go/tools/cache"
34+
"k8s.io/kubernetes/test/e2e/framework"
35+
36+
"github.com/onsi/ginkgo"
2437
)
2538

2639
func scheduleSuccessEvent(ns, podName, nodeName string) func(*v1.Event) bool {
@@ -39,3 +52,121 @@ func scheduleFailureEvent(podName string) func(*v1.Event) bool {
3952
e.Reason == "FailedScheduling"
4053
}
4154
}
55+
56+
// Action is a function to be performed by the system.
57+
type Action func() error
58+
59+
// observeNodeUpdateAfterAction returns true if a node update matching the predicate was emitted
60+
// from the system after performing the supplied action.
61+
func observeNodeUpdateAfterAction(c clientset.Interface, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) {
62+
observedMatchingNode := false
63+
nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
64+
informerStartedChan := make(chan struct{})
65+
var informerStartedGuard sync.Once
66+
67+
_, controller := cache.NewInformer(
68+
&cache.ListWatch{
69+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
70+
options.FieldSelector = nodeSelector.String()
71+
ls, err := c.CoreV1().Nodes().List(context.TODO(), options)
72+
return ls, err
73+
},
74+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
75+
// Signal parent goroutine that watching has begun.
76+
defer informerStartedGuard.Do(func() { close(informerStartedChan) })
77+
options.FieldSelector = nodeSelector.String()
78+
w, err := c.CoreV1().Nodes().Watch(context.TODO(), options)
79+
return w, err
80+
},
81+
},
82+
&v1.Node{},
83+
0,
84+
cache.ResourceEventHandlerFuncs{
85+
UpdateFunc: func(oldObj, newObj interface{}) {
86+
n, ok := newObj.(*v1.Node)
87+
framework.ExpectEqual(ok, true)
88+
if nodePredicate(n) {
89+
observedMatchingNode = true
90+
}
91+
},
92+
},
93+
)
94+
95+
// Start the informer and block this goroutine waiting for the started signal.
96+
informerStopChan := make(chan struct{})
97+
defer func() { close(informerStopChan) }()
98+
go controller.Run(informerStopChan)
99+
<-informerStartedChan
100+
101+
// Invoke the action function.
102+
err := action()
103+
if err != nil {
104+
return false, err
105+
}
106+
107+
// Poll whether the informer has found a matching node update with a timeout.
108+
// Wait up 2 minutes polling every second.
109+
timeout := 2 * time.Minute
110+
interval := 1 * time.Second
111+
err = wait.Poll(interval, timeout, func() (bool, error) {
112+
return observedMatchingNode, nil
113+
})
114+
return err == nil, err
115+
}
116+
117+
// observeEventAfterAction returns true if an event matching the predicate was emitted
118+
// from the system after performing the supplied action.
119+
func observeEventAfterAction(c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
120+
observedMatchingEvent := false
121+
informerStartedChan := make(chan struct{})
122+
var informerStartedGuard sync.Once
123+
124+
// Create an informer to list/watch events from the test framework namespace.
125+
_, controller := cache.NewInformer(
126+
&cache.ListWatch{
127+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
128+
ls, err := c.CoreV1().Events(ns).List(context.TODO(), options)
129+
return ls, err
130+
},
131+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
132+
// Signal parent goroutine that watching has begun.
133+
defer informerStartedGuard.Do(func() { close(informerStartedChan) })
134+
w, err := c.CoreV1().Events(ns).Watch(context.TODO(), options)
135+
return w, err
136+
},
137+
},
138+
&v1.Event{},
139+
0,
140+
cache.ResourceEventHandlerFuncs{
141+
AddFunc: func(obj interface{}) {
142+
e, ok := obj.(*v1.Event)
143+
ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
144+
framework.ExpectEqual(ok, true)
145+
if eventPredicate(e) {
146+
observedMatchingEvent = true
147+
}
148+
},
149+
},
150+
)
151+
152+
// Start the informer and block this goroutine waiting for the started signal.
153+
informerStopChan := make(chan struct{})
154+
defer func() { close(informerStopChan) }()
155+
go controller.Run(informerStopChan)
156+
<-informerStartedChan
157+
158+
// Invoke the action function.
159+
err := action()
160+
if err != nil {
161+
return false, err
162+
}
163+
164+
// Poll whether the informer has found a matching event with a timeout.
165+
// Wait up 2 minutes polling every second.
166+
timeout := 2 * time.Minute
167+
interval := 1 * time.Second
168+
err = wait.Poll(interval, timeout, func() (bool, error) {
169+
return observedMatchingEvent, nil
170+
})
171+
return err == nil, err
172+
}

test/e2e/scheduling/predicates.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
clientset "k8s.io/client-go/kubernetes"
3131
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
3232
"k8s.io/kubernetes/test/e2e/framework"
33-
e2eevents "k8s.io/kubernetes/test/e2e/framework/events"
3433
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
3534
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
3635
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -708,15 +707,15 @@ func getRequestedStorageEphemeralStorage(pod v1.Pod) int64 {
708707

709708
// removeTaintFromNodeAction returns a closure that removes the given taint
710709
// from the given node upon invocation.
711-
func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTaint v1.Taint) e2eevents.Action {
710+
func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, testTaint v1.Taint) Action {
712711
return func() error {
713712
framework.RemoveTaintOffNode(cs, nodeName, testTaint)
714713
return nil
715714
}
716715
}
717716

718717
// createPausePodAction returns a closure that creates a pause pod upon invocation.
719-
func createPausePodAction(f *framework.Framework, conf pausePodConfig) e2eevents.Action {
718+
func createPausePodAction(f *framework.Framework, conf pausePodConfig) Action {
720719
return func() error {
721720
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(context.TODO(), initPausePod(f, conf), metav1.CreateOptions{})
722721
return err
@@ -725,12 +724,12 @@ func createPausePodAction(f *framework.Framework, conf pausePodConfig) e2eevents
725724

726725
// WaitForSchedulerAfterAction performs the provided action and then waits for
727726
// scheduler to act on the given pod.
728-
func WaitForSchedulerAfterAction(f *framework.Framework, action e2eevents.Action, ns, podName string, expectSuccess bool) {
727+
func WaitForSchedulerAfterAction(f *framework.Framework, action Action, ns, podName string, expectSuccess bool) {
729728
predicate := scheduleFailureEvent(podName)
730729
if expectSuccess {
731730
predicate = scheduleSuccessEvent(ns, podName, "" /* any node */)
732731
}
733-
success, err := e2eevents.ObserveEventAfterAction(f.ClientSet, f.Namespace.Name, predicate, action)
732+
success, err := observeEventAfterAction(f.ClientSet, f.Namespace.Name, predicate, action)
734733
framework.ExpectNoError(err)
735734
framework.ExpectEqual(success, true)
736735
}

test/e2e/scheduling/priorities.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
3939
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
4040
"k8s.io/kubernetes/test/e2e/framework"
41-
e2eevents "k8s.io/kubernetes/test/e2e/framework/events"
4241
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
4342
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
4443
e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
@@ -285,7 +284,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
285284
}
286285
return node.Annotations[v1.PreferAvoidPodsAnnotationKey] == string(val)
287286
}
288-
success, err := e2eevents.ObserveNodeUpdateAfterAction(f.ClientSet, nodeName, predicate, action)
287+
success, err := observeNodeUpdateAfterAction(f.ClientSet, nodeName, predicate, action)
289288
framework.ExpectNoError(err)
290289
framework.ExpectEqual(success, true)
291290

0 commit comments

Comments
 (0)