@@ -20,6 +20,7 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"github.com/onsi/gomega"
23
+ "sync"
23
24
"time"
24
25
25
26
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -32,6 +33,8 @@ import (
32
33
"k8s.io/apimachinery/pkg/watch"
33
34
"k8s.io/apiserver/pkg/storage/names"
34
35
"k8s.io/client-go/dynamic"
36
+ "k8s.io/client-go/dynamic/dynamicinformer"
37
+ "k8s.io/client-go/tools/cache"
35
38
"k8s.io/kubernetes/test/e2e/framework"
36
39
"k8s.io/kubernetes/test/utils/crd"
37
40
imageutils "k8s.io/kubernetes/test/utils/image"
@@ -150,15 +153,44 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
150
153
151
154
ginkgo .By ("Watching with field selectors" )
152
155
153
- v2Client , gvr := customResourceClient (crd , "v2" )
154
- hostWatch , err := v2Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "host=host1" })
156
+ v2Client , v2gvr := customResourceClient (crd , "v2" )
157
+ v2hostWatch , err := v2Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "host=host1" })
155
158
framework .ExpectNoError (err , "watching custom resources with field selector" )
159
+ v2hostWatchAcc := watchAccumulator (v2hostWatch )
156
160
v2hostPortWatch , err := v2Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "host=host1,port=80" })
157
161
framework .ExpectNoError (err , "watching custom resources with field selector" )
162
+ framework .ExpectNoError (err , "watching custom resources with field selector" )
163
+ v2hostPortWatchAcc := watchAccumulator (v2hostPortWatch )
158
164
159
165
v1Client , _ := customResourceClient (crd , "v1" )
160
166
v1hostPortWatch , err := v1Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "hostPort=host1:80" })
161
167
framework .ExpectNoError (err , "watching custom resources with field selector" )
168
+ v1hostPortWatchAcc := watchAccumulator (v1hostPortWatch )
169
+
170
+ ginkgo .By ("Registering informers with field selectors" )
171
+
172
+ informerCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
173
+ defer cancel ()
174
+
175
+ hostInformer := dynamicinformer .NewFilteredDynamicSharedInformerFactory (f .DynamicClient , 0 , f .Namespace .Name , func (opts * metav1.ListOptions ) {
176
+ opts .FieldSelector = "host=host1"
177
+ })
178
+ hostInformer .Start (informerCtx .Done ())
179
+
180
+ hostPortInformer := dynamicinformer .NewFilteredDynamicSharedInformerFactory (f .DynamicClient , 0 , f .Namespace .Name , func (opts * metav1.ListOptions ) {
181
+ opts .FieldSelector = "host=host1,port=80"
182
+ })
183
+ hostPortInformer .Start (informerCtx .Done ())
184
+
185
+ v2HostInformer := hostInformer .ForResource (v2gvr ).Informer ()
186
+ go v2HostInformer .Run (informerCtx .Done ())
187
+ v2HostPortInformer := hostPortInformer .ForResource (v2gvr ).Informer ()
188
+ go v2HostPortInformer .Run (informerCtx .Done ())
189
+
190
+ v2HostInformerAcc := informerAccumulator (v2HostInformer )
191
+ v2HostPortInformerAcc := informerAccumulator (v2HostPortInformer )
192
+
193
+ framework .ExpectNoError (err , "adding event handler" )
162
194
163
195
ginkgo .By ("Creating custom resources" )
164
196
toCreate := []map [string ]any {
@@ -181,7 +213,7 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
181
213
crNames [i ] = name
182
214
183
215
obj := map [string ]interface {}{
184
- "apiVersion" : gvr .Group + "/" + gvr .Version ,
216
+ "apiVersion" : v2gvr .Group + "/" + v2gvr .Version ,
185
217
"kind" : crd .Spec .Names .Kind ,
186
218
"metadata" : map [string ]interface {}{
187
219
"name" : name ,
@@ -215,15 +247,23 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
215
247
gomega .Expect (listResultToNames (list )).To (gomega .Equal (sets .New (crNames [1 ])))
216
248
217
249
ginkgo .By ("Waiting for watch events to contain v2 custom resources for field selector host=host1" )
218
- gomega .Eventually (ctx , watchAccumulator ( hostWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
250
+ gomega .Eventually (ctx , v2hostWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
219
251
Should (gomega .Equal (addedEvents (sets .New (crNames [0 ], crNames [1 ]))))
220
252
221
253
ginkgo .By ("Waiting for watch events to contain v2 custom resources for field selector host=host1,port=80" )
222
- gomega .Eventually (ctx , watchAccumulator ( v2hostPortWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
254
+ gomega .Eventually (ctx , v2hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
223
255
Should (gomega .Equal (addedEvents (sets .New (crNames [0 ]))))
224
256
225
257
ginkgo .By ("Waiting for watch events to contain v1 custom resources for field selector hostPort=host1:80" )
226
- gomega .Eventually (ctx , watchAccumulator (v1hostPortWatch )).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
258
+ gomega .Eventually (ctx , v1hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
259
+ Should (gomega .Equal (addedEvents (sets .New (crNames [0 ]))))
260
+
261
+ ginkgo .By ("Waiting for informer events to contain v2 custom resources for field selector host=host1" )
262
+ gomega .Eventually (ctx , v2HostInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
263
+ Should (gomega .Equal (addedEvents (sets .New (crNames [0 ], crNames [1 ]))))
264
+
265
+ ginkgo .By ("Waiting for informer events to contain v2 custom resources for field selector host=host1,port=80" )
266
+ gomega .Eventually (ctx , v2HostPortInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
227
267
Should (gomega .Equal (addedEvents (sets .New (crNames [0 ]))))
228
268
229
269
ginkgo .By ("Deleting one custom resources to ensure that deletions are observed" )
@@ -249,18 +289,25 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
249
289
gomega .Expect (listResultToNames (list )).To (gomega .Equal (sets .New [string ]()))
250
290
251
291
ginkgo .By ("Waiting for v2 watch events after updates and deletes for field selector host=host1" )
252
- gomega .Eventually (ctx , watchAccumulator ( hostWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
253
- Should (gomega .Equal (deletedEvents ( sets .New (crNames [0 ], crNames [1 ]))))
292
+ gomega .Eventually (ctx , v2hostWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
293
+ Should (gomega .Equal (addedAndDeletedEvents ( sets . New ( crNames [ 0 ], crNames [ 1 ]), sets .New (crNames [0 ], crNames [1 ]))))
254
294
255
295
ginkgo .By ("Waiting for v2 watch events after updates and deletes for field selector host=host1,port=80" )
256
- gomega .Eventually (ctx , watchAccumulator ( v2hostPortWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
257
- Should (gomega .Equal (deletedEvents ( sets .New (crNames [0 ]))))
296
+ gomega .Eventually (ctx , v2hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
297
+ Should (gomega .Equal (addedAndDeletedEvents ( sets . New ( crNames [ 0 ]), sets .New (crNames [0 ]))))
258
298
259
299
ginkgo .By ("Waiting for v1 watch events after updates and deletes for field selector hostPort=host1:80" )
260
- gomega .Eventually (ctx , watchAccumulator (v1hostPortWatch )).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
261
- Should (gomega .Equal (deletedEvents (sets .New (crNames [0 ]))))
262
- })
300
+ gomega .Eventually (ctx , v1hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
301
+ Should (gomega .Equal (addedAndDeletedEvents (sets .New (crNames [0 ]), sets .New (crNames [0 ]))))
263
302
303
+ ginkgo .By ("Waiting for v2 informer events after updates and deletes for field selector host=host1" )
304
+ gomega .Eventually (ctx , v2HostInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
305
+ Should (gomega .Equal (addedAndDeletedEvents (sets .New (crNames [0 ], crNames [1 ]), sets .New (crNames [0 ], crNames [1 ]))))
306
+
307
+ ginkgo .By ("Waiting for v2 informer events after updates and deletes for field selector host=host1,port=80" )
308
+ gomega .Eventually (ctx , v2HostPortInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
309
+ Should (gomega .Equal (addedAndDeletedEvents (sets .New (crNames [0 ]), sets .New (crNames [0 ]))))
310
+ })
264
311
})
265
312
})
266
313
@@ -276,8 +323,8 @@ func addedEvents(added sets.Set[string]) *accumulatedEvents {
276
323
return & accumulatedEvents {added : added , deleted : sets .New [string ]()}
277
324
}
278
325
279
- func deletedEvents ( deleted sets.Set [string ]) * accumulatedEvents {
280
- return & accumulatedEvents {added : sets . New [ string ]() , deleted : deleted }
326
+ func addedAndDeletedEvents ( added , deleted sets.Set [string ]) * accumulatedEvents {
327
+ return & accumulatedEvents {added : added , deleted : deleted }
281
328
}
282
329
283
330
func watchAccumulator (w watch.Interface ) func (ctx context.Context ) (* accumulatedEvents , error ) {
@@ -301,6 +348,39 @@ func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulated
301
348
}
302
349
}
303
350
351
+ func informerAccumulator (informer cache.SharedIndexInformer ) func (ctx context.Context ) (* accumulatedEvents , error ) {
352
+ var lock sync.Mutex
353
+ result := emptyEvents ()
354
+
355
+ _ , err := informer .AddEventHandler (cache.ResourceEventHandlerFuncs {
356
+ AddFunc : func (obj any ) {
357
+ defer ginkgo .GinkgoRecover ()
358
+ lock .Lock ()
359
+ defer lock .Unlock ()
360
+ result .added .Insert (obj .(* unstructured.Unstructured ).GetName ())
361
+ },
362
+ UpdateFunc : func (oldObj , newObj any ) {
363
+ defer ginkgo .GinkgoRecover ()
364
+ // ignoring
365
+ },
366
+ DeleteFunc : func (obj any ) {
367
+ defer ginkgo .GinkgoRecover ()
368
+ lock .Lock ()
369
+ defer lock .Unlock ()
370
+ result .deleted .Insert (obj .(* unstructured.Unstructured ).GetName ())
371
+ },
372
+ })
373
+
374
+ return func (ctx context.Context ) (* accumulatedEvents , error ) {
375
+ if err != nil {
376
+ return nil , err
377
+ }
378
+ lock .Lock ()
379
+ defer lock .Unlock ()
380
+ return result , nil
381
+ }
382
+ }
383
+
304
384
func listResultToNames (list * unstructured.UnstructuredList ) sets.Set [string ] {
305
385
found := sets .New [string ]()
306
386
for _ , i := range list .Items {
0 commit comments