Skip to content

Commit cf9bf47

Browse files
inju-songmurali-reddy
authored andcommitted
Integrate ip_vs_mh scheduler into kube-router (#564)
* Add to set ip_vs_mh scheduler and flags Signed-off-by: Inju Song <[email protected]> * Use scheduler flags when adding or updating service Signed-off-by: Inju Song <[email protected]> * Refactor with gofmt, generate moq file and fix test source Signed-off-by: Inju Song <[email protected]>
1 parent 3723d82 commit cf9bf47

File tree

3 files changed

+177
-32
lines changed

3 files changed

+177
-32
lines changed

pkg/controllers/proxy/network_services_controller.go

Lines changed: 154 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,23 @@ import (
3838
)
3939

4040
const (
41-
KUBE_DUMMY_IF = "kube-dummy-if"
42-
KUBE_TUNNEL_IF = "kube-tunnel-if"
43-
IFACE_NOT_FOUND = "Link not found"
44-
IFACE_HAS_ADDR = "file exists"
45-
IFACE_HAS_NO_ADDR = "cannot assign requested address"
46-
IPVS_SERVER_EXISTS = "file exists"
47-
48-
svcDSRAnnotation = "kube-router.io/service.dsr"
49-
svcSchedulerAnnotation = "kube-router.io/service.scheduler"
50-
svcHairpinAnnotation = "kube-router.io/service.hairpin"
51-
svcLocalAnnotation = "kube-router.io/service.local"
52-
svcSkipLbIpsAnnotation = "kube-router.io/service.skiplbips"
41+
KUBE_DUMMY_IF = "kube-dummy-if"
42+
KUBE_TUNNEL_IF = "kube-tunnel-if"
43+
IFACE_NOT_FOUND = "Link not found"
44+
IFACE_HAS_ADDR = "file exists"
45+
IFACE_HAS_NO_ADDR = "cannot assign requested address"
46+
IPVS_SERVER_EXISTS = "file exists"
47+
IPVS_MAGLEV_HASHING = "mh"
48+
IPVS_SVC_F_SCHED1 = "flag-1"
49+
IPVS_SVC_F_SCHED2 = "flag-2"
50+
IPVS_SVC_F_SCHED3 = "flag-3"
51+
52+
svcDSRAnnotation = "kube-router.io/service.dsr"
53+
svcSchedulerAnnotation = "kube-router.io/service.scheduler"
54+
svcHairpinAnnotation = "kube-router.io/service.hairpin"
55+
svcLocalAnnotation = "kube-router.io/service.local"
56+
svcSkipLbIpsAnnotation = "kube-router.io/service.skiplbips"
57+
svcSchedFlagsAnnotation = "kube-router.io/service.schedflags"
5358

5459
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
5560
)
@@ -61,7 +66,7 @@ var (
6166

6267
type ipvsCalls interface {
6368
ipvsNewService(ipvsSvc *ipvs.Service) error
64-
ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string) (*ipvs.Service, error)
69+
ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error)
6570
ipvsDelService(ipvsSvc *ipvs.Service) error
6671
ipvsUpdateService(ipvsSvc *ipvs.Service) error
6772
ipvsGetServices() ([]*ipvs.Service, error)
@@ -70,7 +75,7 @@ type ipvsCalls interface {
7075
ipvsUpdateDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error
7176
ipvsGetDestinations(ipvsSvc *ipvs.Service) ([]*ipvs.Destination, error)
7277
ipvsDelDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error
73-
ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, scheduler string) (*ipvs.Service, error)
78+
ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error)
7479
}
7580

7681
type netlinkCalls interface {
@@ -234,6 +239,14 @@ type serviceInfo struct {
234239
externalIPs []string
235240
loadBalancerIPs []string
236241
local bool
242+
flags schedFlags
243+
}
244+
245+
// IPVS scheduler flags
246+
type schedFlags struct {
247+
flag1 bool /* ipvs scheduler flag-1 */
248+
flag2 bool /* ipvs scheduler flag-2 */
249+
flag3 bool /* ipvs scheduler flag-3 */
237250
}
238251

239252
// map of all services, with unique service id(namespace name, service name, port) as key
@@ -552,7 +565,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
552565
}
553566

554567
// create IPVS service for the service to be exposed through the cluster ip
555-
ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler)
568+
ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
556569
if err != nil {
557570
glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error())
558571
continue
@@ -584,7 +597,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
584597
nodeServiceIds = make([]string, len(addrs))
585598

