Skip to content

Commit b96cebf

Browse files
committed
fix the remote endpoint cleanup logic
1 parent 363c3b8 commit b96cebf

File tree

2 files changed

+354
-11
lines changed

2 files changed

+354
-11
lines changed

pkg/proxy/winkernel/proxier.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -319,19 +319,21 @@ func (ep *endpointsInfo) Cleanup() {
319319
Log(ep, "Endpoint Cleanup", 3)
320320
if ep.refCount != nil {
321321
*ep.refCount--
322-
}
323322

324-
// Remove the remote hns endpoint, if no service is referring it
325-
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
326-
// Remove only remote endpoints created by this service
327-
if (ep.refCount == nil || *ep.refCount <= 0) && !ep.GetIsLocal() {
328-
klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
329-
err := ep.hns.deleteEndpoint(ep.hnsID)
330-
if err == nil {
331-
ep.hnsID = ""
332-
} else {
333-
klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err)
323+
// Remove the remote hns endpoint, if no service is referring it
324+
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
325+
// Remove only remote endpoints created by this service
326+
if *ep.refCount <= 0 && !ep.GetIsLocal() {
327+
klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
328+
err := ep.hns.deleteEndpoint(ep.hnsID)
329+
if err == nil {
330+
ep.hnsID = ""
331+
} else {
332+
klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err)
333+
}
334334
}
335+
336+
ep.refCount = nil
335337
}
336338
}
337339

