Skip to content

Commit c9cf3f5

Browse files
committed
Service Topology implementation
* Implement Service Topology for ipvs and iptables proxier * Add test files * API validation
1 parent cdaeabf commit c9cf3f5

File tree

25 files changed

+1006
-43
lines changed

25 files changed

+1006
-43
lines changed

cmd/kube-proxy/app/server.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
gerrors "github.com/pkg/errors"
3636
v1 "k8s.io/api/core/v1"
3737
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38+
"k8s.io/apimachinery/pkg/fields"
3839
"k8s.io/apimachinery/pkg/labels"
3940
"k8s.io/apimachinery/pkg/runtime"
4041
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -663,6 +664,7 @@ func (s *ProxyServer) Run() error {
663664
labelSelector := labels.NewSelector()
664665
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
665666

667+
// Make informers that filter out objects that want a non-default service proxy.
666668
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
667669
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
668670
options.LabelSelector = labelSelector.String()
@@ -690,6 +692,21 @@ func (s *ProxyServer) Run() error {
690692
// functions must configure their shared informer event handlers first.
691693
informerFactory.Start(wait.NeverStop)
692694

695+
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) {
696+
// Make an informer that selects for our nodename.
697+
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
698+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
699+
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
700+
}))
701+
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
702+
nodeConfig.RegisterEventHandler(s.Proxier)
703+
go nodeConfig.Run(wait.NeverStop)
704+
705+
// This has to start after the calls to NewNodeConfig because that must
706+
// configure the shared informer event handler first.
707+
currentNodeInformerFactory.Start(wait.NeverStop)
708+
}
709+
693710
// Birth Cry after the birth is successful
694711
s.birthCry()
695712

pkg/apis/core/types.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3391,6 +3391,8 @@ const (
33913391
IPv4Protocol IPFamily = "IPv4"
33923392
// IPv6Protocol indicates that this IP is IPv6 protocol
33933393
IPv6Protocol IPFamily = "IPv6"
3394+
// MaxServiceTopologyKeys is the largest number of topology keys allowed on a service
3395+
MaxServiceTopologyKeys = 16
33943396
)
33953397

33963398
// ServiceSpec describes the attributes that a user creates on a service
@@ -3506,14 +3508,14 @@ type ServiceSpec struct {
35063508

35073509
// topologyKeys is a preference-order list of topology keys which
35083510
// implementations of services should use to preferentially sort endpoints
3509-
// when accessing this Service. Topology keys must be valid label keys and
3510-
// at most 16 keys may be specified.
3511-
// If any ready backends exist for index [0], they should always be chosen;
3512-
// only if no backends exist for index [0] should backends for index [1] be considered.
3511+
// when accessing this Service, it can not be used at the same time as
3512+
// externalTrafficPolicy=Local.
3513+
// Topology keys must be valid label keys and at most 16 keys may be specified.
3514+
// Endpoints are chosen based on the first topology key with available backends.
35133515
// If this field is specified and all entries have no backends that match
35143516
// the topology of the client, the service has no backends for that client
35153517
// and connections should fail.
3516-
// The special value "" may be used to mean "any node". This catch-all
3518+
// The special value "*" may be used to mean "any topology". This catch-all
35173519
// value, if used, only makes sense as the last value in the list.
35183520
// If this is not specified or empty, no topology constraints will be applied.
35193521
// +optional

pkg/apis/core/validation/validation.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4053,6 +4053,35 @@ func ValidateService(service *core.Service) field.ErrorList {
40534053
ports[key] = true
40544054
}
40554055

4056+
// Validate TopologyKeys
4057+
if len(service.Spec.TopologyKeys) > 0 {
4058+
topoPath := specPath.Child("topologyKeys")
4059+
// topologyKeys is mutually exclusive with 'externalTrafficPolicy=Local'
4060+
if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal {
4061+
allErrs = append(allErrs, field.Forbidden(topoPath, "may not be specified when `externalTrafficPolicy=Local`"))
4062+
}
4063+
if len(service.Spec.TopologyKeys) > core.MaxServiceTopologyKeys {
4064+
allErrs = append(allErrs, field.TooMany(topoPath, len(service.Spec.TopologyKeys), core.MaxServiceTopologyKeys))
4065+
}
4066+
topoKeys := sets.NewString()
4067+
for i, key := range service.Spec.TopologyKeys {
4068+
keyPath := topoPath.Index(i)
4069+
if topoKeys.Has(key) {
4070+
allErrs = append(allErrs, field.Duplicate(keyPath, key))
4071+
}
4072+
topoKeys.Insert(key)
4073+
// "Any" must be the last value specified
4074+
if key == v1.TopologyKeyAny && i != len(service.Spec.TopologyKeys)-1 {
4075+
allErrs = append(allErrs, field.Invalid(keyPath, key, `"*" must be the last value specified`))
4076+
}
4077+
if key != v1.TopologyKeyAny {
4078+
for _, msg := range validation.IsQualifiedName(key) {
4079+
allErrs = append(allErrs, field.Invalid(keyPath, service.Spec.TopologyKeys, msg))
4080+
}
4081+
}
4082+
}
4083+
}
4084+
40564085
// Validate SourceRange field and annotation
40574086
_, ok := service.Annotations[core.AnnotationLoadBalancerSourceRangesKey]
40584087
if len(service.Spec.LoadBalancerSourceRanges) > 0 || ok {
@@ -4143,6 +4172,10 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
41434172
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy,
41444173
fmt.Sprintf("ExternalTrafficPolicy must be empty, %v or %v", core.ServiceExternalTrafficPolicyTypeCluster, core.ServiceExternalTrafficPolicyTypeLocal)))
41454174
}
4175+
// 'externalTrafficPolicy=Local' is mutually exclusive with topologyKeys
4176+
if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal && len(service.Spec.TopologyKeys) > 0 {
4177+
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec").Child("externalTrafficPolicy"), "externalTrafficPolicy must not be set to 'Local' when topologyKeys is specified"))
4178+
}
41464179
if service.Spec.HealthCheckNodePort < 0 {
41474180
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort,
41484181
"HealthCheckNodePort must be not less than 0"))