586599
for i, addr := range addrs {
587-
ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler)
600+
ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags)
588601
if err != nil {
589602
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
590603
continue
@@ -595,7 +608,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
595608
}
596609
} else {
597610
ipvsNodeportSvcs = make([]*ipvs.Service, 1)
598-
ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler)
611+
ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags)
599612
if err != nil {
600613
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
601614
continue
@@ -621,7 +634,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
621634
for _, externalIP := range extIPSet.List() {
622635
var externalIpServiceId string
623636
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
624-
ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler)
637+
ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
625638
if err != nil {
626639
glog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error())
627640
continue
@@ -655,7 +668,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
655668
}
656669

657670
// create IPVS service for the service to be exposed through the external ip
658-
ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler)
671+
ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
659672
if err != nil {
660673
glog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error())
661674
continue
@@ -1079,8 +1092,16 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
10791092
svcInfo.scheduler = ipvs.DestinationHashing
10801093
} else if schedulingMethod == ipvs.SourceHashing {
10811094
svcInfo.scheduler = ipvs.SourceHashing
1095+
} else if schedulingMethod == IPVS_MAGLEV_HASHING {
1096+
svcInfo.scheduler = IPVS_MAGLEV_HASHING
10821097
}
10831098
}
1099+
1100+
flags, ok := svc.ObjectMeta.Annotations[svcSchedFlagsAnnotation]
1101+
if ok && svcInfo.scheduler == IPVS_MAGLEV_HASHING {
1102+
svcInfo.flags = parseSchedFlags(flags)
1103+
}
1104+
10841105
copy(svcInfo.externalIPs, svc.Spec.ExternalIPs)
10851106
for _, lbIngress := range svc.Status.LoadBalancer.Ingress {
10861107
if len(lbIngress.IP) > 0 {
@@ -1102,6 +1123,32 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
11021123
return serviceMap
11031124
}
11041125

1126+
func parseSchedFlags(value string) schedFlags {
1127+
var flag1, flag2, flag3 bool
1128+
1129+
if len(value) < 1 {
1130+
return schedFlags{}
1131+
}
1132+
1133+
flags := strings.Split(value, ",")
1134+
for _, flag := range flags {
1135+
switch strings.Trim(flag, " ") {
1136+
case IPVS_SVC_F_SCHED1:
1137+
flag1 = true
1138+
break
1139+
case IPVS_SVC_F_SCHED2:
1140+
flag2 = true
1141+
break
1142+
case IPVS_SVC_F_SCHED3:
1143+
flag3 = true
1144+
break
1145+
default:
1146+
}
1147+
}
1148+
1149+
return schedFlags{flag1, flag2, flag3}
1150+
}
1151+
11051152
func shuffle(endPoints []endpointsInfo) []endpointsInfo {
11061153
for index1 := range endPoints {
11071154
index2 := rand.Intn(index1 + 1)
@@ -1412,6 +1459,18 @@ func ipvsServiceString(s *ipvs.Service) string {
14121459
flags = flags + "[one-packet scheduling]"
14131460
}
14141461

1462+
if s.Flags&0x0008 != 0 {
1463+
flags = flags + "[flag-1(fallback)]"
1464+
}
1465+
1466+
if s.Flags&0x0010 != 0 {
1467+
flags = flags + "[flag-2(port)]"
1468+
}
1469+
1470+
if s.Flags&0x0020 != 0 {
1471+
flags = flags + "[flag-3]"
1472+
}
1473+
14151474
return fmt.Sprintf("%s:%s:%v (Flags: %s)", protocol, s.Address, s.Port, flags)
14161475
}
14171476

@@ -1432,21 +1491,79 @@ func ipvsSetPersistence(svc *ipvs.Service, p bool) {
14321491
}
14331492
}
14341493

1435-
func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string) (*ipvs.Service, error) {
1494+
func ipvsSetSchedFlags(svc *ipvs.Service, s schedFlags) {
1495+
if s.flag1 {
1496+
svc.Flags |= 0x0008
1497+
} else {
1498+
svc.Flags &^= 0x0008
1499+
}
1500+
1501+
if s.flag2 {
1502+
svc.Flags |= 0x0010
1503+
} else {
1504+
svc.Flags &^= 0x0010
1505+
}
1506+
1507+
if s.flag3 {
1508+
svc.Flags |= 0x0020
1509+
} else {
1510+
svc.Flags &^= 0x0020
1511+
}
1512+
1513+
/* Keep netmask which is set by ipvsSetPersistence() before */
1514+
if (svc.Netmask&0xFFFFFFFF != 0) || (s.flag1 || s.flag2 || s.flag3) {
1515+
svc.Netmask |= 0xFFFFFFFF
1516+
} else {
1517+
svc.Netmask &^= 0xFFFFFFFF
1518+
}
1519+
}
1520+
1521+
/* Compare service scheduler flags with ipvs service */
1522+
func changedIpvsSchedFlags(svc *ipvs.Service, s schedFlags) bool {
1523+
if (s.flag1 && (svc.Flags&0x0008) == 0) || (!s.flag1 && (svc.Flags&0x0008) != 0) {
1524+
return true
1525+
}
1526+
1527+
if (s.flag2 && (svc.Flags&0x0010) == 0) || (!s.flag2 && (svc.Flags&0x0010) != 0) {
1528+
return true
1529+
}
1530+
1531+
if (s.flag3 && (svc.Flags&0x0020) == 0) || (!s.flag3 && (svc.Flags&0x0020) != 0) {
1532+
return true
1533+
}
1534+
1535+
return false
1536+
}
1537+
1538+
func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) {
14361539

14371540
var err error
14381541
for _, svc := range svcs {
14391542
if vip.Equal(svc.Address) && protocol == svc.Protocol && port == svc.Port {
14401543
if (persistent && (svc.Flags&0x0001) == 0) || (!persistent && (svc.Flags&0x0001) != 0) {
14411544
ipvsSetPersistence(svc, persistent)
14421545

1546+
if changedIpvsSchedFlags(svc, flags) {
1547+
ipvsSetSchedFlags(svc, flags)
1548+
}
1549+
14431550
err = ln.ipvsUpdateService(svc)
14441551
if err != nil {
14451552
return nil, err
14461553
}
14471554
glog.V(2).Infof("Updated persistence/session-affinity for service: %s", ipvsServiceString(svc))
14481555
}
14491556

1557+
if changedIpvsSchedFlags(svc, flags) {
1558+
ipvsSetSchedFlags(svc, flags)
1559+
1560+
err = ln.ipvsUpdateService(svc)
1561+
if err != nil {
1562+
return nil, err
1563+
}
1564+
glog.V(2).Infof("Updated scheduler flags for service: %s", ipvsServiceString(svc))
1565+
}
1566+
14501567
if scheduler != svc.SchedName {
14511568
svc.SchedName = scheduler
14521569
err = ln.ipvsUpdateService(svc)
@@ -1472,6 +1589,7 @@ func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, prot
14721589
}
14731590

14741591
ipvsSetPersistence(&svc, persistent)
1592+
ipvsSetSchedFlags(&svc, flags)
14751593

14761594
err = ln.ipvsNewService(&svc)
14771595
if err != nil {
@@ -1492,7 +1610,7 @@ func generateFwmark(ip, protocol, port string) uint32 {
14921610
}
14931611

14941612
// ipvsAddFWMarkService: creates a IPVS service using FWMARK
1495-
func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, scheduler string) (*ipvs.Service, error) {
1613+
func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) {
14961614

14971615
var protocolStr string
14981616
if protocol == syscall.IPPROTO_TCP {
@@ -1516,13 +1634,27 @@ func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint1
15161634
if (persistent && (svc.Flags&0x0001) == 0) || (!persistent && (svc.Flags&0x0001) != 0) {
15171635
ipvsSetPersistence(svc, persistent)
15181636

1637+
if changedIpvsSchedFlags(svc, flags) {
1638+
ipvsSetSchedFlags(svc, flags)
1639+
}
1640+
15191641
err = ln.ipvsUpdateService(svc)
15201642
if err != nil {
15211643
return nil, err
15221644
}
15231645
glog.V(2).Infof("Updated persistence/session-affinity for service: %s", ipvsServiceString(svc))
15241646
}
15251647

1648+
if changedIpvsSchedFlags(svc, flags) {
1649+
ipvsSetSchedFlags(svc, flags)
1650+
1651+
err = ln.ipvsUpdateService(svc)
1652+
if err != nil {
1653+
return nil, err
1654+
}
1655+
glog.V(2).Infof("Updated scheduler flags for service: %s", ipvsServiceString(svc))
1656+
}
1657+
15261658
if scheduler != svc.SchedName {
15271659
svc.SchedName = scheduler
15281660
err = ln.ipvsUpdateService(svc)
@@ -1548,6 +1680,7 @@ func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint1
15481680
}
15491681

15501682
ipvsSetPersistence(&svc, persistent)
1683+
ipvsSetSchedFlags(&svc, flags)
15511684

15521685
err = ln.ipvsNewService(&svc)
15531686
if err != nil {

0 commit comments

Comments
 (0)