Skip to content

Commit d618452

Browse files
committed
Adding EndpointsAdapter for apiserver EndpointSlice support
1 parent 47214d6 commit d618452

File tree

9 files changed

+539
-26
lines changed

9 files changed

+539
-26
lines changed

pkg/master/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ go_library(
124124
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
125125
"//staging/src/k8s.io/client-go/informers:go_default_library",
126126
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
127+
"//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1alpha1:go_default_library",
127128
"//staging/src/k8s.io/client-go/rest:go_default_library",
128129
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
129130
"//vendor/k8s.io/klog:go_default_library",

pkg/master/controller_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,8 @@ func TestReconcileEndpoints(t *testing.T) {
392392
if test.endpoints != nil {
393393
fakeClient = fake.NewSimpleClientset(test.endpoints)
394394
}
395-
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.CoreV1())
395+
epAdapter := reconcilers.NewEndpointsAdapter(fakeClient.CoreV1(), nil)
396+
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter)
396397
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
397398
if err != nil {
398399
t.Errorf("case %q: unexpected error: %v", test.testName, err)
@@ -510,7 +511,8 @@ func TestReconcileEndpoints(t *testing.T) {
510511
if test.endpoints != nil {
511512
fakeClient = fake.NewSimpleClientset(test.endpoints)
512513
}
513-
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.CoreV1())
514+
epAdapter := reconcilers.NewEndpointsAdapter(fakeClient.CoreV1(), nil)
515+
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter)
514516
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
515517
if err != nil {
516518
t.Errorf("case %q: unexpected error: %v", test.testName, err)

pkg/master/master.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,12 @@ import (
7070
"k8s.io/apiserver/pkg/server/healthz"
7171
serverstorage "k8s.io/apiserver/pkg/server/storage"
7272
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
73+
utilfeature "k8s.io/apiserver/pkg/util/feature"
7374
"k8s.io/client-go/informers"
7475
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
76+
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1"
7577
api "k8s.io/kubernetes/pkg/apis/core"
78+
"k8s.io/kubernetes/pkg/features"
7679
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
7780
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
7881
"k8s.io/kubernetes/pkg/master/reconcilers"
@@ -217,7 +220,13 @@ type Master struct {
217220

218221
func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
219222
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
220-
return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient)
223+
var endpointSliceClient discoveryclient.EndpointSlicesGetter
224+
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
225+
endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
226+
}
227+
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
228+
229+
return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointsAdapter)
221230
}
222231

223232
func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
@@ -226,6 +235,12 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
226235

227236
func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
228237
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
238+
var endpointSliceClient discoveryclient.EndpointSlicesGetter
239+
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
240+
endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
241+
}
242+
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
243+
229244
ttl := c.ExtraConfig.MasterEndpointReconcileTTL
230245
config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
231246
if err != nil {
@@ -236,7 +251,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
236251
klog.Fatalf("Error creating storage factory: %v", err)
237252
}
238253
masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl)
239-
return reconcilers.NewLeaseEndpointReconciler(endpointClient, masterLeases)
254+
255+
return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
240256
}
241257

