
Commit 8e17456

Merge pull request kubernetes#127417 from aojea/endpoints_updated_rv
bugfix: endpoints controller tracks resource version correctly
2 parents: 203293e + dba4e42

2 files changed: +204, -2 lines

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 6 additions & 2 deletions
@@ -532,12 +532,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
     }
 
     logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
+    var updatedEndpoints *v1.Endpoints
     if createEndpoints {
         // No previous endpoints, create them
         _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
     } else {
         // Pre-existing
-        _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
+        updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
     }
     if err != nil {
         if createEndpoints && errors.IsForbidden(err) {
@@ -564,7 +565,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
     // If the current endpoints is updated we track the old resource version, so
     // if we obtain this resource version again from the lister we know is outdated
     // and we need to retry later to wait for the informer cache to be up-to-date.
-    if !createEndpoints {
+    // Some operations (webhooks, truncated endpoints, ...) can cause an endpoints update to become a no-op
+    // and return the same resourceVersion.
+    // Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578
+    if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
         e.staleEndpointsTracker.Stale(currentEndpoints)
     }
     return nil
test/integration/endpoints/endpoints_test.go

Lines changed: 198 additions & 0 deletions
@@ -28,13 +28,15 @@ import (
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
     "k8s.io/kubernetes/pkg/controller/endpoint"
     "k8s.io/kubernetes/test/integration/framework"
     "k8s.io/kubernetes/test/utils/ktesting"
+    netutils "k8s.io/utils/net"
 )
 
 func TestEndpointUpdates(t *testing.T) {
@@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
     svc.Spec.ExternalName = "google.com"
     return svc
 }
+
+func TestEndpointTruncate(t *testing.T) {
+    // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
+    server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+    defer server.TearDownFn()
+
+    client, err := clientset.NewForConfig(server.ClientConfig)
+    if err != nil {
+        t.Fatalf("Error creating clientset: %v", err)
+    }
+
+    informers := informers.NewSharedInformerFactory(client, 0)
+
+    tCtx := ktesting.Init(t)
+    epController := endpoint.NewEndpointController(
+        tCtx,
+        informers.Core().V1().Pods(),
+        informers.Core().V1().Services(),
+        informers.Core().V1().Endpoints(),
+        client,
+        0)
+
+    // Start informer and controllers
+    informers.Start(tCtx.Done())
+    go epController.Run(tCtx, 1)
+
+    // Create namespace
+    ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
+    defer framework.DeleteNamespaceOrDie(client, ns, t)
+
+    // Create a pod with labels
+    basePod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:   "test-pod",
+            Labels: labelMap(),
+        },
+        Spec: v1.PodSpec{
+            NodeName: "fake-node",
+            Containers: []v1.Container{
+                {
+                    Name:  "fakename",
+                    Image: "fakeimage",
+                    Ports: []v1.ContainerPort{
+                        {
+                            Name:          "port-443",
+                            ContainerPort: 443,
+                        },
+                    },
+                },
+            },
+        },
+        Status: v1.PodStatus{
+            Phase: v1.PodRunning,
+            Conditions: []v1.PodCondition{
+                {
+                    Type:   v1.PodReady,
+                    Status: v1.ConditionTrue,
+                },
+            },
+            PodIP: "10.0.0.1",
+            PodIPs: []v1.PodIP{
+                {
+                    IP: "10.0.0.1",
+                },
+            },
+        },
+    }
+
+    // create 1001 Pods to reach endpoint max capacity that is set to 1000
+    allPodNames := sets.New[string]()
+    baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
+    for i := 0; i < 1001; i++ {
+        pod := basePod.DeepCopy()
+        pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
+        allPodNames.Insert(pod.Name)
+        podIP := netutils.AddIPOffset(baseIP, i).String()
+        pod.Status.PodIP = podIP
+        pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
+        createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
+        if err != nil {
+            t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
+        }
+
+        createdPod.Status = pod.Status
+        _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
+        if err != nil {
+            t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
+        }
+    }
+
+    // Create a service associated to the pod
+    svc := &v1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "test-service",
+            Namespace: ns.Name,
+            Labels: map[string]string{
+                "foo": "bar",
+            },
+        },
+        Spec: v1.ServiceSpec{
+            Selector: map[string]string{
+                "foo": "bar",
+            },
+            Ports: []v1.ServicePort{
+                {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
+            },
+        },
+    }
+    _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
+    if err != nil {
+        t.Fatalf("Failed to create service %s: %v", svc.Name, err)
+    }
+
+    var truncatedPodName string
+    // poll until associated Endpoints to the previously created Service exists
+    if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+        podNames := sets.New[string]()
+        endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+        if err != nil {
+            return false, nil
+        }
+
+        for _, subset := range endpoints.Subsets {
+            for _, address := range subset.Addresses {
+                podNames.Insert(address.TargetRef.Name)
+            }
+        }
+
+        if podNames.Len() != 1000 {
+            return false, nil
+        }
+
+        truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+        if !ok || truncated != "truncated" {
+            return false, nil
+        }
+        // There is only 1 truncated Pod.
+        truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
+        return true, nil
+    }); err != nil {
+        t.Fatalf("endpoints not found: %v", err)
+    }
+
+    // Update the truncated Pod several times to make endpoints controller resync the service.
+    truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{})
+    if err != nil {
+        t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
+    }
+    for i := 0; i < 10; i++ {
+        truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
+        truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+        if err != nil {
+            t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+        }
+        truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
+        truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{})
+        if err != nil {
+            t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+        }
+    }
+
+    // delete 501 Pods
+    for i := 500; i < 1001; i++ {
+        podName := fmt.Sprintf("%s-%d", basePod.Name, i)
+        err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{})
+        if err != nil {
+            t.Fatalf("error deleting test pod: %v", err)
+        }
+    }
+
+    // poll until endpoints for deleted Pod are no longer in Endpoints.
+    if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+        endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
+        if err != nil {
+            return false, nil
+        }
+
+        numEndpoints := 0
+        for _, subset := range endpoints.Subsets {
+            numEndpoints += len(subset.Addresses)
+        }
+
+        if numEndpoints != 500 {
+            return false, nil
+        }
+
+        truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+        if ok || truncated == "truncated" {
+            return false, nil
+        }
+
+        return true, nil
+    }); err != nil {
+        t.Fatalf("error checking for no endpoints with terminating pods: %v", err)
+    }
+}
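
A note for readers skimming the new test: it leans on two behaviours of the legacy Endpoints API that the polling closures above assert on, namely that the controller caps an Endpoints object at 1000 addresses, and that it records truncation in the endpoints.kubernetes.io/over-capacity annotation (exposed as v1.EndpointsOverCapacity, written with the value "truncated"). The following standalone sketch shows those two checks in isolation; countAddresses and wasTruncated are illustrative helper names and are not part of the test.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// countAddresses sums the ready addresses across all subsets; the endpoints
// controller truncates anything beyond 1000 of these.
func countAddresses(eps *v1.Endpoints) int {
    n := 0
    for _, subset := range eps.Subsets {
        n += len(subset.Addresses)
    }
    return n
}

// wasTruncated reports whether the controller flagged the object as over
// capacity; "truncated" is the annotation value the test above asserts on.
func wasTruncated(eps *v1.Endpoints) bool {
    return eps.Annotations[v1.EndpointsOverCapacity] == "truncated"
}

func main() {
    eps := &v1.Endpoints{
        Subsets: []v1.EndpointSubset{
            {Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}}},
        },
    }
    fmt.Println(countAddresses(eps), wasTruncated(eps)) // 2 false
}

Assuming the usual Kubernetes integration-test prerequisites (an etcd binary reachable by the test framework), the new test can be run on its own with: go test -run TestEndpointTruncate ./test/integration/endpoints/
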
