Skip to content

Commit 4495d09

Browse files
authored
Merge pull request kubernetes#81430 from robscott/endpointslice-proxy
Adding EndpointSlice support for kube-proxy ipvs and iptables proxiers
2 parents 3e7e12d + 9665c59 commit 4495d09

28 files changed

+1394
-133
lines changed

cmd/kube-proxy/app/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ go_library(
1919
importpath = "k8s.io/kubernetes/cmd/kube-proxy/app",
2020
deps = [
2121
"//pkg/apis/core:go_default_library",
22+
"//pkg/features:go_default_library",
2223
"//pkg/kubelet/qos:go_default_library",
2324
"//pkg/master/ports:go_default_library",
2425
"//pkg/proxy:go_default_library",

cmd/kube-proxy/app/server.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
"github.com/spf13/pflag"
3434

3535
v1 "k8s.io/api/core/v1"
36-
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
36+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3737
"k8s.io/apimachinery/pkg/labels"
3838
"k8s.io/apimachinery/pkg/runtime"
3939
"k8s.io/apimachinery/pkg/selection"
@@ -56,6 +56,7 @@ import (
5656
"k8s.io/klog"
5757
"k8s.io/kube-proxy/config/v1alpha1"
5858
api "k8s.io/kubernetes/pkg/apis/core"
59+
"k8s.io/kubernetes/pkg/features"
5960
"k8s.io/kubernetes/pkg/kubelet/qos"
6061
"k8s.io/kubernetes/pkg/master/ports"
6162
"k8s.io/kubernetes/pkg/proxy"
@@ -477,6 +478,7 @@ type ProxyServer struct {
477478
CleanupIPVS bool
478479
MetricsBindAddress string
479480
EnableProfiling bool
481+
UseEndpointSlices bool
480482
OOMScoreAdj *int32
481483
ConfigSyncPeriod time.Duration
482484
HealthzServer *healthcheck.HealthzServer
@@ -619,21 +621,27 @@ func (s *ProxyServer) Run() error {
619621
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
620622

621623
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
622-
informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
624+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
623625
options.LabelSelector = labelSelector.String()
624626
}))
625627

626-
// Create configs (i.e. Watches for Services and Endpoints)
628+
// Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
627629
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
628630
// only notify on changes, and the initial update (on process start) may be lost if no handlers
629631
// are registered yet.
630632
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
631633
serviceConfig.RegisterEventHandler(s.Proxier)
632634
go serviceConfig.Run(wait.NeverStop)
633635

634-
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
635-
endpointsConfig.RegisterEventHandler(s.Proxier)
636-
go endpointsConfig.Run(wait.NeverStop)
636+
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
637+
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
638+
endpointSliceConfig.RegisterEventHandler(s.Proxier)
639+
go endpointSliceConfig.Run(wait.NeverStop)
640+
} else {
641+
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
642+
endpointsConfig.RegisterEventHandler(s.Proxier)
643+
go endpointsConfig.Run(wait.NeverStop)
644+
}
637645

638646
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
639647
// functions must configure their shared informer event handlers first.

pkg/kubemark/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ go_library(
2525
"//pkg/kubelet/dockershim:go_default_library",
2626
"//pkg/kubelet/types:go_default_library",
2727
"//pkg/proxy:go_default_library",
28+
"//pkg/proxy/config:go_default_library",
2829
"//pkg/proxy/iptables:go_default_library",
2930
"//pkg/util/iptables:go_default_library",
3031
"//pkg/util/mount:go_default_library",

pkg/kubemark/hollow_proxy.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ import (
2020
"fmt"
2121
"time"
2222

23-
"k8s.io/api/core/v1"
23+
v1 "k8s.io/api/core/v1"
2424
"k8s.io/apimachinery/pkg/types"
2525
clientset "k8s.io/client-go/kubernetes"
2626
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
2727
"k8s.io/client-go/tools/record"
2828
proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
2929
"k8s.io/kubernetes/pkg/proxy"
30+
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
3031
"k8s.io/kubernetes/pkg/proxy/iptables"
3132
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
3233
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -41,7 +42,9 @@ type HollowProxy struct {
4142
ProxyServer *proxyapp.ProxyServer
4243
}
4344

44-
type FakeProxier struct{}
45+
type FakeProxier struct {
46+
proxyconfig.NoopEndpointSliceHandler
47+
}
4548

4649
func (*FakeProxier) Sync() {}
4750
func (*FakeProxier) SyncLoop() {

pkg/proxy/BUILD

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
srcs = [
1212
"doc.go",
1313
"endpoints.go",
14+
"endpointslicecache.go",
1415
"service.go",
1516
"types.go",
1617
],
@@ -21,6 +22,7 @@ go_library(
2122
"//pkg/proxy/metrics:go_default_library",
2223
"//pkg/proxy/util:go_default_library",
2324
"//staging/src/k8s.io/api/core/v1:go_default_library",
25+
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
2426
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
2527
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2628
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
@@ -58,15 +60,18 @@ go_test(
5860
name = "go_default_test",
5961
srcs = [
6062
"endpoints_test.go",
63+
"endpointslicecache_test.go",
6164
"service_test.go",
6265
],
6366
embed = [":go_default_library"],
6467
deps = [
6568
"//staging/src/k8s.io/api/core/v1:go_default_library",
69+
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
6670
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
6771
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
6872
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
6973
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
7074
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
75+
"//vendor/k8s.io/utils/pointer:go_default_library",
7176
],
7277
)

pkg/proxy/config/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ go_library(
1515
importpath = "k8s.io/kubernetes/pkg/proxy/config",
1616
deps = [
1717
"//staging/src/k8s.io/api/core/v1:go_default_library",
18+
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
1819
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
1920
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
21+
"//staging/src/k8s.io/client-go/informers/discovery/v1alpha1:go_default_library",
2022
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
2123
"//vendor/k8s.io/klog:go_default_library",
2224
],

pkg/proxy/config/config.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"time"
2222

2323
"k8s.io/api/core/v1"
24+
discovery "k8s.io/api/discovery/v1alpha1"
2425
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2526
coreinformers "k8s.io/client-go/informers/core/v1"
27+
discoveryinformers "k8s.io/client-go/informers/discovery/v1alpha1"
2628
"k8s.io/client-go/tools/cache"
2729
"k8s.io/klog"
2830
)
@@ -61,6 +63,40 @@ type EndpointsHandler interface {
6163
OnEndpointsSynced()
6264
}
6365

66+
// EndpointSliceHandler is an abstract interface of objects which receive
67+
// notifications about endpoint slice object changes.
68+
type EndpointSliceHandler interface {
69+
// OnEndpointSliceAdd is called whenever creation of new endpoint slice
70+
// object is observed.
71+
OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice)
72+
// OnEndpointSliceUpdate is called whenever modification of an existing
73+
// endpoint slice object is observed.
74+
OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice)
75+
// OnEndpointSliceDelete is called whenever deletion of an existing
76+
// endpoint slice object is observed.
77+
OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice)
78+
// OnEndpointSlicesSynced is called once all the initial event handlers were
79+
// called and the state is fully propagated to local cache.
80+
OnEndpointSlicesSynced()
81+
}
82+
83+
// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
84+
// implemented a full EndpointSliceHandler.
85+
type NoopEndpointSliceHandler struct{}
86+
87+
// OnEndpointSliceAdd is a noop handler for EndpointSlice creates.
88+
func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {}
89+
90+
// OnEndpointSliceUpdate is a noop handler for EndpointSlice updates.
91+
func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) {
92+
}
93+
94+
// OnEndpointSliceDelete is a noop handler for EndpointSlice deletes.
95+
func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {}
96+
97+
// OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs.
98+
func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {}
99+
64100
// EndpointsConfig tracks a set of endpoints configurations.
65101
type EndpointsConfig struct {
66102
listerSynced cache.InformerSynced
@@ -152,6 +188,97 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
152188
}
153189
}
154190

