@@ -20,140 +20,13 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"strings"
23
- "sync"
24
23
"time"
25
24
26
- "k8s.io/api/core/v1"
27
25
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
- "k8s.io/apimachinery/pkg/fields"
29
- "k8s.io/apimachinery/pkg/runtime"
30
26
"k8s.io/apimachinery/pkg/util/wait"
31
- "k8s.io/apimachinery/pkg/watch"
32
27
clientset "k8s.io/client-go/kubernetes"
33
- "k8s.io/client-go/tools/cache"
34
- "k8s.io/kubernetes/test/e2e/framework"
35
-
36
- "github.com/onsi/ginkgo"
37
28
)
38
29
39
- // Action is a function to be performed by the system.
40
- type Action func () error
41
-
42
- // ObserveNodeUpdateAfterAction returns true if a node update matching the predicate was emitted
43
- // from the system after performing the supplied action.
44
- func ObserveNodeUpdateAfterAction (c clientset.Interface , nodeName string , nodePredicate func (* v1.Node ) bool , action Action ) (bool , error ) {
45
- observedMatchingNode := false
46
- nodeSelector := fields .OneTermEqualSelector ("metadata.name" , nodeName )
47
- informerStartedChan := make (chan struct {})
48
- var informerStartedGuard sync.Once
49
-
50
- _ , controller := cache .NewInformer (
51
- & cache.ListWatch {
52
- ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
53
- options .FieldSelector = nodeSelector .String ()
54
- ls , err := c .CoreV1 ().Nodes ().List (context .TODO (), options )
55
- return ls , err
56
- },
57
- WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
58
- // Signal parent goroutine that watching has begun.
59
- defer informerStartedGuard .Do (func () { close (informerStartedChan ) })
60
- options .FieldSelector = nodeSelector .String ()
61
- w , err := c .CoreV1 ().Nodes ().Watch (context .TODO (), options )
62
- return w , err
63
- },
64
- },
65
- & v1.Node {},
66
- 0 ,
67
- cache.ResourceEventHandlerFuncs {
68
- UpdateFunc : func (oldObj , newObj interface {}) {
69
- n , ok := newObj .(* v1.Node )
70
- framework .ExpectEqual (ok , true )
71
- if nodePredicate (n ) {
72
- observedMatchingNode = true
73
- }
74
- },
75
- },
76
- )
77
-
78
- // Start the informer and block this goroutine waiting for the started signal.
79
- informerStopChan := make (chan struct {})
80
- defer func () { close (informerStopChan ) }()
81
- go controller .Run (informerStopChan )
82
- <- informerStartedChan
83
-
84
- // Invoke the action function.
85
- err := action ()
86
- if err != nil {
87
- return false , err
88
- }
89
-
90
- // Poll whether the informer has found a matching node update with a timeout.
91
- // Wait up 2 minutes polling every second.
92
- timeout := 2 * time .Minute
93
- interval := 1 * time .Second
94
- err = wait .Poll (interval , timeout , func () (bool , error ) {
95
- return observedMatchingNode , nil
96
- })
97
- return err == nil , err
98
- }
99
-
100
- // ObserveEventAfterAction returns true if an event matching the predicate was emitted
101
- // from the system after performing the supplied action.
102
- func ObserveEventAfterAction (c clientset.Interface , ns string , eventPredicate func (* v1.Event ) bool , action Action ) (bool , error ) {
103
- observedMatchingEvent := false
104
- informerStartedChan := make (chan struct {})
105
- var informerStartedGuard sync.Once
106
-
107
- // Create an informer to list/watch events from the test framework namespace.
108
- _ , controller := cache .NewInformer (
109
- & cache.ListWatch {
110
- ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
111
- ls , err := c .CoreV1 ().Events (ns ).List (context .TODO (), options )
112
- return ls , err
113
- },
114
- WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
115
- // Signal parent goroutine that watching has begun.
116
- defer informerStartedGuard .Do (func () { close (informerStartedChan ) })
117
- w , err := c .CoreV1 ().Events (ns ).Watch (context .TODO (), options )
118
- return w , err
119
- },
120
- },
121
- & v1.Event {},
122
- 0 ,
123
- cache.ResourceEventHandlerFuncs {
124
- AddFunc : func (obj interface {}) {
125
- e , ok := obj .(* v1.Event )
126
- ginkgo .By (fmt .Sprintf ("Considering event: \n Type = [%s], Name = [%s], Reason = [%s], Message = [%s]" , e .Type , e .Name , e .Reason , e .Message ))
127
- framework .ExpectEqual (ok , true )
128
- if eventPredicate (e ) {
129
- observedMatchingEvent = true
130
- }
131
- },
132
- },
133
- )
134
-
135
- // Start the informer and block this goroutine waiting for the started signal.
136
- informerStopChan := make (chan struct {})
137
- defer func () { close (informerStopChan ) }()
138
- go controller .Run (informerStopChan )
139
- <- informerStartedChan
140
-
141
- // Invoke the action function.
142
- err := action ()
143
- if err != nil {
144
- return false , err
145
- }
146
-
147
- // Poll whether the informer has found a matching event with a timeout.
148
- // Wait up 2 minutes polling every second.
149
- timeout := 2 * time .Minute
150
- interval := 1 * time .Second
151
- err = wait .Poll (interval , timeout , func () (bool , error ) {
152
- return observedMatchingEvent , nil
153
- })
154
- return err == nil , err
155
- }
156
-
157
30
// WaitTimeoutForEvent waits the given timeout duration for an event to occur.
158
31
func WaitTimeoutForEvent (c clientset.Interface , namespace , eventSelector , msg string , timeout time.Duration ) error {
159
32
interval := 2 * time .Second
0 commit comments