242258
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {

pkg/master/reconcilers/BUILD

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "go_default_library",
55
srcs = [
66
"doc.go",
7+
"endpointsadapter.go",
78
"lease.go",
89
"mastercount.go",
910
"none.go",
@@ -14,25 +15,34 @@ go_library(
1415
deps = [
1516
"//pkg/api/v1/endpoints:go_default_library",
1617
"//staging/src/k8s.io/api/core/v1:go_default_library",
18+
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
1719
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
1820
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
1921
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2022
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
2123
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
2224
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
2325
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
26+
"//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1alpha1:go_default_library",
2427
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
2528
"//vendor/k8s.io/klog:go_default_library",
2629
],
2730
)
2831

2932
go_test(
3033
name = "go_default_test",
31-
srcs = ["lease_test.go"],
34+
srcs = [
35+
"endpointsadapter_test.go",
36+
"lease_test.go",
37+
],
3238
embed = [":go_default_library"],
3339
deps = [
3440
"//staging/src/k8s.io/api/core/v1:go_default_library",
41+
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
42+
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
43+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
3544
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
45+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
3646
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
3747
],
3848
)
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package reconcilers
18+
19+
import (
20+
corev1 "k8s.io/api/core/v1"
21+
discovery "k8s.io/api/discovery/v1alpha1"
22+
"k8s.io/apimachinery/pkg/api/errors"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
25+
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1"
26+
)
27+
28+
const (
29+
// serviceNameLabel is used to indicate the name of a Kubernetes service
30+
// associated with an EndpointSlice.
31+
serviceNameLabel = "kubernetes.io/service-name"
32+
)
33+
34+
// EndpointsAdapter provides a simple interface for reading and writing both
35+
// Endpoints and Endpoint Slices.
36+
// NOTE: This is an incomplete adapter implementation that is only suitable for
37+
// use in this package. This takes advantage of the Endpoints used in this
38+
// package always having a consistent set of ports, a single subset, and a small
39+
// set of addresses. Any more complex Endpoints resource would likely translate
40+
// into multiple Endpoint Slices creating significantly more complexity instead
41+
// of the 1:1 mapping this allows.
42+
type EndpointsAdapter struct {
43+
endpointClient corev1client.EndpointsGetter
44+
endpointSliceClient discoveryclient.EndpointSlicesGetter
45+
}
46+
47+
// NewEndpointsAdapter returns a new EndpointsAdapter.
48+
func NewEndpointsAdapter(endpointClient corev1client.EndpointsGetter, endpointSliceClient discoveryclient.EndpointSlicesGetter) EndpointsAdapter {
49+
return EndpointsAdapter{
50+
endpointClient: endpointClient,
51+
endpointSliceClient: endpointSliceClient,
52+
}
53+
}
54+
55+
// Get takes the name and namespace of the Endpoints resource, and returns a
56+
// corresponding Endpoints object if it exists, and an error if there is any.
57+
func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetOptions) (*corev1.Endpoints, error) {
58+
return adapter.endpointClient.Endpoints(namespace).Get(name, getOpts)
59+
}
60+
61+
// Create accepts a namespace and Endpoints object and creates the Endpoints
62+
// object. If an endpointSliceClient exists, a matching EndpointSlice will also
63+
// be created or updated. The created Endpoints object or an error will be
64+
// returned.
65+
func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
66+
endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints)
67+
if err == nil && adapter.endpointSliceClient != nil {
68+
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints)
69+
}
70+
return endpoints, err
71+
}
72+
73+
// Update accepts a namespace and Endpoints object and updates it. If an
74+
// endpointSliceClient exists, a matching EndpointSlice will also be created or
75+
// updated. The updated Endpoints object or an error will be returned.
76+
func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
77+
endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints)
78+
if err == nil && adapter.endpointSliceClient != nil {
79+
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints)
80+
}
81+
return endpoints, err
82+
}
83+
84+
// ensureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
85+
// and creates or updates a corresponding EndpointSlice. The EndpointSlice
86+
// and/or an error will be returned.
87+
func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) (*discovery.EndpointSlice, error) {
88+
endpointSlice := endpointSliceFromEndpoints(endpoints)
89+
_, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{})
90+
91+
if err != nil {
92+
if errors.IsNotFound(err) {
93+
return adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice)
94+
}
95+
return nil, err
96+
}
97+
98+
return adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice)
99+
}
100+
101+
// endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints
102+
// resource.
103+
func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice {
104+
endpointSlice := &discovery.EndpointSlice{}
105+
endpointSlice.Name = endpoints.Name
106+
endpointSlice.Labels = map[string]string{serviceNameLabel: endpoints.Name}
107+
endpointSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: endpoints.Name}}
108+
109+
ipAddressType := discovery.AddressTypeIP
110+
endpointSlice.AddressType = &ipAddressType
111+
112+
if len(endpoints.Subsets) > 0 {
113+
subset := endpoints.Subsets[0]
114+
for i := range subset.Ports {
115+
endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
116+
Port: &subset.Ports[i].Port,
117+
Name: &subset.Ports[i].Name,
118+
Protocol: &subset.Ports[i].Protocol,
119+
})
120+
}
121+
for _, address := range subset.Addresses {
122+
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpointFromAddress(address, true))
123+
}
124+
for _, address := range subset.NotReadyAddresses {
125+
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpointFromAddress(address, false))
126+
}
127+
}
128+
129+
return endpointSlice
130+
}
131+
132+
// endpointFromAddress generates an Endpoint from an EndpointAddress resource.
133+
func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
134+
topology := map[string]string{}
135+
if address.NodeName != nil {
136+
topology["kubernetes.io/hostname"] = *address.NodeName
137+
}
138+
139+
return discovery.Endpoint{
140+
Addresses: []string{address.IP},
141+
Conditions: discovery.EndpointConditions{Ready: &ready},
142+
TargetRef: address.TargetRef,
143+
Topology: topology,
144+
}
145+
}

0 commit comments

Comments
 (0)