pkg/proxy/winkernel/proxier_test.go

Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,329 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
315315
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
316316
}
317317
}
318+
func TestSharedRemoteEndpointDelete(t *testing.T) {
319+
syncPeriod := 30 * time.Second
320+
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
321+
if proxier == nil {
322+
t.Error()
323+
}
324+
325+
svcIP1 := "10.20.30.41"
326+
svcPort1 := 80
327+
svcNodePort1 := 3001
328+
svcPortName1 := proxy.ServicePortName{
329+
NamespacedName: makeNSN("ns1", "svc1"),
330+
Port: "p80",
331+
Protocol: v1.ProtocolTCP,
332+
}
333+
334+
svcIP2 := "10.20.30.42"
335+
svcPort2 := 80
336+
svcNodePort2 := 3002
337+
svcPortName2 := proxy.ServicePortName{
338+
NamespacedName: makeNSN("ns1", "svc2"),
339+
Port: "p80",
340+
Protocol: v1.ProtocolTCP,
341+
}
342+
343+
makeServiceMap(proxier,
344+
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
345+
svc.Spec.Type = "NodePort"
346+
svc.Spec.ClusterIP = svcIP1
347+
svc.Spec.Ports = []v1.ServicePort{{
348+
Name: svcPortName1.Port,
349+
Port: int32(svcPort1),
350+
Protocol: v1.ProtocolTCP,
351+
NodePort: int32(svcNodePort1),
352+
}}
353+
}),
354+
makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) {
355+
svc.Spec.Type = "NodePort"
356+
svc.Spec.ClusterIP = svcIP2
357+
svc.Spec.Ports = []v1.ServicePort{{
358+
Name: svcPortName2.Port,
359+
Port: int32(svcPort2),
360+
Protocol: v1.ProtocolTCP,
361+
NodePort: int32(svcNodePort2),
362+
}}
363+
}),
364+
)
365+
makeEndpointsMap(proxier,
366+
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
367+
ept.Subsets = []v1.EndpointSubset{{
368+
Addresses: []v1.EndpointAddress{{
369+
IP: epIpAddressRemote,
370+
}},
371+
Ports: []v1.EndpointPort{{
372+
Name: svcPortName1.Port,
373+
Port: int32(svcPort1),
374+
Protocol: v1.ProtocolTCP,
375+
}},
376+
}}
377+
}),
378+
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
379+
ept.Subsets = []v1.EndpointSubset{{
380+
Addresses: []v1.EndpointAddress{{
381+
IP: epIpAddressRemote,
382+
}},
383+
Ports: []v1.EndpointPort{{
384+
Name: svcPortName2.Port,
385+
Port: int32(svcPort2),
386+
Protocol: v1.ProtocolTCP,
387+
}},
388+
}}
389+
}),
390+
)
391+
proxier.setInitialized(true)
392+
proxier.syncProxyRules()
393+
ep := proxier.endpointsMap[svcPortName1][0]
394+
epInfo, ok := ep.(*endpointsInfo)
395+
if !ok {
396+
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
397+
398+
} else {
399+
if epInfo.hnsID != guid {
400+
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
401+
}
402+
}
403+
404+
if *proxier.endPointsRefCount[guid] != 2 {
405+
t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
406+
}
407+
408+
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
409+
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
410+
}
411+
412+
proxier.setInitialized(false)
413+
deleteServices(proxier,
414+
makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) {
415+
svc.Spec.Type = "NodePort"
416+
svc.Spec.ClusterIP = svcIP2
417+
svc.Spec.Ports = []v1.ServicePort{{
418+
Name: svcPortName2.Port,
419+
Port: int32(svcPort2),
420+
Protocol: v1.ProtocolTCP,
421+
NodePort: int32(svcNodePort2),
422+
}}
423+
}),
424+
)
425+
426+
deleteEndpoints(proxier,
427+
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
428+
ept.Subsets = []v1.EndpointSubset{{
429+
Addresses: []v1.EndpointAddress{{
430+
IP: epIpAddressRemote,
431+
}},
432+
Ports: []v1.EndpointPort{{
433+
Name: svcPortName2.Port,
434+
Port: int32(svcPort2),
435+
Protocol: v1.ProtocolTCP,
436+
}},
437+
}}
438+
}),
439+
)
440+
441+
proxier.setInitialized(true)
442+
proxier.syncProxyRules()
443+
444+
ep = proxier.endpointsMap[svcPortName1][0]
445+
epInfo, ok = ep.(*endpointsInfo)
446+
if !ok {
447+
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
448+
449+
} else {
450+
if epInfo.hnsID != guid {
451+
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
452+
}
453+
}
454+
455+
if *epInfo.refCount != 1 {
456+
t.Errorf("Incorrect Refcount. Current value: %v", *epInfo.refCount)
457+
}
458+
459+
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
460+
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
461+
}
462+
}
463+
func TestSharedRemoteEndpointUpdate(t *testing.T) {
464+
syncPeriod := 30 * time.Second
465+
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
466+
if proxier == nil {
467+
t.Error()
468+
}
469+
470+
svcIP1 := "10.20.30.41"
471+
svcPort1 := 80
472+
svcNodePort1 := 3001
473+
svcPortName1 := proxy.ServicePortName{
474+
NamespacedName: makeNSN("ns1", "svc1"),
475+
Port: "p80",
476+
Protocol: v1.ProtocolTCP,
477+
}
478+
479+
svcIP2 := "10.20.30.42"
480+
svcPort2 := 80
481+
svcNodePort2 := 3002
482+
svcPortName2 := proxy.ServicePortName{
483+
NamespacedName: makeNSN("ns1", "svc2"),
484+
Port: "p80",
485+
Protocol: v1.ProtocolTCP,
486+
}
318487

488+
makeServiceMap(proxier,
489+
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
490+
svc.Spec.Type = "NodePort"
491+
svc.Spec.ClusterIP = svcIP1
492+
svc.Spec.Ports = []v1.ServicePort{{
493+
Name: svcPortName1.Port,
494+
Port: int32(svcPort1),
495+
Protocol: v1.ProtocolTCP,
496+
NodePort: int32(svcNodePort1),
497+
}}
498+
}),
499+
makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) {
500+
svc.Spec.Type = "NodePort"
501+
svc.Spec.ClusterIP = svcIP2
502+
svc.Spec.Ports = []v1.ServicePort{{
503+
Name: svcPortName2.Port,
504+
Port: int32(svcPort2),
505+
Protocol: v1.ProtocolTCP,
506+
NodePort: int32(svcNodePort2),
507+
}}
508+
}),
509+
)
510+
511+
makeEndpointsMap(proxier,
512+
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
513+
ept.Subsets = []v1.EndpointSubset{{
514+
Addresses: []v1.EndpointAddress{{
515+
IP: epIpAddressRemote,
516+
}},
517+
Ports: []v1.EndpointPort{{
518+
Name: svcPortName1.Port,
519+
Port: int32(svcPort1),
520+
Protocol: v1.ProtocolTCP,
521+
}},
522+
}}
523+
}),
524+
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
525+
ept.Subsets = []v1.EndpointSubset{{
526+
Addresses: []v1.EndpointAddress{{
527+
IP: epIpAddressRemote,
528+
}},
529+
Ports: []v1.EndpointPort{{
530+
Name: svcPortName2.Port,
531+
Port: int32(svcPort2),
532+
Protocol: v1.ProtocolTCP,
533+
}},
534+
}}
535+
}),
536+
)
537+
proxier.setInitialized(true)
538+
proxier.syncProxyRules()
539+
ep := proxier.endpointsMap[svcPortName1][0]
540+
epInfo, ok := ep.(*endpointsInfo)
541+
if !ok {
542+
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
543+
544+
} else {
545+
if epInfo.hnsID != guid {
546+
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
547+
}
548+
}
549+
550+
if *proxier.endPointsRefCount[guid] != 2 {
551+
t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
552+
}
553+
554+
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
555+
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
556+
}
557+
558+
proxier.setInitialized(false)
559+
560+
proxier.OnServiceUpdate(
561+
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
562+
svc.Spec.Type = "NodePort"
563+
svc.Spec.ClusterIP = svcIP1
564+
svc.Spec.Ports = []v1.ServicePort{{
565+
Name: svcPortName1.Port,
566+
Port: int32(svcPort1),
567+
Protocol: v1.ProtocolTCP,
568+
NodePort: int32(svcNodePort1),
569+
}}
570+
}),
571+
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
572+
svc.Spec.Type = "NodePort"
573+
svc.Spec.ClusterIP = svcIP1
574+
svc.Spec.Ports = []v1.ServicePort{{
575+
Name: svcPortName1.Port,
576+
Port: int32(svcPort1),
577+
Protocol: v1.ProtocolTCP,
578+
NodePort: int32(3003),
579+
}}
580+
}))
581+
582+
proxier.OnEndpointsUpdate(
583+
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
584+
ept.Subsets = []v1.EndpointSubset{{
585+
Addresses: []v1.EndpointAddress{{
586+
IP: epIpAddressRemote,
587+
}},
588+
Ports: []v1.EndpointPort{{
589+
Name: svcPortName1.Port,
590+
Port: int32(svcPort1),
591+
Protocol: v1.ProtocolTCP,
592+
}},
593+
}}
594+
}),
595+
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
596+
ept.Subsets = []v1.EndpointSubset{{
597+
Addresses: []v1.EndpointAddress{{
598+
IP: epIpAddressRemote,
599+
}},
600+
Ports: []v1.EndpointPort{
601+
{
602+
Name: svcPortName1.Port,
603+
Port: int32(svcPort1),
604+
Protocol: v1.ProtocolTCP,
605+
},
606+
{
607+
Name: "p443",
608+
Port: int32(443),
609+
Protocol: v1.ProtocolTCP,
610+
}},
611+
}}
612+
}))
613+
614+
proxier.mu.Lock()
615+
proxier.endpointsSynced = true
616+
proxier.mu.Unlock()
617+
618+
proxier.setInitialized(true)
619+
proxier.syncProxyRules()
620+
621+
ep = proxier.endpointsMap[svcPortName1][0]
622+
epInfo, ok = ep.(*endpointsInfo)
623+
624+
if !ok {
625+
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
626+
627+
} else {
628+
if epInfo.hnsID != guid {
629+
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
630+
}
631+
}
632+
633+
if *epInfo.refCount != 2 {
634+
t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount)
635+
}
636+
637+
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
638+
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
639+
}
640+
}
319641
func TestCreateLoadBalancer(t *testing.T) {
320642
syncPeriod := 30 * time.Second
321643
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
@@ -487,6 +809,15 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
487809
defer proxier.mu.Unlock()
488810
proxier.servicesSynced = true
489811
}
812+
func deleteServices(proxier *Proxier, allServices ...*v1.Service) {
813+
for i := range allServices {
814+
proxier.OnServiceDelete(allServices[i])
815+
}
816+
817+
proxier.mu.Lock()
818+
defer proxier.mu.Unlock()
819+
proxier.servicesSynced = true
820+
}
490821
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
491822
svc := &v1.Service{
492823
ObjectMeta: metav1.ObjectMeta{
@@ -511,6 +842,16 @@ func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
511842
proxier.endpointsSynced = true
512843
}
513844

845+
func deleteEndpoints(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
846+
for i := range allEndpoints {
847+
proxier.OnEndpointsDelete(allEndpoints[i])
848+
}
849+
850+
proxier.mu.Lock()
851+
defer proxier.mu.Unlock()
852+
proxier.endpointsSynced = true
853+
}
854+
514855
func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
515856
ept := &v1.Endpoints{
516857
ObjectMeta: metav1.ObjectMeta{

0 commit comments

Comments
 (0)