@@ -20,6 +20,7 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"github.com/onsi/gomega"
23
+ "sync"
23
24
"time"
24
25
25
26
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -33,6 +34,8 @@ import (
33
34
"k8s.io/apimachinery/pkg/watch"
34
35
"k8s.io/apiserver/pkg/storage/names"
35
36
"k8s.io/client-go/dynamic"
37
+ "k8s.io/client-go/dynamic/dynamicinformer"
38
+ "k8s.io/client-go/tools/cache"
36
39
"k8s.io/kubernetes/test/e2e/framework"
37
40
"k8s.io/kubernetes/test/utils/crd"
38
41
imageutils "k8s.io/kubernetes/test/utils/image"
@@ -151,15 +154,44 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
151
154
152
155
ginkgo .By ("Watching with field selectors" )
153
156
154
- v2Client , gvr := customResourceClient (crd , "v2" )
155
- hostWatch , err := v2Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "host=host1" })
157
+ v2Client , v2gvr := customResourceClient (crd , "v2" )
158
+ v2hostWatch , err := v2Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "host=host1" })
156
159
framework .ExpectNoError (err , "watching custom resources with field selector" )
160
+ v2hostWatchAcc := watchAccumulator (v2hostWatch )
157
161
v2hostPortWatch , err := v2Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "host=host1,port=80" })
158
162
framework .ExpectNoError (err , "watching custom resources with field selector" )
163
+ framework .ExpectNoError (err , "watching custom resources with field selector" )
164
+ v2hostPortWatchAcc := watchAccumulator (v2hostPortWatch )
159
165
160
166
v1Client , _ := customResourceClient (crd , "v1" )
161
167
v1hostPortWatch , err := v1Client .Namespace (f .Namespace .Name ).Watch (ctx , metav1.ListOptions {FieldSelector : "hostPort=host1:80" })
162
168
framework .ExpectNoError (err , "watching custom resources with field selector" )
169
+ v1hostPortWatchAcc := watchAccumulator (v1hostPortWatch )
170
+
171
+ ginkgo .By ("Registering informers with field selectors" )
172
+
173
+ informerCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
174
+ defer cancel ()
175
+
176
+ hostInformer := dynamicinformer .NewFilteredDynamicSharedInformerFactory (f .DynamicClient , 0 , f .Namespace .Name , func (opts * metav1.ListOptions ) {
177
+ opts .FieldSelector = "host=host1"
178
+ })
179
+ hostInformer .Start (informerCtx .Done ())
180
+
181
+ hostPortInformer := dynamicinformer .NewFilteredDynamicSharedInformerFactory (f .DynamicClient , 0 , f .Namespace .Name , func (opts * metav1.ListOptions ) {
182
+ opts .FieldSelector = "host=host1,port=80"
183
+ })
184
+ hostPortInformer .Start (informerCtx .Done ())
185
+
186
+ v2HostInformer := hostInformer .ForResource (v2gvr ).Informer ()
187
+ go v2HostInformer .Run (informerCtx .Done ())
188
+ v2HostPortInformer := hostPortInformer .ForResource (v2gvr ).Informer ()
189
+ go v2HostPortInformer .Run (informerCtx .Done ())
190
+
191
+ v2HostInformerAcc := informerAccumulator (v2HostInformer )
192
+ v2HostPortInformerAcc := informerAccumulator (v2HostPortInformer )
193
+
194
+ framework .ExpectNoError (err , "adding event handler" )
163
195
164
196
ginkgo .By ("Creating custom resources" )
165
197
toCreate := []map [string ]any {
@@ -182,7 +214,7 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
182
214
crNames [i ] = name
183
215
184
216
obj := map [string ]interface {}{
185
- "apiVersion" : gvr .Group + "/" + gvr .Version ,
217
+ "apiVersion" : v2gvr .Group + "/" + v2gvr .Version ,
186
218
"kind" : crd .Spec .Names .Kind ,
187
219
"metadata" : map [string ]interface {}{
188
220
"name" : name ,
@@ -216,15 +248,23 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
216
248
gomega .Expect (listResultToNames (list )).To (gomega .Equal (sets .New (crNames [1 ])))
217
249
218
250
ginkgo .By ("Waiting for watch events to contain v2 custom resources for field selector host=host1" )
219
- gomega .Eventually (ctx , watchAccumulator ( hostWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
251
+ gomega .Eventually (ctx , v2hostWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
220
252
Should (gomega .Equal (addedEvents (sets .New (crNames [0 ], crNames [1 ]))))
221
253
222
254
ginkgo .By ("Waiting for watch events to contain v2 custom resources for field selector host=host1,port=80" )
223
- gomega .Eventually (ctx , watchAccumulator ( v2hostPortWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
255
+ gomega .Eventually (ctx , v2hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
224
256
Should (gomega .Equal (addedEvents (sets .New (crNames [0 ]))))
225
257
226
258
ginkgo .By ("Waiting for watch events to contain v1 custom resources for field selector hostPort=host1:80" )
227
- gomega .Eventually (ctx , watchAccumulator (v1hostPortWatch )).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
259
+ gomega .Eventually (ctx , v1hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
260
+ Should (gomega .Equal (addedEvents (sets .New (crNames [0 ]))))
261
+
262
+ ginkgo .By ("Waiting for informer events to contain v2 custom resources for field selector host=host1" )
263
+ gomega .Eventually (ctx , v2HostInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
264
+ Should (gomega .Equal (addedEvents (sets .New (crNames [0 ], crNames [1 ]))))
265
+
266
+ ginkgo .By ("Waiting for informer events to contain v2 custom resources for field selector host=host1,port=80" )
267
+ gomega .Eventually (ctx , v2HostPortInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
228
268
Should (gomega .Equal (addedEvents (sets .New (crNames [0 ]))))
229
269
230
270
ginkgo .By ("Deleting one custom resources to ensure that deletions are observed" )
@@ -250,18 +290,25 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
250
290
gomega .Expect (listResultToNames (list )).To (gomega .Equal (sets .New [string ]()))
251
291
252
292
ginkgo .By ("Waiting for v2 watch events after updates and deletes for field selector host=host1" )
253
- gomega .Eventually (ctx , watchAccumulator ( hostWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
254
- Should (gomega .Equal (deletedEvents ( sets .New (crNames [0 ], crNames [1 ]))))
293
+ gomega .Eventually (ctx , v2hostWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
294
+ Should (gomega .Equal (addedAndDeletedEvents ( sets . New ( crNames [ 0 ], crNames [ 1 ]), sets .New (crNames [0 ], crNames [1 ]))))
255
295
256
296
ginkgo .By ("Waiting for v2 watch events after updates and deletes for field selector host=host1,port=80" )
257
- gomega .Eventually (ctx , watchAccumulator ( v2hostPortWatch ) ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
258
- Should (gomega .Equal (deletedEvents ( sets .New (crNames [0 ]))))
297
+ gomega .Eventually (ctx , v2hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
298
+ Should (gomega .Equal (addedAndDeletedEvents ( sets . New ( crNames [ 0 ]), sets .New (crNames [0 ]))))
259
299
260
300
ginkgo .By ("Waiting for v1 watch events after updates and deletes for field selector hostPort=host1:80" )
261
- gomega .Eventually (ctx , watchAccumulator (v1hostPortWatch )).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
262
- Should (gomega .Equal (deletedEvents (sets .New (crNames [0 ]))))
263
- })
301
+ gomega .Eventually (ctx , v1hostPortWatchAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
302
+ Should (gomega .Equal (addedAndDeletedEvents (sets .New (crNames [0 ]), sets .New (crNames [0 ]))))
264
303
304
+ ginkgo .By ("Waiting for v2 informer events after updates and deletes for field selector host=host1" )
305
+ gomega .Eventually (ctx , v2HostInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
306
+ Should (gomega .Equal (addedAndDeletedEvents (sets .New (crNames [0 ], crNames [1 ]), sets .New (crNames [0 ], crNames [1 ]))))
307
+
308
+ ginkgo .By ("Waiting for v2 informer events after updates and deletes for field selector host=host1,port=80" )
309
+ gomega .Eventually (ctx , v2HostPortInformerAcc ).WithPolling (5 * time .Millisecond ).WithTimeout (30 * time .Second ).
310
+ Should (gomega .Equal (addedAndDeletedEvents (sets .New (crNames [0 ]), sets .New (crNames [0 ]))))
311
+ })
265
312
})
266
313
})
267
314
@@ -277,8 +324,8 @@ func addedEvents(added sets.Set[string]) *accumulatedEvents {
277
324
return & accumulatedEvents {added : added , deleted : sets .New [string ]()}
278
325
}
279
326
280
- func deletedEvents ( deleted sets.Set [string ]) * accumulatedEvents {
281
- return & accumulatedEvents {added : sets . New [ string ]() , deleted : deleted }
327
+ func addedAndDeletedEvents ( added , deleted sets.Set [string ]) * accumulatedEvents {
328
+ return & accumulatedEvents {added : added , deleted : deleted }
282
329
}
283
330
284
331
func watchAccumulator (w watch.Interface ) func (ctx context.Context ) (* accumulatedEvents , error ) {
@@ -302,6 +349,39 @@ func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulated
302
349
}
303
350
}
304
351
352
+ func informerAccumulator (informer cache.SharedIndexInformer ) func (ctx context.Context ) (* accumulatedEvents , error ) {
353
+ var lock sync.Mutex
354
+ result := emptyEvents ()
355
+
356
+ _ , err := informer .AddEventHandler (cache.ResourceEventHandlerFuncs {
357
+ AddFunc : func (obj any ) {
358
+ defer ginkgo .GinkgoRecover ()
359
+ lock .Lock ()
360
+ defer lock .Unlock ()
361
+ result .added .Insert (obj .(* unstructured.Unstructured ).GetName ())
362
+ },
363
+ UpdateFunc : func (oldObj , newObj any ) {
364
+ defer ginkgo .GinkgoRecover ()
365
+ // ignoring
366
+ },
367
+ DeleteFunc : func (obj any ) {
368
+ defer ginkgo .GinkgoRecover ()
369
+ lock .Lock ()
370
+ defer lock .Unlock ()
371
+ result .deleted .Insert (obj .(* unstructured.Unstructured ).GetName ())
372
+ },
373
+ })
374
+
375
+ return func (ctx context.Context ) (* accumulatedEvents , error ) {
376
+ if err != nil {
377
+ return nil , err
378
+ }
379
+ lock .Lock ()
380
+ defer lock .Unlock ()
381
+ return result , nil
382
+ }
383
+ }
384
+
305
385
func listResultToNames (list * unstructured.UnstructuredList ) sets.Set [string ] {
306
386
found := sets .New [string ]()
307
387
for _ , i := range list .Items {
0 commit comments