diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index a86a663c2..64760d1d5 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -373,44 +373,15 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid } tc.Services[serviceNN] = &service - // Conditionally collect EndpointSlice or Endpoints based on cluster API support - if r.supportsEndpointSlice { - var endpoints discoveryv1.EndpointSliceList - if err := r.List(ctx, &endpoints, - client.InNamespace(service.Namespace), - client.MatchingLabels{ - discoveryv1.LabelServiceName: service.Name, - }, - ); err != nil { - return types.ReasonError{ - Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf("failed to list endpoint slices: %v", err), - } - } + // backend.subset specifies a subset of upstream nodes. + // It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName + subsetLabels := r.getSubsetLabels(tc, serviceNN, backend) - // backend.subset specifies a subset of upstream nodes. - // It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName - subsetLabels := r.getSubsetLabels(tc, serviceNN, backend) - tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels) - } else { - // Fallback to Endpoints API for Kubernetes 1.18 compatibility - var ep corev1.Endpoints - if err := r.Get(ctx, serviceNN, &ep); err != nil { - if client.IgnoreNotFound(err) != nil { - return types.ReasonError{ - Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf("failed to get endpoints: %v", err), - } - } - // If endpoints not found, create empty EndpointSlice list - tc.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{} - } else { - // Convert Endpoints to EndpointSlice format for internal consistency - convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep) - - // Apply subset filtering to converted EndpointSlices - subsetLabels := r.getSubsetLabels(tc, serviceNN, backend) - tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, convertedEndpointSlices, subsetLabels) + // Collect endpoints with EndpointSlice support and subset filtering + if err := resolveServiceEndpoints(ctx, r.Client, tc, serviceNN, r.supportsEndpointSlice, subsetLabels); err != nil { + return types.ReasonError{ + Reason: string(apiv2.ConditionReasonInvalidSpec), + Message: err.Error(), } } } @@ -691,41 +662,3 @@ func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext, return nil } - -func (r *ApisixRouteReconciler) filterEndpointSlicesBySubsetLabels(ctx context.Context, in []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice { - if len(labels) == 0 { - return in - } - - for i := range in { - in[i] = r.filterEndpointSliceByTargetPod(ctx, in[i], labels) - } - - return utils.Filter(in, func(v discoveryv1.EndpointSlice) bool { - return len(v.Endpoints) > 0 - }) -} - -// filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels -func (r *ApisixRouteReconciler) filterEndpointSliceByTargetPod(ctx context.Context, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice { - item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool { - if v.TargetRef == nil || v.TargetRef.Kind != KindPod { - return true - } - - var ( - pod corev1.Pod - podNN = k8stypes.NamespacedName{ - Namespace: v.TargetRef.Namespace, - Name: v.TargetRef.Name, - } - ) - if err := r.Get(ctx, podNN, &pod); err != nil { - return false - } - - return utils.IsSubsetOf(labels, pod.GetLabels()) - }) - - return item -} diff --git a/internal/controller/gatewayproxy_controller.go b/internal/controller/gatewayproxy_controller.go index 498094000..5a7fcc66f 100644 --- a/internal/controller/gatewayproxy_controller.go +++ b/internal/controller/gatewayproxy_controller.go @@ -114,10 +114,16 @@ func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request if providerService == nil { tctx.EndpointSlices[req.NamespacedName] = nil } else { - if err := addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, r.Client, k8stypes.NamespacedName{ + serviceNN := k8stypes.NamespacedName{ Namespace: gp.Namespace, Name: providerService.Name, - }, r.supportsEndpointSlice); err != nil { + } + service := &corev1.Service{} + if err := r.Get(ctx, serviceNN, service); err != nil { + return reconcile.Result{}, err + } + tctx.Services[serviceNN] = service + if err := resolveServiceEndpoints(tctx, r.Client, tctx, serviceNN, r.supportsEndpointSlice, nil); err != nil { return reconcile.Result{}, err } } diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index f924f2880..0a4630f63 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -570,36 +570,11 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla } tctx.Services[targetNN] = &service - // Conditionally collect EndpointSlice or Endpoints based on cluster API support - if r.supportsEndpointSlice { - endpointSliceList := new(discoveryv1.EndpointSliceList) - if err := r.List(tctx, endpointSliceList, - client.InNamespace(targetNN.Namespace), - client.MatchingLabels{ - discoveryv1.LabelServiceName: targetNN.Name, - }, - ); err != nil { - r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) - terr = err - continue - } - tctx.EndpointSlices[targetNN] = endpointSliceList.Items - } else { - // Fallback to Endpoints API for Kubernetes 1.18 compatibility - var endpoints corev1.Endpoints - if err := r.Get(tctx, targetNN, &endpoints); err != nil { - if client.IgnoreNotFound(err) != nil { - r.Log.Error(err, "failed to get endpoints", "Service", targetNN) - terr = err - continue - } - // If endpoints not found, create empty EndpointSlice list - tctx.EndpointSlices[targetNN] = []discoveryv1.EndpointSlice{} - } else { - // Convert Endpoints to EndpointSlice format for internal consistency - convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints) - tctx.EndpointSlices[targetNN] = convertedEndpointSlices - } + // Collect endpoints with EndpointSlice support + if err := resolveServiceEndpoints(tctx, r.Client, tctx, targetNN, r.supportsEndpointSlice, nil); err != nil { + r.Log.Error(err, "failed to collect endpoints", "Service", targetNN) + terr = err + continue } } return terr diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 9b08fe6aa..e7d2cb7dc 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -606,37 +606,10 @@ func (r *IngressReconciler) processBackendService(tctx *provider.TranslateContex return err } - // Conditionally get EndpointSlice or Endpoints based on cluster API support - if r.supportsEndpointSlice { - // get the endpoint slices - endpointSliceList := &discoveryv1.EndpointSliceList{} - if err := r.List(tctx, endpointSliceList, - client.InNamespace(namespace), - client.MatchingLabels{ - discoveryv1.LabelServiceName: backendService.Name, - }, - ); err != nil { - r.Log.Error(err, "failed to list endpoint slices", "namespace", namespace, "name", backendService.Name) - return err - } - - // save the endpoint slices to the translate context - tctx.EndpointSlices[serviceNS] = endpointSliceList.Items - } else { - // Fallback to Endpoints API for Kubernetes 1.18 compatibility - var endpoints corev1.Endpoints - if err := r.Get(tctx, serviceNS, &endpoints); err != nil { - if client.IgnoreNotFound(err) != nil { - r.Log.Error(err, "failed to get endpoints", "namespace", namespace, "name", backendService.Name) - return err - } - // If endpoints not found, create empty EndpointSlice list - tctx.EndpointSlices[serviceNS] = []discoveryv1.EndpointSlice{} - } else { - // Convert Endpoints to EndpointSlice format for internal consistency - convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints) - tctx.EndpointSlices[serviceNS] = convertedEndpointSlices - } + // Collect endpoints with EndpointSlice support + if err := resolveServiceEndpoints(tctx, r.Client, tctx, serviceNS, r.supportsEndpointSlice, nil); err != nil { + r.Log.Error(err, "failed to collect endpoints", "namespace", namespace, "name", backendService.Name) + return err } tctx.Services[serviceNS] = &service return nil diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 7df386283..aaf304ec3 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -961,10 +961,11 @@ func ProcessGatewayProxy(r client.Client, log logr.Logger, tctx *provider.Transl } if cp.Service != nil { - if err := addProviderEndpointsToTranslateContext(tctx, r, k8stypes.NamespacedName{ + serviceNN := k8stypes.NamespacedName{ Namespace: gatewayProxy.GetNamespace(), Name: cp.Service.Name, - }); err != nil { + } + if err := addProviderEndpointsToTranslateContext(tctx, r, serviceNN); err != nil { return err } } @@ -1371,10 +1372,11 @@ func ProcessIngressClassParameters(tctx *provider.TranslateContext, c client.Cli // process control plane provider service if cp.Service != nil { - if err := addProviderEndpointsToTranslateContext(tctx, c, client.ObjectKey{ + serviceNN := k8stypes.NamespacedName{ Namespace: gatewayProxy.GetNamespace(), Name: cp.Service.Name, - }); err != nil { + } + if err := addProviderEndpointsToTranslateContext(tctx, c, serviceNN); err != nil { return err } } @@ -1480,10 +1482,6 @@ func distinctRequests(requests []reconcile.Request) []reconcile.Request { } func addProviderEndpointsToTranslateContext(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName) error { - return addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, c, serviceNN, true) -} - -func addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName, supportsEndpointSlice bool) error { log.Debugw("to process provider endpoints by provider.service", zap.Any("service", serviceNN)) var ( service corev1.Service @@ -1494,39 +1492,7 @@ func addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx *provid } tctx.Services[serviceNN] = &service - // Conditionally get EndpointSlice or Endpoints based on cluster API support - if supportsEndpointSlice { - // get es - var ( - esList discoveryv1.EndpointSliceList - ) - if err := c.List(tctx, &esList, - client.InNamespace(serviceNN.Namespace), - client.MatchingLabels{ - discoveryv1.LabelServiceName: serviceNN.Name, - }); err != nil { - log.Errorw("failed to get endpoints for GatewayProxy provider", zap.Error(err), zap.Any("endpoints", serviceNN)) - return err - } - tctx.EndpointSlices[serviceNN] = esList.Items - } else { - // Fallback to Endpoints API for Kubernetes 1.18 compatibility - var endpoints corev1.Endpoints - if err := c.Get(tctx, serviceNN, &endpoints); err != nil { - if client.IgnoreNotFound(err) != nil { - log.Errorw("failed to get endpoints for GatewayProxy provider", zap.Error(err), zap.Any("endpoints", serviceNN)) - return err - } - // If endpoints not found, create empty EndpointSlice list - tctx.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{} - } else { - // Convert Endpoints to EndpointSlice format for internal consistency - convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints) - tctx.EndpointSlices[serviceNN] = convertedEndpointSlices - } - } - - return nil + return resolveServiceEndpoints(tctx, c, tctx, serviceNN, true, nil) } func TypePredicate[T client.Object]() func(obj client.Object) bool { @@ -1579,3 +1545,94 @@ func watchEndpointSliceOrEndpoints(bdr *ctrl.Builder, supportsEndpointSlice bool return bdr.Watches(&corev1.Endpoints{}, handler.EnqueueRequestsFromMapFunc(endpointsMapFunc)) } } + +// resolveServiceEndpoints collects endpoints and adds them to the translate context +// It handles both EndpointSlice (K8s 1.19+) and Endpoints (K8s 1.18) APIs with automatic fallback +func resolveServiceEndpoints( + ctx context.Context, + c client.Client, + tctx *provider.TranslateContext, + serviceNN k8stypes.NamespacedName, + supportsEndpointSlice bool, + subsetLabels map[string]string, +) error { + if supportsEndpointSlice { + var endpoints discoveryv1.EndpointSliceList + if err := c.List(ctx, &endpoints, + client.InNamespace(serviceNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: serviceNN.Name, + }, + ); err != nil { + return fmt.Errorf("failed to list endpoint slices: %v", err) + } + + if len(subsetLabels) == 0 { + tctx.EndpointSlices[serviceNN] = endpoints.Items + } else { + // Apply subset filtering + tctx.EndpointSlices[serviceNN] = filterEndpointSlicesBySubsetLabels(ctx, c, endpoints.Items, subsetLabels) + } + } else { + // Fallback to Endpoints API for Kubernetes 1.18 compatibility + var ep corev1.Endpoints + if err := c.Get(ctx, serviceNN, &ep); err != nil { + if client.IgnoreNotFound(err) != nil { + return fmt.Errorf("failed to get endpoints: %v", err) + } + // If endpoints not found, create empty EndpointSlice list + tctx.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{} + } else { + // Convert Endpoints to EndpointSlice format for internal consistency + convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep) + + if len(subsetLabels) == 0 { + tctx.EndpointSlices[serviceNN] = convertedEndpointSlices + } else { + // Apply subset filtering to converted EndpointSlices + tctx.EndpointSlices[serviceNN] = filterEndpointSlicesBySubsetLabels(ctx, c, convertedEndpointSlices, subsetLabels) + } + } + } + + return nil +} + +// filterEndpointSlicesBySubsetLabels filters EndpointSlices by subset labels +func filterEndpointSlicesBySubsetLabels(ctx context.Context, c client.Client, endpointSlices []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice { + if len(labels) == 0 { + return endpointSlices + } + + for i := range endpointSlices { + endpointSlices[i] = filterEndpointSliceByTargetPod(ctx, c, endpointSlices[i], labels) + } + + return utils.Filter(endpointSlices, func(v discoveryv1.EndpointSlice) bool { + return len(v.Endpoints) > 0 + }) +} + +// filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels +func filterEndpointSliceByTargetPod(ctx context.Context, c client.Client, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice { + item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool { + if v.TargetRef == nil || v.TargetRef.Kind != KindPod { + return true + } + + var ( + pod corev1.Pod + podNN = k8stypes.NamespacedName{ + Namespace: v.TargetRef.Namespace, + Name: v.TargetRef.Name, + } + ) + if err := c.Get(ctx, podNN, &pod); err != nil { + return false + } + + return utils.IsSubsetOf(labels, pod.GetLabels()) + }) + + return item +} diff --git a/test/e2e/crds/v1alpha1/gatewayproxy.go b/test/e2e/crds/v1alpha1/gatewayproxy.go index 8a934150d..20594ca82 100644 --- a/test/e2e/crds/v1alpha1/gatewayproxy.go +++ b/test/e2e/crds/v1alpha1/gatewayproxy.go @@ -168,20 +168,16 @@ spec: By("scale apisix to replicas 2") s.Deployer.DeployDataplane(scaffold.DeployDataplaneOptions{ - Replicas: ptr.To(2), + Replicas: ptr.To(2), + SkipCreateTunnels: true, }) By("check pod ready") - err = wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { - pods := s.GetPods(s.Namespace(), "app.kubernetes.io/name=apisix") - if len(pods) != 2 { + err = wait.PollUntilContextTimeout(context.Background(), time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) { + endpoints := s.GetEndpoints(s.Namespace(), framework.ProviderType) + if len(endpoints.Subsets) != 1 || len(endpoints.Subsets[0].Addresses) != 2 { return false, nil } - for _, pod := range pods { - if pod.Status.PodIP == "" { - return false, nil - } - } return true, nil }) Expect(err).NotTo(HaveOccurred(), "check pods ready") diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index 077622cf6..3447f7b27 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -215,6 +215,12 @@ func (f *Framework) GetPods(namespace, selector string) []corev1.Pod { return podList.Items } +func (f *Framework) GetEndpoints(namespace, name string) *corev1.Endpoints { + endpoints, err := f.clientset.CoreV1().Endpoints(cmp.Or(namespace, _namespace)).Get(f.Context, name, metav1.GetOptions{}) + f.GomegaT.Expect(err).ShouldNot(HaveOccurred()) + return endpoints +} + func (f *Framework) applySSLSecret(namespace, name string, cert, pkey, caCert []byte) { kind := "Secret" apiVersion := "v1"