@@ -33,17 +33,13 @@ import (
33
33
policyv1 "k8s.io/api/policy/v1"
34
34
apierrors "k8s.io/apimachinery/pkg/api/errors"
35
35
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
- "k8s.io/apimachinery/pkg/fields"
37
36
"k8s.io/apimachinery/pkg/labels"
38
- "k8s.io/apimachinery/pkg/runtime"
39
37
"k8s.io/apimachinery/pkg/util/intstr"
40
38
utilnet "k8s.io/apimachinery/pkg/util/net"
41
39
"k8s.io/apimachinery/pkg/util/sets"
42
40
"k8s.io/apimachinery/pkg/util/uuid"
43
41
"k8s.io/apimachinery/pkg/util/wait"
44
- "k8s.io/apimachinery/pkg/watch"
45
42
clientset "k8s.io/client-go/kubernetes"
46
- "k8s.io/client-go/tools/cache"
47
43
"k8s.io/kubernetes/test/e2e/framework"
48
44
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
49
45
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error)
404
400
if err != nil {
405
401
return nil , err
406
402
}
407
- endpoints , err := j .Client .CoreV1 ().Endpoints (j .Namespace ).Get (ctx , j . Name , metav1.GetOptions { })
403
+ slices , err := j .Client .DiscoveryV1 ().EndpointSlices (j .Namespace ).List (ctx , metav1.ListOptions { LabelSelector : discoveryv1 . LabelServiceName + "=" + j . Name })
408
404
if err != nil {
409
- return nil , fmt .Errorf ("get endpoints for service %s/%s failed (%s)" , j .Namespace , j .Name , err )
410
- }
411
- if len (endpoints .Subsets ) == 0 {
412
- return nil , fmt .Errorf ("endpoint has no subsets, cannot determine node addresses" )
405
+ return nil , fmt .Errorf ("list endpointslices for service %s/%s failed (%w)" , j .Namespace , j .Name , err )
413
406
}
414
407
epNodes := sets .NewString ()
415
- for _ , ss := range endpoints . Subsets {
416
- for _ , e := range ss . Addresses {
417
- if e .NodeName != nil {
418
- epNodes .Insert (* e .NodeName )
408
+ for _ , slice := range slices . Items {
409
+ for _ , ep := range slice . Endpoints {
410
+ if ep .NodeName != nil {
411
+ epNodes .Insert (* ep .NodeName )
419
412
}
420
413
}
421
414
}
415
+ if len (epNodes ) == 0 {
416
+ return nil , fmt .Errorf ("EndpointSlice has no endpoints, cannot determine node addresses" )
417
+ }
422
418
return epNodes , nil
423
419
}
424
420
425
- // WaitForEndpointOnNode waits for a service endpoint on the given node.
421
+ // WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint) .
426
422
func (j * TestJig ) WaitForEndpointOnNode (ctx context.Context , nodeName string ) error {
427
423
return wait .PollUntilContextTimeout (ctx , framework .Poll , KubeProxyLagTimeout , true , func (ctx context.Context ) (bool , error ) {
428
- endpoints , err := j .Client .CoreV1 ().Endpoints (j .Namespace ).Get (ctx , j . Name , metav1.GetOptions { })
424
+ slices , err := j .Client .DiscoveryV1 ().EndpointSlices (j .Namespace ).List (ctx , metav1.ListOptions { LabelSelector : "kubernetes.io/service-name=" + j . Name })
429
425
if err != nil {
430
- framework .Logf ("Get endpoints for service %s/%s failed (%s)" , j .Namespace , j .Name , err )
426
+ framework .Logf ("List endpointslices for service %s/%s failed (%s)" , j .Namespace , j .Name , err )
427
+ return false , nil
428
+ }
429
+ if len (slices .Items ) == 0 {
430
+ framework .Logf ("Expected 1 EndpointSlice for service %s/%s, got 0" , j .Namespace , j .Name )
431
431
return false , nil
432
432
}
433
- if len (endpoints .Subsets ) == 0 {
434
- framework .Logf ("Expect endpoints with subsets, got none." )
433
+ slice := slices .Items [0 ]
434
+ if len (slice .Endpoints ) == 0 {
435
+ framework .Logf ("Expected EndpointSlice with Endpoints, got none." )
435
436
return false , nil
436
437
}
437
- // TODO: Handle multiple endpoints
438
- if len (endpoints . Subsets [ 0 ]. Addresses ) == 0 {
438
+ endpoint := slice . Endpoints [ 0 ]
439
+ if len (endpoint . Addresses ) == 0 || ( endpoint . Conditions . Ready != nil && ! * endpoint . Conditions . Ready ) {
439
440
framework .Logf ("Expected Ready endpoints - found none" )
440
441
return false , nil
441
442
}
442
- epHostName := * endpoints . Subsets [ 0 ]. Addresses [ 0 ] .NodeName
443
+ epHostName := * endpoint .NodeName
443
444
framework .Logf ("Pod for service %s/%s is on node %s" , j .Namespace , j .Name , epHostName )
444
445
if epHostName != nodeName {
445
446
framework .Logf ("Found endpoint on wrong node, expected %v, got %v" , nodeName , epHostName )
@@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er
451
452
452
453
// waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout
453
454
func (j * TestJig ) waitForAvailableEndpoint (ctx context.Context , timeout time.Duration ) error {
454
- ctx , cancel := context .WithTimeout (ctx , timeout )
455
- defer cancel ()
456
- //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run
457
- endpointSelector := fields .OneTermEqualSelector ("metadata.name" , j .Name )
458
- endpointAvailable := false
459
- endpointSliceAvailable := false
460
-
461
- var controller cache.Controller
462
- _ , controller = cache .NewInformer (
463
- & cache.ListWatch {
464
- ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
465
- options .FieldSelector = endpointSelector .String ()
466
- obj , err := j .Client .CoreV1 ().Endpoints (j .Namespace ).List (ctx , options )
467
- return runtime .Object (obj ), err
468
- },
469
- WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
470
- options .FieldSelector = endpointSelector .String ()
471
- return j .Client .CoreV1 ().Endpoints (j .Namespace ).Watch (ctx , options )
472
- },
473
- },
474
- & v1.Endpoints {},
475
- 0 ,
476
- cache.ResourceEventHandlerFuncs {
477
- AddFunc : func (obj interface {}) {
478
- if e , ok := obj .(* v1.Endpoints ); ok {
479
- if len (e .Subsets ) > 0 && len (e .Subsets [0 ].Addresses ) > 0 {
480
- endpointAvailable = true
481
- }
482
- }
483
- },
484
- UpdateFunc : func (old , cur interface {}) {
485
- if e , ok := cur .(* v1.Endpoints ); ok {
486
- if len (e .Subsets ) > 0 && len (e .Subsets [0 ].Addresses ) > 0 {
487
- endpointAvailable = true
488
- }
489
- }
490
- },
491
- },
492
- )
493
-
494
- go controller .Run (ctx .Done ())
495
-
496
- var esController cache.Controller
497
- _ , esController = cache .NewInformer (
498
- & cache.ListWatch {
499
- ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
500
- options .LabelSelector = "kubernetes.io/service-name=" + j .Name
501
- obj , err := j .Client .DiscoveryV1 ().EndpointSlices (j .Namespace ).List (ctx , options )
502
- return runtime .Object (obj ), err
503
- },
504
- WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
505
- options .LabelSelector = "kubernetes.io/service-name=" + j .Name
506
- return j .Client .DiscoveryV1 ().EndpointSlices (j .Namespace ).Watch (ctx , options )
507
- },
508
- },
509
- & discoveryv1.EndpointSlice {},
510
- 0 ,
511
- cache.ResourceEventHandlerFuncs {
512
- AddFunc : func (obj interface {}) {
513
- if es , ok := obj .(* discoveryv1.EndpointSlice ); ok {
514
- // TODO: currently we only consider addresses in 1 slice, but services with
515
- // a large number of endpoints (>1000) may have multiple slices. Some slices
516
- // with only a few addresses. We should check the addresses in all slices.
517
- if len (es .Endpoints ) > 0 && len (es .Endpoints [0 ].Addresses ) > 0 {
518
- endpointSliceAvailable = true
519
- }
520
- }
521
- },
522
- UpdateFunc : func (old , cur interface {}) {
523
- if es , ok := cur .(* discoveryv1.EndpointSlice ); ok {
524
- // TODO: currently we only consider addresses in 1 slice, but services with
525
- // a large number of endpoints (>1000) may have multiple slices. Some slices
526
- // with only a few addresses. We should check the addresses in all slices.
527
- if len (es .Endpoints ) > 0 && len (es .Endpoints [0 ].Addresses ) > 0 {
528
- endpointSliceAvailable = true
529
- }
530
- }
531
- },
532
- },
533
- )
534
-
535
- go esController .Run (ctx .Done ())
536
-
537
- err := wait .PollUntilContextCancel (ctx , 1 * time .Second , false , func (ctx context.Context ) (bool , error ) {
538
- return endpointAvailable && endpointSliceAvailable , nil
455
+ err := wait .PollUntilContextTimeout (ctx , framework .Poll , timeout , true , func (ctx context.Context ) (bool , error ) {
456
+ slices , err := j .Client .DiscoveryV1 ().EndpointSlices (j .Namespace ).List (ctx , metav1.ListOptions {LabelSelector : "kubernetes.io/service-name=" + j .Name })
457
+ if err != nil || len (slices .Items ) == 0 {
458
+ // Retry
459
+ return false , nil
460
+ }
461
+ for _ , es := range slices .Items {
462
+ if len (es .Endpoints ) > 0 && len (es .Endpoints [0 ].Addresses ) > 0 {
463
+ return true , nil
464
+ }
465
+ }
466
+ return false , nil
539
467
})
540
468
if err != nil {
541
469
return fmt .Errorf ("no subset of available IP address found for the endpoint %s within timeout %v" , j .Name , timeout )
0 commit comments