pkg/apis/core/validation/validation_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package validation
1818

1919
import (
2020
"bytes"
21+
"fmt"
2122
"math"
2223
"reflect"
2324
"strings"
@@ -9380,6 +9381,7 @@ func TestValidatePodEphemeralContainersUpdate(t *testing.T) {
93809381

93819382
func TestValidateService(t *testing.T) {
93829383
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)()
9384+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTopology, true)()
93839385

93849386
testCases := []struct {
93859387
name string
@@ -10058,6 +10060,66 @@ func TestValidateService(t *testing.T) {
1005810060
},
1005910061
numErrs: 1,
1006010062
},
10063+
{
10064+
name: "valid topology keys",
10065+
tweakSvc: func(s *core.Service) {
10066+
s.Spec.TopologyKeys = []string{
10067+
"kubernetes.io/hostname",
10068+
"failure-domain.beta.kubernetes.io/zone",
10069+
"failure-domain.beta.kubernetes.io/region",
10070+
v1.TopologyKeyAny,
10071+
}
10072+
},
10073+
numErrs: 0,
10074+
},
10075+
{
10076+
name: "invalid topology key",
10077+
tweakSvc: func(s *core.Service) {
10078+
s.Spec.TopologyKeys = []string{"NoUppercaseOrSpecialCharsLike=Equals"}
10079+
},
10080+
numErrs: 1,
10081+
},
10082+
{
10083+
name: "too many topology keys",
10084+
tweakSvc: func(s *core.Service) {
10085+
for i := 0; i < core.MaxServiceTopologyKeys+1; i++ {
10086+
s.Spec.TopologyKeys = append(s.Spec.TopologyKeys, fmt.Sprintf("topologykey-%d", i))
10087+
}
10088+
},
10089+
numErrs: 1,
10090+
},
10091+
{
10092+
name: `"Any" was not the last key`,
10093+
tweakSvc: func(s *core.Service) {
10094+
s.Spec.TopologyKeys = []string{
10095+
"kubernetes.io/hostname",
10096+
v1.TopologyKeyAny,
10097+
"failure-domain.beta.kubernetes.io/zone",
10098+
}
10099+
},
10100+
numErrs: 1,
10101+
},
10102+
{
10103+
name: `duplicate topology key`,
10104+
tweakSvc: func(s *core.Service) {
10105+
s.Spec.TopologyKeys = []string{
10106+
"kubernetes.io/hostname",
10107+
"kubernetes.io/hostname",
10108+
"failure-domain.beta.kubernetes.io/zone",
10109+
}
10110+
},
10111+
numErrs: 1,
10112+
},
10113+
{
10114+
name: `use topology keys with externalTrafficPolicy=Local`,
10115+
tweakSvc: func(s *core.Service) {
10116+
s.Spec.ExternalTrafficPolicy = "Local"
10117+
s.Spec.TopologyKeys = []string{
10118+
"kubernetes.io/hostname",
10119+
}
10120+
},
10121+
numErrs: 2,
10122+
},
1006110123
}
1006210124

