Skip to content

Commit 6526714

Browse files
author
Antonio Ojea
committed
dualstack endpoints integration tests
add integration tests to verify the behaviour of the endpoints and endpointslices controller with dual stack services. Since services can be single or dual stack, endpoints should be generated for each IP family in the endpoint slice controller. The legacy endpoint controller only will generate endpoints in the first IP family configured in the service. integration fix
1 parent 1f0371b commit 6526714

File tree

2 files changed

+297
-0
lines changed

2 files changed

+297
-0
lines changed

test/integration/dualstack/BUILD

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,23 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
33
go_test(
44
name = "go_default_test",
55
srcs = [
6+
"dualstack_endpoints_test.go",
67
"dualstack_test.go",
78
"main_test.go",
89
],
910
tags = ["integration"],
1011
deps = [
12+
"//pkg/controller/endpoint:go_default_library",
13+
"//pkg/controller/endpointslice:go_default_library",
1114
"//pkg/features:go_default_library",
1215
"//staging/src/k8s.io/api/core/v1:go_default_library",
16+
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
1317
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
1418
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
1519
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
1620
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
1721
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
22+
"//staging/src/k8s.io/client-go/informers:go_default_library",
1823
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
1924
"//staging/src/k8s.io/client-go/rest:go_default_library",
2025
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dualstack
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net"
23+
"testing"
24+
"time"
25+
26+
v1 "k8s.io/api/core/v1"
27+
discovery "k8s.io/api/discovery/v1beta1"
28+
apierrors "k8s.io/apimachinery/pkg/api/errors"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/util/intstr"
31+
"k8s.io/apimachinery/pkg/util/wait"
32+
utilfeature "k8s.io/apiserver/pkg/util/feature"
33+
"k8s.io/client-go/informers"
34+
clientset "k8s.io/client-go/kubernetes"
35+
restclient "k8s.io/client-go/rest"
36+
featuregatetesting "k8s.io/component-base/featuregate/testing"
37+
"k8s.io/kubernetes/pkg/controller/endpoint"
38+
"k8s.io/kubernetes/pkg/controller/endpointslice"
39+
"k8s.io/kubernetes/pkg/features"
40+
"k8s.io/kubernetes/test/integration/framework"
41+
)
42+
43+
func TestDualStackEndpoints(t *testing.T) {
44+
// Create an IPv4IPv6 dual stack control-plane
45+
serviceCIDR := "10.0.0.0/16"
46+
secondaryServiceCIDR := "2001:db8:1::/48"
47+
labelMap := func() map[string]string {
48+
return map[string]string{"foo": "bar"}
49+
}
50+
51+
dualStack := true
52+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, dualStack)()
53+
54+
cfg := framework.NewIntegrationTestMasterConfig()
55+
_, cidr, err := net.ParseCIDR(serviceCIDR)
56+
if err != nil {
57+
t.Fatalf("Bad cidr: %v", err)
58+
}
59+
cfg.ExtraConfig.ServiceIPRange = *cidr
60+
61+
_, secCidr, err := net.ParseCIDR(secondaryServiceCIDR)
62+
if err != nil {
63+
t.Fatalf("Bad cidr: %v", err)
64+
}
65+
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
66+
67+
_, s, closeFn := framework.RunAMaster(cfg)
68+
defer closeFn()
69+
70+
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
71+
72+
// Wait until the default "kubernetes" service is created.
73+
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
74+
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
75+
if err != nil && !apierrors.IsNotFound(err) {
76+
return false, err
77+
}
78+
return !apierrors.IsNotFound(err), nil
79+
}); err != nil {
80+
t.Fatalf("Creating kubernetes service timed out")
81+
}
82+
83+
resyncPeriod := 0 * time.Hour
84+
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
85+
86+
// Create fake node
87+
testNode := &v1.Node{
88+
ObjectMeta: metav1.ObjectMeta{
89+
Name: "fakenode",
90+
},
91+
Spec: v1.NodeSpec{Unschedulable: false},
92+
Status: v1.NodeStatus{
93+
Conditions: []v1.NodeCondition{
94+
{
95+
Type: v1.NodeReady,
96+
Status: v1.ConditionTrue,
97+
Reason: fmt.Sprintf("schedulable condition"),
98+
LastHeartbeatTime: metav1.Time{Time: time.Now()},
99+
},
100+
},
101+
},
102+
}
103+
if _, err := client.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
104+
t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
105+
}
106+
107+
epController := endpoint.NewEndpointController(
108+
informers.Core().V1().Pods(),
109+
informers.Core().V1().Services(),
110+
informers.Core().V1().Endpoints(),
111+
client,
112+
1*time.Second)
113+
114+
epsController := endpointslice.NewController(
115+
informers.Core().V1().Pods(),
116+
informers.Core().V1().Services(),
117+
informers.Core().V1().Nodes(),
118+
informers.Discovery().V1beta1().EndpointSlices(),
119+
int32(100),
120+
client,
121+
1*time.Second)
122+
123+
// Start informer and controllers
124+
stopCh := make(chan struct{})
125+
defer close(stopCh)
126+
informers.Start(stopCh)
127+
// use only one worker to serialize the updates
128+
go epController.Run(1, stopCh)
129+
go epsController.Run(1, stopCh)
130+
131+
var testcases = []struct {
132+
name string
133+
serviceType v1.ServiceType
134+
ipFamilies []v1.IPFamily
135+
ipFamilyPolicy v1.IPFamilyPolicyType
136+
}{
137+
{
138+
name: "Service IPv4 Only",
139+
serviceType: v1.ServiceTypeClusterIP,
140+
ipFamilies: []v1.IPFamily{v1.IPv4Protocol},
141+
ipFamilyPolicy: v1.IPFamilyPolicySingleStack,
142+
},
143+
{
144+
name: "Service IPv6 Only",
145+
serviceType: v1.ServiceTypeClusterIP,
146+
ipFamilies: []v1.IPFamily{v1.IPv6Protocol},
147+
ipFamilyPolicy: v1.IPFamilyPolicySingleStack,
148+
},
149+
{
150+
name: "Service IPv6 IPv4 Dual Stack",
151+
serviceType: v1.ServiceTypeClusterIP,
152+
ipFamilies: []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol},
153+
ipFamilyPolicy: v1.IPFamilyPolicyRequireDualStack,
154+
},
155+
{
156+
name: "Service IPv4 IPv6 Dual Stack",
157+
serviceType: v1.ServiceTypeClusterIP,
158+
ipFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
159+
ipFamilyPolicy: v1.IPFamilyPolicyRequireDualStack,
160+
},
161+
}
162+
163+
for i, tc := range testcases {
164+
t.Run(tc.name, func(t *testing.T) {
165+
ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-dualstack-%d", i), s, t)
166+
defer framework.DeleteTestingNamespace(ns, s, t)
167+
168+
// Create a pod with labels
169+
pod := &v1.Pod{
170+
ObjectMeta: metav1.ObjectMeta{
171+
Name: "test-pod",
172+
Namespace: ns.Name,
173+
Labels: labelMap(),
174+
},
175+
Spec: v1.PodSpec{
176+
NodeName: "fakenode",
177+
Containers: []v1.Container{
178+
{
179+
Name: "fake-name",
180+
Image: "fakeimage",
181+
},
182+
},
183+
},
184+
}
185+
186+
createdPod, err := client.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
187+
if err != nil {
188+
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
189+
}
190+
191+
// Set pod IPs
192+
podIPbyFamily := map[v1.IPFamily]string{v1.IPv4Protocol: "1.1.1.1", v1.IPv6Protocol: "2001:db2::65"}
193+
createdPod.Status = v1.PodStatus{
194+
Phase: v1.PodRunning,
195+
PodIPs: []v1.PodIP{{IP: podIPbyFamily[v1.IPv4Protocol]}, {IP: podIPbyFamily[v1.IPv6Protocol]}},
196+
}
197+
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), createdPod, metav1.UpdateOptions{})
198+
if err != nil {
199+
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
200+
}
201+
202+
svc := &v1.Service{
203+
ObjectMeta: metav1.ObjectMeta{
204+
Name: fmt.Sprintf("svc-test-%d", i), // use different services for each test
205+
Namespace: ns.Name,
206+
Labels: labelMap(),
207+
},
208+
Spec: v1.ServiceSpec{
209+
Type: v1.ServiceTypeClusterIP,
210+
IPFamilies: tc.ipFamilies,
211+
IPFamilyPolicy: &tc.ipFamilyPolicy,
212+
Selector: labelMap(),
213+
Ports: []v1.ServicePort{
214+
{
215+
Name: fmt.Sprintf("port-test-%d", i),
216+
Port: 443,
217+
TargetPort: intstr.IntOrString{IntVal: 443},
218+
Protocol: "TCP",
219+
},
220+
},
221+
},
222+
}
223+
224+
// create a service
225+
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), svc, metav1.CreateOptions{})
226+
if err != nil {
227+
t.Fatalf("Error creating service: %v", err)
228+
}
229+
230+
// wait until endpoints are created
231+
// legacy endpoints are not dual stack
232+
// and use the address of the first IP family
233+
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
234+
e, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
235+
if err != nil {
236+
t.Logf("Error fetching endpoints: %v", err)
237+
return false, nil
238+
}
239+
// check if the endpoint addresses match the pod IP of the first IPFamily of the service
240+
// since this is an integration test PodIPs are not "ready"
241+
if len(e.Subsets) > 0 && len(e.Subsets[0].NotReadyAddresses) > 0 {
242+
if e.Subsets[0].NotReadyAddresses[0].IP == podIPbyFamily[tc.ipFamilies[0]] {
243+
return true, nil
244+
}
245+
t.Logf("Endpoint address %s does not match PodIP %s ", e.Subsets[0].Addresses[0].IP, podIPbyFamily[tc.ipFamilies[0]])
246+
}
247+
t.Logf("Endpoint does not contain addresses: %s", e.Name)
248+
return false, nil
249+
}); err != nil {
250+
t.Fatalf("Endpoints not found: %v", err)
251+
}
252+
253+
// wait until the endpoint slices are created
254+
err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
255+
lSelector := discovery.LabelServiceName + "=" + svc.Name
256+
esList, err := client.DiscoveryV1beta1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
257+
if err != nil {
258+
t.Logf("Error listing EndpointSlices: %v", err)
259+
return false, nil
260+
}
261+
// there must be an endpoint slice per ipFamily
262+
if len(esList.Items) != len(tc.ipFamilies) {
263+
t.Logf("Waiting for EndpointSlice to be created %v", esList)
264+
return false, nil
265+
}
266+
// there must be an endpoint address per each IP family
267+
for _, ipFamily := range tc.ipFamilies {
268+
found := false
269+
for _, slice := range esList.Items {
270+
// check if the endpoint addresses match the pod IPs
271+
if len(slice.Endpoints) > 0 && len(slice.Endpoints[0].Addresses) > 0 {
272+
if string(ipFamily) == string(slice.AddressType) &&
273+
slice.Endpoints[0].Addresses[0] == podIPbyFamily[ipFamily] {
274+
found = true
275+
break
276+
}
277+
}
278+
t.Logf("Waiting endpoint slice to contain addresses")
279+
}
280+
if !found {
281+
t.Logf("Endpoint slices does not contain PodIP %s", podIPbyFamily[ipFamily])
282+
return false, nil
283+
}
284+
}
285+
return true, nil
286+
})
287+
if err != nil {
288+
t.Fatalf("Error waiting for endpoint slices: %v", err)
289+
}
290+
})
291+
}
292+
}

0 commit comments

Comments
 (0)