Skip to content

Commit dba4e42

Browse files
M00nF1sh authored and aojea committed
bugfix: endpoints controller tracks resource version correctly
The endpoints controller stores the resource version of the previous Endpoints objects to avoid issues related to stale information in the cache. However, there can be update operations that succeed without increasing the resource version, causing the endpoints controller to declare the existing resource version stale and preventing the Endpoints from being updated. Co-authored-by: Quan Tian <[email protected]> Co-authored-by: Yang Yang <[email protected]>
1 parent 3baaac6 commit dba4e42

File tree

2 files changed

+204
-2
lines changed

2 files changed

+204
-2
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,12 +532,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
532532
}
533533

534534
logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
535+
var updatedEndpoints *v1.Endpoints
535536
if createEndpoints {
536537
// No previous endpoints, create them
537538
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
538539
} else {
539540
// Pre-existing
540-
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
541+
updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
541542
}
542543
if err != nil {
543544
if createEndpoints && errors.IsForbidden(err) {
@@ -564,7 +565,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
564565
// If the current endpoints is updated we track the old resource version, so
565566
// if we obtain this resource version again from the lister we know is outdated
566567
// and we need to retry later to wait for the informer cache to be up-to-date.
567-
if !createEndpoints {
568+
// there are some operations (webhooks, truncated endpoints, ...) that can potentially cause endpoints updates became noop
569+
// and return the same resourceVersion.
570+
// Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578
571+
if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
568572
e.staleEndpointsTracker.Stale(currentEndpoints)
569573
}
570574
return nil

test/integration/endpoints/endpoints_test.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ import (
2828
apierrors "k8s.io/apimachinery/pkg/api/errors"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/apimachinery/pkg/util/intstr"
31+
"k8s.io/apimachinery/pkg/util/sets"
3132
"k8s.io/apimachinery/pkg/util/wait"
3233
"k8s.io/client-go/informers"
3334
clientset "k8s.io/client-go/kubernetes"
3435
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
3536
"k8s.io/kubernetes/pkg/controller/endpoint"
3637
"k8s.io/kubernetes/test/integration/framework"
3738
"k8s.io/kubernetes/test/utils/ktesting"
39+
netutils "k8s.io/utils/net"
3840
)
3941

4042
func TestEndpointUpdates(t *testing.T) {
@@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
605607
svc.Spec.ExternalName = "google.com"
606608
return svc
607609
}
610+
611+
func TestEndpointTruncate(t *testing.T) {
612+
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
613+
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
614+
defer server.TearDownFn()
615+
616+
client, err := clientset.NewForConfig(server.ClientConfig)
617+
if err != nil {
618+
t.Fatalf("Error creating clientset: %v", err)
619+
}
620+
621+
informers := informers.NewSharedInformerFactory(client, 0)
622+
623+
tCtx := ktesting.Init(t)
624+
epController := endpoint.NewEndpointController(
625+
tCtx,
626+
informers.Core().V1().Pods(),
627+
informers.Core().V1().Services(),
628+
informers.Core().V1().Endpoints(),
629+
client,
630+
0)
631+
632+
// Start informer and controllers
633+
informers.Start(tCtx.Done())
634+
go epController.Run(tCtx, 1)
635+
636+
// Create namespace
637+
ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
638+
defer framework.DeleteNamespaceOrDie(client, ns, t)
639+
640+
// Create a pod with labels
641+
basePod := &v1.Pod{
642+
ObjectMeta: metav1.ObjectMeta{
643+
Name: "test-pod",
644+
Labels: labelMap(),
645+
},
646+
Spec: v1.PodSpec{
647+
NodeName: "fake-node",
648+
Containers: []v1.Container{
649+
{
650+
Name: "fakename",
651+
Image: "fakeimage",
652+
Ports: []v1.ContainerPort{
653+
{
654+
Name: "port-443",
655+
ContainerPort: 443,
656+
},
657+
},
658+
},
659+
},
660+
},
661+
Status: v1.PodStatus{
662+
Phase: v1.PodRunning,
663+
Conditions: []v1.PodCondition{
664+
{
665+
Type: v1.PodReady,
666+
Status: v1.ConditionTrue,
667+
},
668+
},
669+
PodIP: "10.0.0.1",
670+
PodIPs: []v1.PodIP{
671+
{
672+
IP: "10.0.0.1",
673+
},
674+
},
675+
},
676+
}
677+
678+
// create 1001 Pods to reach endpoint max capacity that is set to 1000
679+
allPodNames := sets.New[string]()
680+
baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
681+
for i := 0; i < 1001; i++ {
682+
pod := basePod.DeepCopy()
683+
pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
684+
allPodNames.Insert(pod.Name)
685+
podIP := netutils.AddIPOffset(baseIP, i).String()
686+
pod.Status.PodIP = podIP
687+
pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
688+
createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
689+
if err != nil {
690+
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
691+
}
692+
693+
createdPod.Status = pod.Status
694+
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
695+
if err != nil {
696+
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
697+
}
698+
}
699+
700+
// Create a service associated to the pod
701+
svc := &v1.Service{
702+
ObjectMeta: metav1.ObjectMeta{
703+
Name: "test-service",
704+
Namespace: ns.Name,
705+
Labels: map[string]string{
706+
"foo": "bar",
707+
},
708+
},
709+
Spec: v1.ServiceSpec{
710+
Selector: map[string]string{
711+
"foo": "bar",
712+
},
713+
Ports: []v1.ServicePort{
714+
{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
715+
},
716+
},
717+
}
718+
_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
719+
if err != nil {
720+
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
721+
}
722+
723+
var truncatedPodName string
724+
// poll until associated Endpoints to the previously created Service exists
725+
if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
726+
podNames := sets.New[string]()
727+
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
728+
if err != nil {
729+
return false, nil
730+
}
731+
732+
for _, subset := range endpoints.Subsets {
733+
for _, address := range subset.Addresses {
734+
podNames.Insert(address.TargetRef.Name)
735+
}
736+
}
737+
738+
if podNames.Len() != 1000 {
739+
return false, nil
740+
}
741+
742+
truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
743+
if !ok || truncated != "truncated" {
744+
return false, nil
745+
}
746+
// There is only 1 truncated Pod.
747+
truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
748+
return true, nil
749+
}); err != nil {
750+
t.Fatalf("endpoints not found: %v", err)
751+
}
752+
753+
// Update the truncated Pod several times to make endpoints controller resync the service.
754+
truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{})
755+
if err != nil {
756+
t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
757+
}
758+
for i := 0; i < 10; i++ {
759+
truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
760+
truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
761+
if err != nil {
762+
t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
763+
}
764+
truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
765+
truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
766+
if err != nil {
767+
t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
768+
}
769+
}
770+
771+
// delete 501 Pods
772+
for i := 500; i < 1001; i++ {
773+
podName := fmt.Sprintf("%s-%d", basePod.Name, i)
774+
err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{})
775+
if err != nil {
776+
t.Fatalf("error deleting test pod: %v", err)
777+
}
778+
}
779+
780+
// poll until endpoints for deleted Pod are no longer in Endpoints.
781+
if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
782+
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
783+
if err != nil {
784+
return false, nil
785+
}
786+
787+
numEndpoints := 0
788+
for _, subset := range endpoints.Subsets {
789+
numEndpoints += len(subset.Addresses)
790+
}
791+
792+
if numEndpoints != 500 {
793+
return false, nil
794+
}
795+
796+
truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
797+
if ok || truncated == "truncated" {
798+
return false, nil
799+
}
800+
801+
return true, nil
802+
}); err != nil {
803+
t.Fatalf("error checking for no endpoints with terminating pods: %v", err)
804+
}
805+
}

0 commit comments

Comments
 (0)