Skip to content

Commit 4902a0c

Browse files
andrewsykimmurali-reddy
authored andcommitted
route advertisement should account for services using externalTrafficPolicy=Local (#334)
1 parent 4e768b1 commit 4902a0c

File tree

3 files changed

+215
-7
lines changed

3 files changed

+215
-7
lines changed

app/controllers/network_routes_controller.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,16 @@ import (
3131
"github.com/osrg/gobgp/table"
3232
"github.com/prometheus/client_golang/prometheus"
3333
"github.com/vishvananda/netlink"
34+
v1core "k8s.io/api/core/v1"
3435
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3536
"k8s.io/client-go/kubernetes"
37+
"k8s.io/client-go/tools/cache"
3638
)
3739

3840
// NetworkRoutingController is struct to hold necessary information required by controller
3941
type NetworkRoutingController struct {
4042
nodeIP net.IP
41-
nodeHostName string
43+
nodeName string
4244
nodeSubnet net.IPNet
4345
nodeInterface string
4446
activeNodes map[string]bool
@@ -370,8 +372,20 @@ func (nrc *NetworkRoutingController) advertiseClusterIPs() {
370372
if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" {
371373
continue
372374
}
373-
374375
glog.V(2).Info("found a service of cluster ip type")
376+
377+
if svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal {
378+
nodeHasEndpoints, err := nrc.nodeHasEndpointsForService(svc)
379+
if err != nil {
380+
glog.Errorf("error determining if node has endpoints for svc: %q error: %v", svc.Name, err)
381+
continue
382+
}
383+
384+
if !nodeHasEndpoints {
385+
continue
386+
}
387+
}
388+
375389
err := nrc.AdvertiseClusterIp(svc.Spec.ClusterIP)
376390
if err != nil {
377391
glog.Errorf("error advertising cluster IP: %q error: %v", svc.Spec.ClusterIP, err)
@@ -388,6 +402,19 @@ func (nrc *NetworkRoutingController) advertiseExternalIPs() {
388402
if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" {
389403
continue
390404
}
405+
406+
if svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal {
407+
nodeHasEndpoints, err := nrc.nodeHasEndpointsForService(svc)
408+
if err != nil {
409+
glog.Errorf("error determining if node has endpoints for svc: %q error: %v", svc.Name, err)
410+
continue
411+
}
412+
413+
if !nodeHasEndpoints {
414+
continue
415+
}
416+
}
417+
391418
for _, externalIP := range svc.Spec.ExternalIPs {
392419
err := nrc.AdvertiseClusterIp(externalIP)
393420
if err != nil {
@@ -452,6 +479,40 @@ func (nrc *NetworkRoutingController) getExternalIps() ([]string, error) {
452479
return externalIpList, nil
453480
}
454481

482+
// nodeHasEndpointsForService will get the corresponding Endpoints resource for a given Service
483+
// return true if any endpoint addresses has NodeName matching the node name of the route controller
484+
func (nrc *NetworkRoutingController) nodeHasEndpointsForService(svc *v1core.Service) (bool, error) {
485+
// listers for endpoints and services should use the same keys since
486+
// endpoint and service resources share the same object name and namespace
487+
key, err := cache.MetaNamespaceKeyFunc(svc)
488+
if err != nil {
489+
return false, err
490+
}
491+
item, exists, err := watchers.EndpointsWatcher.GetByKey(key)
492+
if err != nil {
493+
return false, err
494+
}
495+
496+
if !exists {
497+
return false, fmt.Errorf("endpoint resource doesn't exist for service: %q", svc.Name)
498+
}
499+
500+
ep, ok := item.(*v1core.Endpoints)
501+
if !ok {
502+
return false, errors.New("failed to convert cache item to Endpoints type")
503+
}
504+
505+
for _, subset := range ep.Subsets {
506+
for _, address := range subset.Addresses {
507+
if *address.NodeName == nrc.nodeName {
508+
return true, nil
509+
}
510+
}
511+
}
512+
513+
return false, nil
514+
}
515+
455516
// Used for processing Annotations that may contain multiple items
456517
// Pass this the string and the delimiter
457518
func stringToSlice(s, d string) []string {
@@ -1563,7 +1624,7 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset,
15631624
return nil, errors.New("Failed getting node object from API server: " + err.Error())
15641625
}
15651626

1566-
nrc.nodeHostName = node.Name
1627+
nrc.nodeName = node.Name
15671628

15681629
nodeIP, err := utils.GetNodeIP(node)
15691630
if err != nil {

app/controllers/network_routes_controller_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,137 @@ func Test_advertiseExternalIPs(t *testing.T) {
394394
}
395395
}
396396

397+
func Test_nodeHasEndpointsForService(t *testing.T) {
398+
testcases := []struct {
399+
name string
400+
nrc *NetworkRoutingController
401+
existingService *v1core.Service
402+
existingEndpoint *v1core.Endpoints
403+
nodeHasEndpoints bool
404+
err error
405+
}{
406+
{
407+
"node has endpoints for service",
408+
&NetworkRoutingController{
409+
nodeName: "node-1",
410+
},
411+
&v1core.Service{
412+
ObjectMeta: metav1.ObjectMeta{
413+
Name: "svc-1",
414+
Namespace: "default",
415+
},
416+
Spec: v1core.ServiceSpec{
417+
Type: "ClusterIP",
418+
ClusterIP: "10.0.0.1",
419+
ExternalIPs: []string{"1.1.1.1", "2.2.2.2"},
420+
},
421+
},
422+
&v1core.Endpoints{
423+
ObjectMeta: metav1.ObjectMeta{
424+
Name: "svc-1",
425+
Namespace: "default",
426+
},
427+
Subsets: []v1core.EndpointSubset{
428+
{
429+
Addresses: []v1core.EndpointAddress{
430+
{
431+
IP: "172.20.1.1",
432+
NodeName: ptrToString("node-1"),
433+
},
434+
{
435+
IP: "172.20.1.2",
436+
NodeName: ptrToString("node-2"),
437+
},
438+
},
439+
},
440+
},
441+
},
442+
true,
443+
nil,
444+
},
445+
{
446+
"node has no endpoints for service",
447+
&NetworkRoutingController{
448+
nodeName: "node-1",
449+
},
450+
&v1core.Service{
451+
ObjectMeta: metav1.ObjectMeta{
452+
Name: "svc-1",
453+
Namespace: "default",
454+
},
455+
Spec: v1core.ServiceSpec{
456+
Type: "ClusterIP",
457+
ClusterIP: "10.0.0.1",
458+
ExternalIPs: []string{"1.1.1.1", "2.2.2.2"},
459+
},
460+
},
461+
&v1core.Endpoints{
462+
ObjectMeta: metav1.ObjectMeta{
463+
Name: "svc-1",
464+
Namespace: "default",
465+
},
466+
Subsets: []v1core.EndpointSubset{
467+
{
468+
Addresses: []v1core.EndpointAddress{
469+
{
470+
IP: "172.20.1.1",
471+
NodeName: ptrToString("node-2"),
472+
},
473+
{
474+
IP: "172.20.1.2",
475+
NodeName: ptrToString("node-3"),
476+
},
477+
},
478+
},
479+
},
480+
},
481+
false,
482+
nil,
483+
},
484+
}
485+
486+
for _, testcase := range testcases {
487+
t.Run(testcase.name, func(t *testing.T) {
488+
clientset := fake.NewSimpleClientset()
489+
490+
_, err := watchers.StartServiceWatcher(clientset, 0)
491+
if err != nil {
492+
t.Fatalf("failed to initialize service watcher: %v", err)
493+
}
494+
495+
_, err = watchers.StartEndpointsWatcher(clientset, 0)
496+
if err != nil {
497+
t.Fatalf("failed to initialize endpoints watcher: %v", err)
498+
}
499+
500+
_, err = clientset.CoreV1().Endpoints("default").Create(testcase.existingEndpoint)
501+
if err != nil {
502+
t.Fatalf("failed to create existing endpoints: %v", err)
503+
}
504+
505+
_, err = clientset.CoreV1().Services("default").Create(testcase.existingService)
506+
if err != nil {
507+
t.Fatalf("failed to create existing services: %v", err)
508+
}
509+
510+
waitForListerWithTimeout(time.Second*10, t)
511+
512+
nodeHasEndpoints, err := testcase.nrc.nodeHasEndpointsForService(testcase.existingService)
513+
if !reflect.DeepEqual(err, testcase.err) {
514+
t.Logf("actual err: %v", err)
515+
t.Logf("expected err: %v", testcase.err)
516+
t.Error("unexpected error")
517+
}
518+
if nodeHasEndpoints != testcase.nodeHasEndpoints {
519+
t.Logf("expected nodeHasEndpoints: %v", testcase.nodeHasEndpoints)
520+
t.Logf("actual nodeHasEndpoints: %v", nodeHasEndpoints)
521+
t.Error("unexpected nodeHasEndpoints")
522+
}
523+
524+
})
525+
}
526+
}
527+
397528
func Test_advertiseRoute(t *testing.T) {
398529
testcases := []struct {
399530
name string
@@ -969,3 +1100,7 @@ func waitForBGPWatchEventWithTimeout(timeout time.Duration, expectedNumEvents in
9691100
}
9701101
}
9711102
}
1103+
1104+
func ptrToString(str string) *string {
1105+
return &str
1106+
}

app/watchers/endpoints_watcher.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import (
77
"github.com/cloudnativelabs/kube-router/utils"
88
api "k8s.io/api/core/v1"
99
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10-
"k8s.io/apimachinery/pkg/fields"
10+
"k8s.io/apimachinery/pkg/runtime"
11+
"k8s.io/apimachinery/pkg/watch"
1112
"k8s.io/client-go/kubernetes"
1213
"k8s.io/client-go/tools/cache"
1314
)
@@ -31,7 +32,7 @@ var (
3132
)
3233

3334
type endpointsWatcher struct {
34-
clientset *kubernetes.Clientset
35+
clientset kubernetes.Interface
3536
endpointsController cache.Controller
3637
endpointsLister cache.Indexer
3738
broadcaster *utils.Broadcaster
@@ -84,13 +85,17 @@ func (ew *endpointsWatcher) List() []*api.Endpoints {
8485
return epInstances
8586
}
8687

88+
func (ew *endpointsWatcher) GetByKey(key string) (item interface{}, exists bool, err error) {
89+
return ew.endpointsLister.GetByKey(key)
90+
}
91+
8792
func (ew *endpointsWatcher) HasSynced() bool {
8893
return ew.endpointsController.HasSynced()
8994
}
9095

9196
var endpointsStopCh chan struct{}
9297

93-
func StartEndpointsWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*endpointsWatcher, error) {
98+
func StartEndpointsWatcher(clientset kubernetes.Interface, resyncPeriod time.Duration) (*endpointsWatcher, error) {
9499

95100
ew := endpointsWatcher{}
96101
EndpointsWatcher = &ew
@@ -103,7 +108,14 @@ func StartEndpointsWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Du
103108

104109
ew.clientset = clientset
105110
ew.broadcaster = utils.NewBroadcaster()
106-
lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "endpoints", metav1.NamespaceAll, fields.Everything())
111+
lw := &cache.ListWatch{
112+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
113+
return clientset.CoreV1().Endpoints(metav1.NamespaceAll).List(options)
114+
},
115+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
116+
return clientset.CoreV1().Endpoints(metav1.NamespaceAll).Watch(options)
117+
},
118+
}
107119
ew.endpointsLister, ew.endpointsController = cache.NewIndexerInformer(
108120
lw,
109121
&api.Endpoints{}, resyncPeriod, eventHandler,

0 commit comments

Comments
 (0)