1006310125
for _, tc := range testCases {

pkg/kubemark/hollow_proxy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type HollowProxy struct {
4545

4646
type FakeProxier struct {
4747
proxyconfig.NoopEndpointSliceHandler
48+
proxyconfig.NoopNodeHandler
4849
}
4950

5051
func (*FakeProxier) Sync() {}

pkg/proxy/config/config.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,128 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) {
369369
c.eventHandlers[i].OnServiceDelete(service)
370370
}
371371
}
372+
373+
// NodeHandler is an abstract interface of objects which receive
374+
// notifications about node object changes.
375+
type NodeHandler interface {
376+
// OnNodeAdd is called whenever creation of new node object
377+
// is observed.
378+
OnNodeAdd(node *v1.Node)
379+
// OnNodeUpdate is called whenever modification of an existing
380+
// node object is observed.
381+
OnNodeUpdate(oldNode, node *v1.Node)
382+
// OnNodeDelete is called whever deletion of an existing node
383+
// object is observed.
384+
OnNodeDelete(node *v1.Node)
385+
// OnNodeSynced is called once all the initial event handlers were
386+
// called and the state is fully propagated to local cache.
387+
OnNodeSynced()
388+
}
389+
390+
// NoopNodeHandler is a noop handler for proxiers that have not yet
391+
// implemented a full NodeHandler.
392+
type NoopNodeHandler struct{}
393+
394+
// OnNodeAdd is a noop handler for Node creates.
395+
func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
396+
397+
// OnNodeUpdate is a noop handler for Node updates.
398+
func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
399+
400+
// OnNodeDelete is a noop handler for Node deletes.
401+
func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
402+
403+
// OnNodeSynced is a noop handler for Node syncs.
404+
func (*NoopNodeHandler) OnNodeSynced() {}
405+
406+
// NodeConfig tracks a set of node configurations.
407+
// It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change.
408+
type NodeConfig struct {
409+
listerSynced cache.InformerSynced
410+
eventHandlers []NodeHandler
411+
}
412+
413+
// NewNodeConfig creates a new NodeConfig.
414+
func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
415+
result := &NodeConfig{
416+
listerSynced: nodeInformer.Informer().HasSynced,
417+
}
418+
419+
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
420+
cache.ResourceEventHandlerFuncs{
421+
AddFunc: result.handleAddNode,
422+
UpdateFunc: result.handleUpdateNode,
423+
DeleteFunc: result.handleDeleteNode,
424+
},
425+
resyncPeriod,
426+
)
427+
428+
return result
429+
}
430+
431+
// RegisterEventHandler registers a handler which is called on every node change.
432+
func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
433+
c.eventHandlers = append(c.eventHandlers, handler)
434+
}
435+
436+
// Run starts the goroutine responsible for calling registered handlers.
437+
func (c *NodeConfig) Run(stopCh <-chan struct{}) {
438+
klog.Info("Starting node config controller")
439+
440+
if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
441+
return
442+
}
443+
444+
for i := range c.eventHandlers {
445+
klog.V(3).Infof("Calling handler.OnNodeSynced()")
446+
c.eventHandlers[i].OnNodeSynced()
447+
}
448+
}
449+
450+
func (c *NodeConfig) handleAddNode(obj interface{}) {
451+
node, ok := obj.(*v1.Node)
452+
if !ok {
453+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
454+
return
455+
}
456+
for i := range c.eventHandlers {
457+
klog.V(4).Infof("Calling handler.OnNodeAdd")
458+
c.eventHandlers[i].OnNodeAdd(node)
459+
}
460+
}
461+
462+
func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
463+
oldNode, ok := oldObj.(*v1.Node)
464+
if !ok {
465+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
466+
return
467+
}
468+
node, ok := newObj.(*v1.Node)
469+
if !ok {
470+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
471+
return
472+
}
473+
for i := range c.eventHandlers {
474+
klog.V(5).Infof("Calling handler.OnNodeUpdate")
475+
c.eventHandlers[i].OnNodeUpdate(oldNode, node)
476+
}
477+
}
478+
479+
func (c *NodeConfig) handleDeleteNode(obj interface{}) {
480+
node, ok := obj.(*v1.Node)
481+
if !ok {
482+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
483+
if !ok {
484+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
485+
return
486+
}
487+
if node, ok = tombstone.Obj.(*v1.Node); !ok {
488+
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
489+
return
490+
}
491+
}
492+
for i := range c.eventHandlers {
493+
klog.V(4).Infof("Calling handler.OnNodeDelete")
494+
c.eventHandlers[i].OnNodeDelete(node)
495+
}
496+
}

pkg/proxy/endpoints.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ var supportedEndpointSliceAddressTypes = sets.NewString(
4848
type BaseEndpointInfo struct {
4949
Endpoint string // TODO: should be an endpointString type
5050
// IsLocal indicates whether the endpoint is running in same host as kube-proxy.
51-
IsLocal bool
51+
IsLocal bool
52+
Topology map[string]string
5253
}
5354

5455
var _ Endpoint = &BaseEndpointInfo{}
@@ -63,6 +64,11 @@ func (info *BaseEndpointInfo) GetIsLocal() bool {
6364
return info.IsLocal
6465
}
6566

67+
// GetTopology returns the topology information of the endpoint.
68+
func (info *BaseEndpointInfo) GetTopology() map[string]string {
69+
return info.Topology
70+
}
71+
6672
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
6773
func (info *BaseEndpointInfo) IP() string {
6874
return utilproxy.IPPart(info.Endpoint)
@@ -78,10 +84,11 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
7884
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
7985
}
8086

81-
func newBaseEndpointInfo(IP string, port int, isLocal bool) *BaseEndpointInfo {
87+
func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo {
8288
return &BaseEndpointInfo{
8389
Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
8490
IsLocal: isLocal,
91+
Topology: topology,
8592
}
8693
}
8794

@@ -358,7 +365,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
358365
continue
359366
}
360367
isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname
361-
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal)
368+
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil)
362369
if ect.makeEndpointInfo != nil {
363370
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
364371
} else {

0 commit comments

Comments
 (0)