191+
// EndpointSliceConfig tracks a set of endpoints configurations.
192+
type EndpointSliceConfig struct {
193+
listerSynced cache.InformerSynced
194+
eventHandlers []EndpointSliceHandler
195+
}
196+
197+
// NewEndpointSliceConfig creates a new EndpointSliceConfig.
198+
func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
199+
result := &EndpointSliceConfig{
200+
listerSynced: endpointSliceInformer.Informer().HasSynced,
201+
}
202+
203+
endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
204+
cache.ResourceEventHandlerFuncs{
205+
AddFunc: result.handleAddEndpointSlice,
206+
UpdateFunc: result.handleUpdateEndpointSlice,
207+
DeleteFunc: result.handleDeleteEndpointSlice,
208+
},
209+
resyncPeriod,
210+
)
211+
212+
return result
213+
}
214+
215+
// RegisterEventHandler registers a handler which is called on every endpoint slice change.
216+
func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) {
217+
c.eventHandlers = append(c.eventHandlers, handler)
218+
}
219+
220+
// Run waits for cache synced and invokes handlers after syncing.
221+
func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
222+
klog.Info("Starting endpoint slice config controller")
223+
224+
if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
225+
return
226+
}
227+
228+
for _, h := range c.eventHandlers {
229+
klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()")
230+
h.OnEndpointSlicesSynced()
231+
}
232+
}
233+
234+
func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
235+
endpointSlice, ok := obj.(*discovery.EndpointSlice)
236+
if !ok {
237+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
238+
return
239+
}
240+
for _, h := range c.eventHandlers {
241+
klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate %+v", endpointSlice)
242+
h.OnEndpointSliceAdd(endpointSlice)
243+
}
244+
}
245+
246+
func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) {
247+
oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice)
248+
if !ok {
249+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
250+
return
251+
}
252+
newEndpointSlice, ok := newObj.(*discovery.EndpointSlice)
253+
if !ok {
254+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
255+
return
256+
}
257+
for _, h := range c.eventHandlers {
258+
klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate")
259+
h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
260+
}
261+
}
262+
263+
func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
264+
endpointSlice, ok := obj.(*discovery.EndpointSlice)
265+
if !ok {
266+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
267+
if !ok {
268+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
269+
return
270+
}
271+
if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok {
272+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
273+
return
274+
}
275+
}
276+
for _, h := range c.eventHandlers {
277+
klog.V(4).Infof("Calling handler.OnEndpointsDelete")
278+
h.OnEndpointSliceDelete(endpointSlice)
279+
}
280+
}
281+
155282
// ServiceConfig tracks a set of service configurations.
156283
type ServiceConfig struct {
157284
listerSynced cache.InformerSynced

0 commit comments

Comments
 (0)