@@ -27,6 +27,7 @@ import (
2727)
2828
2929const (
30+ // name of the dummy interface to which cluster ip are assigned
3031 KUBE_DUMMY_IF = "kube-dummy-if"
3132 IFACE_NOT_FOUND = "Link not found"
3233 IFACE_HAS_ADDR = "file exists"
@@ -58,13 +59,14 @@ var (
5859 }, []string {"namespace" , "service_name" , "backend" })
5960)
6061
61- // Network services controller enables local node as network service proxy through IPVS/LVS.
62+ // NetworkServicesController enables local node as network service proxy through IPVS/LVS.
6263// Support only Kubernetes network services of type NodePort, ClusterIP, and LoadBalancer. For each service a
6364// IPVS service is created and for each service endpoint a server is added to the IPVS service.
6465// As services and endpoints are updated, network service controller gets the updates from
6566// the kubernetes api server and syncs the ipvs configuration to reflect state of services
6667// and endpoints
6768
69+ // struct for storing information needed by the controller
6870type NetworkServicesController struct {
6971 nodeIP net.IP
7072 nodeHostName string
@@ -102,7 +104,7 @@ type endpointsInfo struct {
102104// map of all endpoints, with unique service id(namespace name, service name, port) as key
103105type endpointsInfoMap map [string ][]endpointsInfo
104106
105- // periodically sync ipvs configuration to reflect desired state of services and endpoints
107+ // Run: periodically sync ipvs configuration to reflect desired state of services and endpoints
106108func (nsc * NetworkServicesController ) Run (stopCh <- chan struct {}, wg * sync.WaitGroup ) error {
107109
108110 t := time .NewTicker (nsc .syncPeriod )
@@ -170,7 +172,7 @@ func (nsc *NetworkServicesController) sync() {
170172 nsc .publishMetrics (nsc .serviceMap )
171173}
172174
173- // handle change in endpoints update from the API server
175+ // OnEndpointsUpdate: handle change in endpoints update from the API server
174176func (nsc * NetworkServicesController ) OnEndpointsUpdate (endpointsUpdate * watchers.EndpointsUpdate ) {
175177
176178 nsc .mu .Lock ()
@@ -192,7 +194,7 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watcher
192194 }
193195}
194196
195- // handle change in service update from the API server
197+ // OnServiceUpdate: handle change in service update from the API server
196198func (nsc * NetworkServicesController ) OnServiceUpdate (serviceUpdate * watchers.ServiceUpdate ) {
197199
198200 nsc .mu .Lock ()
@@ -241,15 +243,15 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
241243 }
242244
243245 // assign cluster IP of the service to the dummy interface so that its routable from the pod's on the node
244- vip := & netlink.Addr {IPNet : & net.IPNet {svc .clusterIP , net .IPv4Mask (255 , 255 , 255 , 255 )}, Scope : syscall .RT_SCOPE_LINK }
246+ vip := & netlink.Addr {IPNet : & net.IPNet {IP : svc .clusterIP , Mask : net .IPv4Mask (255 , 255 , 255 , 255 )}, Scope : syscall .RT_SCOPE_LINK }
245247 err := netlink .AddrAdd (dummyVipInterface , vip )
246248 if err != nil && err .Error () != IFACE_HAS_ADDR {
247249 glog .Errorf ("Failed to assign cluster ip to dummy interface %s" , err )
248250 continue
249251 }
250252
251253 // create IPVS service for the service to be exposed through the cluster ip
252- ipvs_cluster_vip_svc , err := ipvsAddService (svc .clusterIP , protocol , uint16 (svc .port ), svc .sessionAffinity )
254+ ipvsClusterVipSvc , err := ipvsAddService (svc .clusterIP , protocol , uint16 (svc .port ), svc .sessionAffinity )
253255 if err != nil {
254256 glog .Errorf ("Failed to create ipvs service for cluster ip: %s" , err .Error ())
255257 continue
@@ -258,10 +260,10 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
258260 activeServiceEndpointMap [clusterServiceId ] = make ([]string , 0 )
259261
260262 // create IPVS service for the service to be exposed through the nodeport
261- var ipvs_nodeport_svc * ipvs.Service
263+ var ipvsNodeportSvc * ipvs.Service
262264 var nodeServiceId string
263265 if svc .nodePort != 0 {
264- ipvs_nodeport_svc , err = ipvsAddService (nsc .nodeIP , protocol , uint16 (svc .nodePort ), svc .sessionAffinity )
266+ ipvsNodeportSvc , err = ipvsAddService (nsc .nodeIP , protocol , uint16 (svc .nodePort ), svc .sessionAffinity )
265267 if err != nil {
266268 glog .Errorf ("Failed to create ipvs service for node port" )
267269 continue
@@ -280,7 +282,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
280282 Weight : 1 ,
281283 }
282284
283- err := ipvsAddServer (ipvs_cluster_vip_svc , & dst )
285+ err := ipvsAddServer (ipvsClusterVipSvc , & dst )
284286 if err != nil {
285287 glog .Errorf (err .Error ())
286288 }
@@ -289,7 +291,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
289291 append (activeServiceEndpointMap [clusterServiceId ], endpoint .ip )
290292
291293 if svc .nodePort != 0 {
292- err := ipvsAddServer (ipvs_nodeport_svc , & dst )
294+ err := ipvsAddServer (ipvsNodeportSvc , & dst )
293295 if err != nil {
294296 glog .Errorf (err .Error ())
295297 }
@@ -434,11 +436,11 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo {
434436func buildEndpointsInfo () endpointsInfoMap {
435437 endpointsMap := make (endpointsInfoMap )
436438 for _ , ep := range watchers .EndpointsWatcher .List () {
437- for _ , ep_subset := range ep .Subsets {
438- for _ , port := range ep_subset .Ports {
439+ for _ , epSubset := range ep .Subsets {
440+ for _ , port := range epSubset .Ports {
439441 svcId := generateServiceId (ep .Namespace , ep .Name , port .Name )
440442 endpoints := make ([]endpointsInfo , 0 )
441- for _ , addr := range ep_subset .Addresses {
443+ for _ , addr := range epSubset .Addresses {
442444 endpoints = append (endpoints , endpointsInfo {ip : addr .IP , port : int (port .Port )})
443445 }
444446 endpointsMap [svcId ] = shuffle (endpoints )
@@ -726,7 +728,7 @@ func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool) (*ipvs.S
726728 svc .Timeout = 180 * 60
727729 }
728730 if err := h .NewService (& svc ); err != nil {
729- return nil , fmt .Errorf ("Failed to create service: %s:%s:%s" , vip .String (), protocol , strconv .Itoa (int (port )))
731+ return nil , fmt .Errorf ("Failed to create service: %s:%s:%s" , vip .String (), strconv . Itoa ( int ( protocol )) , strconv .Itoa (int (port )))
730732 }
731733 glog .Infof ("Successfully added service: %s:%s:%s" , vip .String (), protocol , strconv .Itoa (int (port )))
732734 return & svc , nil
@@ -743,10 +745,10 @@ func ipvsAddServer(service *ipvs.Service, dest *ipvs.Destination) error {
743745
744746 if strings .Contains (err .Error (), IPVS_SERVER_EXISTS ) {
745747 glog .Infof ("ipvs destination %s:%s already exists in the ipvs service %s:%s:%s so not adding destination" , dest .Address ,
746- strconv .Itoa (int (dest .Port )), service .Address , service .Protocol , strconv .Itoa (int (service .Port )))
748+ strconv .Itoa (int (dest .Port )), service .Address , strconv . Itoa ( int ( service .Protocol )) , strconv .Itoa (int (service .Port )))
747749 } else {
748750 return fmt .Errorf ("Failed to add ipvs destination %s:%s to the ipvs service %s:%s:%s due to : %s" , dest .Address ,
749- strconv .Itoa (int (dest .Port )), service .Address , service .Protocol , strconv .Itoa (int (service .Port )), err .Error ())
751+ strconv .Itoa (int (dest .Port )), service .Address , strconv . Itoa ( int ( service .Protocol )) , strconv .Itoa (int (service .Port )), err .Error ())
750752 }
751753 return nil
752754}
@@ -779,7 +781,7 @@ func getKubeDummyInterface() (netlink.Link, error) {
779781 return dummyVipInterface , nil
780782}
781783
782- // clean up all the configurations (IPVS, iptables, links)
784+ // Cleanup: clean all the configurations (IPVS, iptables, links) done
783785func (nsc * NetworkServicesController ) Cleanup () {
784786 // cleanup ipvs rules by flush
785787 glog .Infof ("Cleaning up IPVS configuration permanently" )
0 commit comments