Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions internal/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

Expand Down Expand Up @@ -313,6 +314,26 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH
return service
}

func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int32, error) {
var port int32
if backendSvcPort.Type == intstr.Int {
port = int32(backendSvcPort.IntValue())
} else {
found := false
for _, servicePort := range svc.Spec.Ports {
if servicePort.Name == backendSvcPort.StrVal {
port = servicePort.Port
found = true
break
}
}
if !found {
return 0, errors.Errorf("named port '%s' not found in service %s", backendSvcPort.StrVal, svc.Name)
}
}
return port, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Expand All @@ -325,10 +346,14 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *
if svc.Spec.ClusterIP == "" {
return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
port, err := getPortFromService(svc, backend.ServicePort)
if err != nil {
return nil, err
}
return adc.UpstreamNodes{
{
Host: svc.Spec.ClusterIP,
Port: backend.ServicePort.IntValue(),
Port: int(port),
Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)),
},
}, nil
Expand All @@ -349,14 +374,26 @@ func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *p
}

func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Name: backend.ServiceName,
}
svc, ok := tctx.Services[serviceNN]
if !ok {
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
port, err := getPortFromService(svc, backend.ServicePort)
if err != nil {
return nil, err
}
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
backendRef := gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Group: (*gatewayv1.Group)(&apiv2.GroupVersion.Group),
Kind: (*gatewayv1.Kind)(ptr.To("Service")),
Name: gatewayv1.ObjectName(backend.ServiceName),
Namespace: (*gatewayv1.Namespace)(&arNN.Namespace),
Port: (*gatewayv1.PortNumber)(&backend.ServicePort.IntVal),
Port: (*gatewayv1.PortNumber)(&port),
},
Weight: &weight,
}
Expand Down
16 changes: 14 additions & 2 deletions internal/controller/apisixroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -464,9 +465,20 @@ func (r *ApisixRouteReconciler) validateHTTPBackend(tctx *provider.TranslateCont
}

if !slices.ContainsFunc(service.Spec.Ports, func(port corev1.ServicePort) bool {
return port.Port == int32(backend.ServicePort.IntValue())
if backend.ServicePort.Type == intstr.Int {
return port.Port == int32(backend.ServicePort.IntValue())
}

if backend.ServicePort.Type == intstr.String {
return port.Name == backend.ServicePort.StrVal
}
return false
}) {
r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.String())
if backend.ServicePort.Type == intstr.Int {
r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.IntValue())
} else {
r.Log.Error(errors.New("named port not found in service"), "Service", serviceNN, "port", backend.ServicePort.StrVal)
}
return nil
}
tctx.Services[serviceNN] = &service
Expand Down
130 changes: 91 additions & 39 deletions test/e2e/crds/v2/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ = Describe("Test ApisixRoute", Label("apisix.apache.org", "v2", "apisixrou

Context("Test ApisixRoute", func() {

It("Basic tests", func() {
Context("Basic tests", func() {
const apisixRouteSpec = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
Expand All @@ -81,56 +81,108 @@ spec:
- serviceName: httpbin-service-e2e-test
servicePort: 80
`
request := func(path string) int {
return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
}

By("apply ApisixRoute")
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/get"))
const apisixRouteSpecWithNameServiceAndGranularity = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
namespace: %s
spec:
ingressClassName: %s
http:
- name: rule0
match:
hosts:
- httpbin
paths:
- %s
backends:
- serviceName: httpbin-service-e2e-test
servicePort: http
resolveGranularity: service
`

By("verify ApisixRoute works")
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))
const apisixRouteSpecWithNameServicePort = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
namespace: %s
spec:
ingressClassName: %s
http:
- name: rule0
match:
hosts:
- httpbin
paths:
- %s
backends:
- serviceName: httpbin-service-e2e-test
servicePort: http
`
test := func(apisixRouteSpec string) {
request := func(path string) int {
return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
}

By("update ApisixRoute")
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/headers"))
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))
s.NewAPISIXClient().GET("/headers").WithHost("httpbin").Expect().Status(http.StatusOK)
By("apply ApisixRoute")
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/get"))

By("delete ApisixRoute")
err := s.DeleteResource("ApisixRoute", "default")
Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute")
Eventually(request).WithArguments("/headers").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))
By("verify ApisixRoute works")
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))

By("request /metrics endpoint from controller")
By("update ApisixRoute")
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"},
&apisixRoute, fmt.Sprintf(apisixRouteSpec, s.Namespace(), s.Namespace(), "/headers"))
Eventually(request).WithArguments("/get").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))
s.NewAPISIXClient().GET("/headers").WithHost("httpbin").Expect().Status(http.StatusOK)

// Get the metrics service endpoint
metricsURL := s.GetMetricsEndpoint()
By("delete ApisixRoute")
err := s.DeleteResource("ApisixRoute", "default")
Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute")
Eventually(request).WithArguments("/headers").WithTimeout(20 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))

By("verify metrics content")
resp, err := http.Get(metricsURL)
Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint")
defer func() {
_ = resp.Body.Close()
}()
By("request /metrics endpoint from controller")

Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// Get the metrics service endpoint
metricsURL := s.GetMetricsEndpoint()

body, err := io.ReadAll(resp.Body)
Expect(err).ShouldNot(HaveOccurred(), "read metrics response")
By("verify metrics content")
resp, err := http.Get(metricsURL)
Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint")
defer func() {
_ = resp.Body.Close()
}()

bodyStr := string(body)
Expect(resp.StatusCode).Should(Equal(http.StatusOK))

// Verify prometheus format
Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8"))
body, err := io.ReadAll(resp.Body)
Expect(err).ShouldNot(HaveOccurred(), "read metrics response")

// Verify specific metrics from metrics.go exist
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds"))
bodyStr := string(body)

// Verify prometheus format
Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8"))

// Verify specific metrics from metrics.go exist
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds"))
}
It("Basic", func() {
test(apisixRouteSpec)
})
It("Basic: with named service port", func() {
test(apisixRouteSpecWithNameServicePort)
})
It("Basic: with named service port and granularity service", func() {
test(apisixRouteSpecWithNameServiceAndGranularity)
})
})

It("Test plugins in ApisixRoute", func() {
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/scaffold/httpbin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ spec:
selector:
app: httpbin-deployment-e2e-test
ports:
- port: 80
- name: http
port: 80
protocol: TCP
targetPort: 80
type: ClusterIP
Expand Down
Loading