Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions internal/controller/apisixroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package controller
import (
"cmp"
"context"
"errors"
"fmt"
"slices"

Expand Down Expand Up @@ -209,30 +210,27 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov
backends[serviceNN] = struct{}{}

if err := r.Get(ctx, serviceNN, &service); err != nil {
return ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to get Service: %s", serviceNN),
if err := client.IgnoreNotFound(err); err == nil {
r.Log.Error(errors.New("service not found"), "Service", serviceNN)
continue
}
return err
}
if service.Spec.Type == corev1.ServiceTypeExternalName {
tc.Services[serviceNN] = &service
continue
}

if backend.ResolveGranularity == "service" && service.Spec.ClusterIP == "" {
return ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("service %s has no cluster IP", serviceNN),
}
r.Log.Error(errors.New("service has no ClusterIP"), "Service", serviceNN, "ResolveGranularity", backend.ResolveGranularity)
continue
}

if !slices.ContainsFunc(service.Spec.Ports, func(port corev1.ServicePort) bool {
return port.Port == int32(backend.ServicePort.IntValue())
}) {
return ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("port %s not found in service %s", backend.ServicePort.String(), serviceNN),
}
r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.String())
continue
}
tc.Services[serviceNN] = &service

Expand Down
75 changes: 57 additions & 18 deletions internal/provider/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
Expand All @@ -26,8 +27,9 @@ import (
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
"github.com/apache/apisix-ingress-controller/internal/controller/label"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/utils"
"github.com/apache/apisix-ingress-controller/pkg/id"
"github.com/apache/apisix-ingress-controller/pkg/utils"
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
)

func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute) (result *TranslateResult, err error) {
Expand Down Expand Up @@ -58,7 +60,7 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a
if plugin.SecretRef != "" {
if secret, ok := tctx.Secrets[types.NamespacedName{Namespace: ar.Namespace, Name: plugin.SecretRef}]; ok {
for key, value := range secret.Data {
utils.InsertKeyInMap(key, string(value), config)
pkgutils.InsertKeyInMap(key, string(value), config)
}
}
}
Expand Down Expand Up @@ -115,25 +117,26 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a
// translate to adc.Upstream
var backendErr error
for _, backend := range rule.Backends {
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
backendRef := gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Group: (*gatewayv1.Group)(&apiv2.GroupVersion.Group),
Kind: (*gatewayv1.Kind)(ptr.To("Service")),
Name: gatewayv1.ObjectName(backend.ServiceName),
Namespace: (*gatewayv1.Namespace)(&ar.Namespace),
Port: (*gatewayv1.PortNumber)(&backend.ServicePort.IntVal),
},
Weight: &weight,
}
upNodes, err := t.translateBackendRef(tctx, backendRef)
if err != nil {
backendErr = err
continue
var (
upNodes adc.UpstreamNodes
)
if backend.ResolveGranularity == "service" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does "service" consider defining a constant?

upNodes, backendErr = t.translateApisixRouteBackendResolveGranularityService(tctx, utils.NamespacedName(ar), backend)
if backendErr != nil {
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Service")
continue
}
} else {
upNodes, backendErr = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, utils.NamespacedName(ar), backend)
if backendErr != nil {
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Endpoint")
continue
}
}
t.AttachBackendTrafficPolicyToUpstream(backendRef, tctx.BackendTrafficPolicies, upstream)

upstream.Nodes = append(upstream.Nodes, upNodes...)
}

//nolint:staticcheck
if len(rule.Backends) == 0 && len(rule.Upstreams) > 0 {
// FIXME: when the API ApisixUpstream is supported
Expand Down Expand Up @@ -164,3 +167,39 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a

return result, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Name: backend.ServiceName,
}
svc, ok := tctx.Services[serviceNN]
if !ok {
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
if svc.Spec.ClusterIP == "" {
return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
return adc.UpstreamNodes{
{
Host: svc.Spec.ClusterIP,
Port: backend.ServicePort.IntValue(),
Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)),
},
}, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
backendRef := gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Group: (*gatewayv1.Group)(&apiv2.GroupVersion.Group),
Kind: (*gatewayv1.Kind)(ptr.To("Service")),
Name: gatewayv1.ObjectName(backend.ServiceName),
Namespace: (*gatewayv1.Namespace)(&arNN.Namespace),
Port: (*gatewayv1.PortNumber)(&backend.ServicePort.IntVal),
},
Weight: &weight,
}
return t.translateBackendRef(tctx, backendRef)
}
79 changes: 74 additions & 5 deletions test/e2e/apisix/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package apisix

import (
"fmt"
"net"
"net/http"
"time"

Expand Down Expand Up @@ -213,12 +214,80 @@ spec:
s.NewAPISIXClient().GET("/get").Expect().Status(http.StatusNotFound)
})

PIt("Test ApisixRoute resolveGranularity", func() {
// The `.Spec.HTTP[0].Backends[0].ResolveGranularity` can be "endpoints" or "service",
// when set to "endpoints", the pod ips will be used; or the service ClusterIP or ExternalIP will be used when it set to "service",
It("Test ApisixRoute service not found", func() {
const apisixRouteSpec = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
spec:
ingressClassName: apisix
http:
- name: rule0
match:
hosts:
- httpbin
paths:
- %s
backends:
- serviceName: service-not-found
servicePort: 80
`
request := func(path string) int {
return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
}

// In the current implementation, pod ips are always used.
// So the case is pending for now.
By("apply ApisixRoute")
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisixRoute, fmt.Sprintf(apisixRouteSpec, "/get"))

By("when there is no replica got 500 by fault-injection")
err := s.ScaleHTTPBIN(0)
Expect(err).ShouldNot(HaveOccurred(), "scale httpbin to 0")
Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusInternalServerError))
s.NewAPISIXClient().GET("/get").WithHost("httpbin").Expect().Body().IsEqual("No existing backendRef provided")
})

It("Test ApisixRoute resolveGranularity", func() {
const apisixRouteSpec = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
spec:
ingressClassName: apisix
http:
- name: rule0
match:
paths:
- /*
backends:
- serviceName: httpbin-service-e2e-test
servicePort: 80
resolveGranularity: service
plugins:
- name: response-rewrite
enable: true
config:
headers:
set:
"X-Upstream-IP": "$upstream_addr"
`
By("apply ApisixRoute")
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisixRoute, apisixRouteSpec)

By("verify ApisixRoute works")
request := func() int {
return s.NewAPISIXClient().GET("/get").Expect().Raw().StatusCode
}
Eventually(request).WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))

By("assert that the request is proxied to the Service ClusterIP")
service, err := s.GetServiceByName("httpbin-service-e2e-test")
Expect(err).ShouldNot(HaveOccurred(), "get service")
clusterIP := net.JoinHostPort(service.Spec.ClusterIP, "80")
s.NewAPISIXClient().GET("/get").Expect().Header("X-Upstream-IP").IsEqual(clusterIP)
})

PIt("Test ApisixRoute subset", func() {
Expand Down
3 changes: 0 additions & 3 deletions test/e2e/framework/assertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ func PollUntilHTTPRoutePolicyHaveStatus(cli client.Client, timeout time.Duration

func APIv2MustHaveCondition(t testing.TestingT, cli client.Client, timeout time.Duration, nn types.NamespacedName, obj client.Object, cond metav1.Condition) {
f := func(object client.Object) bool {
if !apiv2.Is(object) {
return false
}
value := reflect.Indirect(reflect.ValueOf(object))
status, ok := value.FieldByName("Status").Interface().(apiv2.ApisixStatus)
if !ok {
Expand Down
Loading