From 61c6f5b1d16c39a094d8a8f9b094e6f18881b138 Mon Sep 17 00:00:00 2001 From: Ashish Tiwari Date: Tue, 16 Sep 2025 21:12:12 +0530 Subject: [PATCH] feat: add support for named serviceport in ApisixRoute backend --- internal/adc/translator/apisixroute.go | 41 +++++- internal/controller/apisixroute_controller.go | 18 ++- test/e2e/crds/v2/route.go | 133 +++++++++++++----- 3 files changed, 148 insertions(+), 44 deletions(-) diff --git a/internal/adc/translator/apisixroute.go b/internal/adc/translator/apisixroute.go index 4adcce6ab..6aed915db 100644 --- a/internal/adc/translator/apisixroute.go +++ b/internal/adc/translator/apisixroute.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -305,6 +306,26 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH return service } +func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int32, error) { + var port int32 + if backendSvcPort.Type == intstr.Int { + port = int32(backendSvcPort.IntValue()) + } else { + found := false + for _, servicePort := range svc.Spec.Ports { + if servicePort.Name == backendSvcPort.StrVal { + port = servicePort.Port + found = true + break + } + } + if !found { + return 0, errors.Errorf("named port '%s' not found in service %s", backendSvcPort.StrVal, svc.Name) + } + } + return port, nil +} + func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) { serviceNN := types.NamespacedName{ Namespace: arNN.Namespace, @@ -317,16 +338,32 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx * if svc.Spec.ClusterIP == "" { return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN) } + port, err := getPortFromService(svc, backend.ServicePort) + if err != nil { + return nil, err + } return adc.UpstreamNodes{ { Host: svc.Spec.ClusterIP, - Port: backend.ServicePort.IntValue(), + Port: int(port), Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)), }, }, nil } func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) { + serviceNN := types.NamespacedName{ + Namespace: arNN.Namespace, + Name: backend.ServiceName, + } + svc, ok := tctx.Services[serviceNN] + if !ok { + return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN) + } + port, err := getPortFromService(svc, backend.ServicePort) + if err != nil { + return nil, err + } weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight))) backendRef := gatewayv1.BackendRef{ BackendObjectReference: gatewayv1.BackendObjectReference{ @@ -334,7 +371,7 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx Kind: (*gatewayv1.Kind)(ptr.To("Service")), Name: gatewayv1.ObjectName(backend.ServiceName), Namespace: (*gatewayv1.Namespace)(&arNN.Namespace), - Port: (*gatewayv1.PortNumber)(&backend.ServicePort.IntVal), + Port: (*gatewayv1.PortNumber)(&port), }, Weight: &weight, } diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index d6f8c5150..99ff17691 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -417,10 +418,21 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid } if !slices.ContainsFunc(service.Spec.Ports, func(port corev1.ServicePort) bool { - return port.Port == int32(backend.ServicePort.IntValue()) + if backend.ServicePort.Type == intstr.Int { + return port.Port == int32(backend.ServicePort.IntValue()) + } + + if backend.ServicePort.Type == intstr.String { + return port.Name == backend.ServicePort.StrVal + } + return false }) { - r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.String()) - continue + if backend.ServicePort.Type == intstr.Int { + r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.IntValue()) + } else { + r.Log.Error(errors.New("named port not found in service"), "Service", serviceNN, "port", backend.ServicePort.StrVal) + } + return nil } tc.Services[serviceNN] = &service diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index 826745c4b..6859822b7 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -61,7 +61,7 @@ var _ = Describe("Test ApisixRoute", Label("apisix.apache.org", "v2", "apisixrou Context("Test ApisixRoute", func() { - It("Basic tests", func() { + Context("Basic tests", func() { const apisixRouteSpec = ` apiVersion: apisix.apache.org/v2 kind: ApisixRoute @@ -81,56 +81,111 @@ spec: - serviceName: httpbin-service-e2e-test servicePort: 80 ` - request := func(path string) int { - return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode - } - By("apply ApisixRoute") - var apisixRoute apiv2.ApisixRoute - applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, - &apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/get")) + const apisixRouteSpecWithNameServiceAndGranularity = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default + namespace: %s +spec: + ingressClassName: %s + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - %s + backends: + - serviceName: httpbin-service-e2e-test + servicePort: http + resolveGranularity: service +` - By("verify ApisixRoute works") - Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + const apisixRouteSpecWithNameServicePort = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default + namespace: %s +spec: + ingressClassName: %s + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - %s + backends: + - serviceName: httpbin-service-e2e-test + servicePort: http +` + test := func(apisixRouteSpec string) { + request := func(path string) int { + return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode + } - By("update ApisixRoute") - applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, - &apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/headers")) - Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound)) - s.NewAPISIXClient().GET("/headers").WithHost("httpbin").Expect().Status(http.StatusOK) + By("apply ApisixRoute") + var apisixRoute apiv2.ApisixRoute + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, + &apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/get")) - By("delete ApisixRoute") - err := s.DeleteResource("ApisixRoute", "default") - Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute") - Eventually(request).WithArguments("/headers").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound)) + By("verify ApisixRoute works") + Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) - By("request /metrics endpoint from controller") + By("update ApisixRoute") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, + &apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/headers")) + Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound)) + s.NewAPISIXClient().GET("/headers").WithHost("httpbin").Expect().Status(http.StatusOK) - // Get the metrics service endpoint - metricsURL := s.GetMetricsEndpoint() + By("delete ApisixRoute") + err := s.DeleteResource("ApisixRoute", "default") + Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute") + Eventually(request).WithArguments("/headers").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound)) - By("verify metrics content") - resp, err := http.Get(metricsURL) - Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint") - defer func() { - _ = resp.Body.Close() - }() + By("request /metrics endpoint from controller") - Expect(resp.StatusCode).Should(Equal(http.StatusOK)) + // Get the metrics service endpoint + metricsURL := s.GetMetricsEndpoint() - body, err := io.ReadAll(resp.Body) - Expect(err).ShouldNot(HaveOccurred(), "read metrics response") + By("verify metrics content") + resp, err := http.Get(metricsURL) + Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint") + defer func() { + _ = resp.Body.Close() + }() - bodyStr := string(body) + Expect(resp.StatusCode).Should(Equal(http.StatusOK)) - // Verify prometheus format - Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8")) + body, err := io.ReadAll(resp.Body) + Expect(err).ShouldNot(HaveOccurred(), "read metrics response") - // Verify specific metrics from metrics.go exist - Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds")) - Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total")) - Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length")) - Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds")) + bodyStr := string(body) + + // Verify prometheus format + Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8")) + + // Verify specific metrics from metrics.go exist + Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds")) + Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total")) + Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length")) + Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds")) + + // Log metrics for debugging + fmt.Printf("Metrics endpoint response:\n%s\n", bodyStr) + } + It("Basic", func() { + test(apisixRouteSpec) + }) + It("Basic: with named service port", func() { + test(apisixRouteSpecWithNameServicePort) + }) + It("Basic: with named service port and granularity service", func() { + test(apisixRouteSpecWithNameServiceAndGranularity) + }) }) It("Test plugins in ApisixRoute", func() {