Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions internal/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

Expand Down Expand Up @@ -305,6 +306,26 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH
return service
}

func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int32, error) {
var port int32
if backendSvcPort.Type == intstr.Int {
port = int32(backendSvcPort.IntValue())
} else {
found := false
for _, servicePort := range svc.Spec.Ports {
if servicePort.Name == backendSvcPort.StrVal {
port = servicePort.Port
found = true
break
}
}
if !found {
return 0, errors.Errorf("named port '%s' not found in service %s", backendSvcPort.StrVal, svc.Name)
}
}
return port, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Expand All @@ -317,24 +338,40 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *
if svc.Spec.ClusterIP == "" {
return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
port, err := getPortFromService(svc, backend.ServicePort)
if err != nil {
return nil, err
}
return adc.UpstreamNodes{
{
Host: svc.Spec.ClusterIP,
Port: backend.ServicePort.IntValue(),
Port: int(port),
Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)),
},
}, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Name: backend.ServiceName,
}
svc, ok := tctx.Services[serviceNN]
if !ok {
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
port, err := getPortFromService(svc, backend.ServicePort)
if err != nil {
return nil, err
}
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
backendRef := gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Group: (*gatewayv1.Group)(&apiv2.GroupVersion.Group),
Kind: (*gatewayv1.Kind)(ptr.To("Service")),
Name: gatewayv1.ObjectName(backend.ServiceName),
Namespace: (*gatewayv1.Namespace)(&arNN.Namespace),
Port: (*gatewayv1.PortNumber)(&backend.ServicePort.IntVal),
Port: (*gatewayv1.PortNumber)(&port),
},
Weight: &weight,
}
Expand Down
18 changes: 15 additions & 3 deletions internal/controller/apisixroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -417,10 +418,21 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
}

if !slices.ContainsFunc(service.Spec.Ports, func(port corev1.ServicePort) bool {
return port.Port == int32(backend.ServicePort.IntValue())
if backend.ServicePort.Type == intstr.Int {
return port.Port == int32(backend.ServicePort.IntValue())
}

if backend.ServicePort.Type == intstr.String {
return port.Name == backend.ServicePort.StrVal
}
return false
}) {
r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.String())
continue
if backend.ServicePort.Type == intstr.Int {
r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.IntValue())
} else {
r.Log.Error(errors.New("named port not found in service"), "Service", serviceNN, "port", backend.ServicePort.StrVal)
}
return nil
}
tc.Services[serviceNN] = &service

Expand Down
133 changes: 94 additions & 39 deletions test/e2e/crds/v2/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ = Describe("Test ApisixRoute", Label("apisix.apache.org", "v2", "apisixrou

Context("Test ApisixRoute", func() {

It("Basic tests", func() {
Context("Basic tests", func() {
const apisixRouteSpec = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
Expand All @@ -81,56 +81,111 @@ spec:
- serviceName: httpbin-service-e2e-test
servicePort: 80
`
request := func(path string) int {
return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
}

By("apply ApisixRoute")
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/get"))
const apisixRouteSpecWithNameServiceAndGranularity = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
namespace: %s
spec:
ingressClassName: %s
http:
- name: rule0
match:
hosts:
- httpbin
paths:
- %s
backends:
- serviceName: httpbin-service-e2e-test
servicePort: http
resolveGranularity: service
`

By("verify ApisixRoute works")
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))
const apisixRouteSpecWithNameServicePort = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
namespace: %s
spec:
ingressClassName: %s
http:
- name: rule0
match:
hosts:
- httpbin
paths:
- %s
backends:
- serviceName: httpbin-service-e2e-test
servicePort: http
`
test := func(apisixRouteSpec string) {
request := func(path string) int {
return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
}

By("update ApisixRoute")
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/headers"))
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))
s.NewAPISIXClient().GET("/headers").WithHost("httpbin").Expect().Status(http.StatusOK)
By("apply ApisixRoute")
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/get"))

By("delete ApisixRoute")
err := s.DeleteResource("ApisixRoute", "default")
Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute")
Eventually(request).WithArguments("/headers").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))
By("verify ApisixRoute works")
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))

By("request /metrics endpoint from controller")
By("update ApisixRoute")
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/headers"))
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))
s.NewAPISIXClient().GET("/headers").WithHost("httpbin").Expect().Status(http.StatusOK)

// Get the metrics service endpoint
metricsURL := s.GetMetricsEndpoint()
By("delete ApisixRoute")
err := s.DeleteResource("ApisixRoute", "default")
Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute")
Eventually(request).WithArguments("/headers").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))

By("verify metrics content")
resp, err := http.Get(metricsURL)
Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint")
defer func() {
_ = resp.Body.Close()
}()
By("request /metrics endpoint from controller")

Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// Get the metrics service endpoint
metricsURL := s.GetMetricsEndpoint()

body, err := io.ReadAll(resp.Body)
Expect(err).ShouldNot(HaveOccurred(), "read metrics response")
By("verify metrics content")
resp, err := http.Get(metricsURL)
Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint")
defer func() {
_ = resp.Body.Close()
}()

bodyStr := string(body)
Expect(resp.StatusCode).Should(Equal(http.StatusOK))

// Verify prometheus format
Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8"))
body, err := io.ReadAll(resp.Body)
Expect(err).ShouldNot(HaveOccurred(), "read metrics response")

// Verify specific metrics from metrics.go exist
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds"))
bodyStr := string(body)

// Verify prometheus format
Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8"))

// Verify specific metrics from metrics.go exist
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds"))

// Log metrics for debugging
fmt.Printf("Metrics endpoint response:\n%s\n", bodyStr)
}
It("Basic", func() {
test(apisixRouteSpec)
})
It("Basic: with named service port", func() {
test(apisixRouteSpecWithNameServicePort)
})
It("Basic: with named service port and granularity service", func() {
test(apisixRouteSpecWithNameServiceAndGranularity)
})
})

It("Test plugins in ApisixRoute", func() {
Expand Down
Loading