
Commit eed4930

Merge pull request kubernetes#128896 from danwinship/e2e-endpoints

Remove all references to v1.Endpoints from non-network e2e tests

2 parents e69a5ed + 969ecab · commit eed4930

File tree

8 files changed: +57 −348 lines
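
Where a test still needs endpoint information, the diff replaces a Get of the deprecated core/v1 Endpoints object with a List of discovery.k8s.io/v1 EndpointSlices selected by the kubernetes.io/service-name label. A minimal sketch of that lookup pattern, assuming a standard client-go clientset; the helper name is illustrative, not part of this PR:

import (
	"context"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// listSlicesForService is a hypothetical helper showing the lookup pattern
// this PR adopts. The old code did:
//	client.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{})
// The new code lists EndpointSlices by the service-name label instead.
// Note that one Service may own several slices, so callers iterate all of them.
func listSlicesForService(ctx context.Context, c clientset.Interface, ns, name string) ([]discoveryv1.EndpointSlice, error) {
	slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{
		// discoveryv1.LabelServiceName == "kubernetes.io/service-name"
		LabelSelector: discoveryv1.LabelServiceName + "=" + name,
	})
	if err != nil {
		return nil, err
	}
	return slices.Items, nil
}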

test/e2e/apimachinery/aggregator.go

Lines changed: 3 additions & 2 deletions

@@ -28,6 +28,7 @@ import (
 
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err
 	msg := fmt.Sprintf(msg, fields...)
 	msg += fmt.Sprintf(" but received unexpected error:\n%v", err)
 	client := f.ClientSet
-	ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{})
+	slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")})
 	if err == nil {
-		msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep)
+		msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices)
 	}
 	pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
 	if err == nil {

test/e2e/framework/endpoints/.import-restrictions

Lines changed: 0 additions & 12 deletions
This file was deleted.

test/e2e/framework/network/utils.go

Lines changed: 0 additions & 96 deletions

@@ -45,7 +45,6 @@ import (
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
-	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	netutils "k8s.io/utils/net"
 )
@@ -1099,101 +1098,6 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re
 	return client.Get(url)
 }
 
-// TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
-// At the end (even in case of errors), the network traffic is brought back to normal.
-// This function executes commands on a node so it will work only for some
-// environments.
-func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) {
-	host, err := e2enode.GetSSHExternalIP(node)
-	if err != nil {
-		framework.Failf("Error getting node external ip : %v", err)
-	}
-	controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c)
-	ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name))
-	defer func() {
-		// This code will execute even if setting the iptables rule failed.
-		// It is on purpose because we may have an error even if the new rule
-		// had been inserted. (yes, we could look at the error code and ssh error
-		// separately, but I prefer to stay on the safe side).
-		ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name))
-		for _, instanceAddress := range controlPlaneAddresses {
-			UnblockNetwork(ctx, host, instanceAddress)
-		}
-	}()
-
-	framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
-	if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
-		framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
-	}
-	for _, instanceAddress := range controlPlaneAddresses {
-		BlockNetwork(ctx, host, instanceAddress)
-	}
-
-	framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
-	if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
-		framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
-	}
-
-	testFunc(ctx)
-	// network traffic is unblocked in a deferred function
-}
-
-// BlockNetwork blocks network between the given from value and the given to value.
-// The following helper functions can block/unblock network from source
-// host to destination host by manipulating iptable rules.
-// This function assumes it can ssh to the source host.
-//
-// Caution:
-// Recommend to input IP instead of hostnames. Using hostnames will cause iptables to
-// do a DNS lookup to resolve the name to an IP address, which will
-// slow down the test and cause it to fail if DNS is absent or broken.
-//
-// Suggested usage pattern:
-//
-//	func foo() {
-//		...
-//		defer UnblockNetwork(from, to)
-//		BlockNetwork(from, to)
-//		...
-//	}
-func BlockNetwork(ctx context.Context, from string, to string) {
-	framework.Logf("block network traffic from %s to %s", from, to)
-	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
-	dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
-	if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil {
-		e2essh.LogResult(result)
-		framework.Failf("Unexpected error: %v", err)
-	}
-}
-
-// UnblockNetwork unblocks network between the given from value and the given to value.
-func UnblockNetwork(ctx context.Context, from string, to string) {
-	framework.Logf("Unblock network traffic from %s to %s", from, to)
-	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
-	undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
-	// Undrop command may fail if the rule has never been created.
-	// In such case we just lose 30 seconds, but the cluster is healthy.
-	// But if the rule had been created and removing it failed, the node is broken and
-	// not coming back. Subsequent tests will run or fewer nodes (some of the tests
-	// may fail). Manual intervention is required in such case (recreating the
-	// cluster solves the problem too).
-	err := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*30, false, func(ctx context.Context) (bool, error) {
-		result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider)
-		if result.Code == 0 && err == nil {
-			return true, nil
-		}
-		e2essh.LogResult(result)
-		if err != nil {
-			framework.Logf("Unexpected error: %v", err)
-		}
-		return false, nil
-	})
-	if err != nil {
-		framework.Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
-			"required on host %s: remove rule %s, if exists", from, iptablesRule)
-	}
-}
-
 // WaitForService waits until the service appears (exist == true), or disappears (exist == false)
 func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
 	err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {

test/e2e/framework/service/jig.go

Lines changed: 34 additions & 106 deletions

@@ -33,17 +33,13 @@ import (
 	policyv1 "k8s.io/api/policy/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error)
 	if err != nil {
 		return nil, err
 	}
-	endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
+	slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name})
 	if err != nil {
-		return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
-	}
-	if len(endpoints.Subsets) == 0 {
-		return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
+		return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err)
 	}
 	epNodes := sets.NewString()
-	for _, ss := range endpoints.Subsets {
-		for _, e := range ss.Addresses {
-			if e.NodeName != nil {
-				epNodes.Insert(*e.NodeName)
+	for _, slice := range slices.Items {
+		for _, ep := range slice.Endpoints {
+			if ep.NodeName != nil {
+				epNodes.Insert(*ep.NodeName)
 			}
 		}
 	}
+	if len(epNodes) == 0 {
+		return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses")
+	}
 	return epNodes, nil
 }
 
-// WaitForEndpointOnNode waits for a service endpoint on the given node.
+// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint).
 func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error {
 	return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
-		endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
+		slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
 		if err != nil {
-			framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
+			framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err)
+			return false, nil
+		}
+		if len(slices.Items) == 0 {
+			framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name)
 			return false, nil
 		}
-		if len(endpoints.Subsets) == 0 {
-			framework.Logf("Expect endpoints with subsets, got none.")
+		slice := slices.Items[0]
+		if len(slice.Endpoints) == 0 {
+			framework.Logf("Expected EndpointSlice with Endpoints, got none.")
 			return false, nil
 		}
-		// TODO: Handle multiple endpoints
-		if len(endpoints.Subsets[0].Addresses) == 0 {
+		endpoint := slice.Endpoints[0]
+		if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) {
 			framework.Logf("Expected Ready endpoints - found none")
 			return false, nil
 		}
-		epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
+		epHostName := *endpoint.NodeName
 		framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
 		if epHostName != nodeName {
 			framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
@@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er
 
 // waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout
 func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error {
-	ctx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
-	//Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run
-	endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
-	endpointAvailable := false
-	endpointSliceAvailable := false
-
-	var controller cache.Controller
-	_, controller = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				options.FieldSelector = endpointSelector.String()
-				obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options)
-				return runtime.Object(obj), err
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				options.FieldSelector = endpointSelector.String()
-				return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options)
-			},
-		},
-		&v1.Endpoints{},
-		0,
-		cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				if e, ok := obj.(*v1.Endpoints); ok {
-					if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
-						endpointAvailable = true
-					}
-				}
-			},
-			UpdateFunc: func(old, cur interface{}) {
-				if e, ok := cur.(*v1.Endpoints); ok {
-					if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
-						endpointAvailable = true
-					}
-				}
-			},
-		},
-	)
-
-	go controller.Run(ctx.Done())
-
-	var esController cache.Controller
-	_, esController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				options.LabelSelector = "kubernetes.io/service-name=" + j.Name
-				obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options)
-				return runtime.Object(obj), err
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				options.LabelSelector = "kubernetes.io/service-name=" + j.Name
-				return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options)
-			},
-		},
-		&discoveryv1.EndpointSlice{},
-		0,
-		cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				if es, ok := obj.(*discoveryv1.EndpointSlice); ok {
-					// TODO: currently we only consider addresses in 1 slice, but services with
-					// a large number of endpoints (>1000) may have multiple slices. Some slices
-					// with only a few addresses. We should check the addresses in all slices.
-					if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
-						endpointSliceAvailable = true
-					}
-				}
-			},
-			UpdateFunc: func(old, cur interface{}) {
-				if es, ok := cur.(*discoveryv1.EndpointSlice); ok {
-					// TODO: currently we only consider addresses in 1 slice, but services with
-					// a large number of endpoints (>1000) may have multiple slices. Some slices
-					// with only a few addresses. We should check the addresses in all slices.
-					if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
-						endpointSliceAvailable = true
-					}
-				}
-			},
-		},
-	)
-
-	go esController.Run(ctx.Done())
-
-	err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) {
-		return endpointAvailable && endpointSliceAvailable, nil
+	err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
+		slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
+		if err != nil || len(slices.Items) == 0 {
+			// Retry
+			return false, nil
+		}
+		for _, es := range slices.Items {
+			if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
+				return true, nil
+			}
+		}
+		return false, nil
 	})
 	if err != nil {
 		return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)
