diff --git a/api/adc/types.go b/api/adc/types.go index 472a12a1..fd59546f 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -491,10 +491,13 @@ func ComposeRouteName(namespace, name string, rule string) string { // ComposeStreamRouteName uses namespace, name and rule name to compose // the stream_route name. -func ComposeStreamRouteName(namespace, name string, rule string) string { +func ComposeStreamRouteName(namespace, name string, rule string, typ string) string { + if typ == "" { + typ = "TCP" + } // FIXME Use sync.Pool to reuse this buffer if the upstream // name composing code path is hot. - p := make([]byte, 0, len(namespace)+len(name)+len(rule)+6) + p := make([]byte, 0, len(namespace)+len(name)+len(rule)+len(typ)+3) buf := bytes.NewBuffer(p) buf.WriteString(namespace) @@ -502,7 +505,8 @@ func ComposeStreamRouteName(namespace, name string, rule string) string { buf.WriteString(name) buf.WriteByte('_') buf.WriteString(rule) - buf.WriteString("_tcp") + buf.WriteByte('_') + buf.WriteString(typ) return buf.String() } @@ -545,8 +549,8 @@ func ComposeServicesNameWithScheme(namespace, name string, rule string, scheme s return buf.String() } -func ComposeServiceNameWithStream(namespace, name string, rule string) string { - return ComposeServicesNameWithScheme(namespace, name, rule, "stream") +func ComposeServiceNameWithStream(namespace, name string, rule, typ string) string { + return ComposeServicesNameWithScheme(namespace, name, rule, typ) } func ComposeConsumerName(namespace, name string) string { diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 71b8a94b..d2bd08a0 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -94,6 +94,7 @@ rules: - httproutes/status - referencegrants/status - tcproutes/status + - udproutes/status verbs: - get - update @@ -103,8 +104,9 @@ rules: - gateways - grpcroutes - httproutes - - tcproutes - referencegrants + - tcproutes + - udproutes verbs: - get - list diff --git a/docs/en/latest/concepts/gateway-api.md b/docs/en/latest/concepts/gateway-api.md index 20240f10..4efb8a76 100644 --- a/docs/en/latest/concepts/gateway-api.md +++ b/docs/en/latest/concepts/gateway-api.md @@ -52,14 +52,14 @@ By supporting Gateway API, the APISIX Ingress controller can realize richer func | ReferenceGrant | Supported | Not supported | Not supported | v1beta1 | | TLSRoute | Not supported | Not supported | Not supported | v1alpha2 | | TCPRoute | Supported | Supported | Not supported | v1alpha2 | -| UDPRoute | Not supported | Not supported | Not supported | v1alpha2 | +| UDPRoute | Supported | Supported | Not supported | v1alpha2 | | BackendTLSPolicy | Not supported | Not supported | Not supported | v1alpha3 | ## Examples For configuration examples, see the Gateway API tabs in [Configuration Examples](../reference/example.md). -For a complete list of configuration options, refer to the [Gateway API Reference](https://gateway-api.sigs.k8s.io/reference/spec/). Be aware that some fields are not supported, or partially supported. +For a complete list of configuration options, refer to the [Gateway API Reference](https://gateway-api.sigs.k8s.io/reference/main/spec/). Be aware that some fields are not supported, or partially supported. ## Unsupported / Partially Supported Fields diff --git a/internal/adc/translator/apisixroute.go b/internal/adc/translator/apisixroute.go index 1cdcb520..77a9db54 100644 --- a/internal/adc/translator/apisixroute.go +++ b/internal/adc/translator/apisixroute.go @@ -446,14 +446,14 @@ func (t *Translator) translateStreamRule(tctx *provider.TranslateContext, ar *ap t.loadRoutePlugins(tctx, ar, part.Plugins, plugins) sr := adc.NewDefaultStreamRoute() - sr.Name = adc.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) + sr.Name = adc.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name, part.Protocol) sr.ID = id.GenID(sr.Name) sr.ServerPort = part.Match.IngressPort sr.SNI = part.Match.Host sr.Plugins = plugins svc := adc.NewDefaultService() - svc.Name = adc.ComposeServiceNameWithStream(ar.Namespace, ar.Name, part.Name) + svc.Name = adc.ComposeServiceNameWithStream(ar.Namespace, ar.Name, part.Name, part.Protocol) svc.ID = id.GenID(svc.Name) svc.StreamRoutes = append(svc.StreamRoutes, sr) diff --git a/internal/adc/translator/tcproute.go b/internal/adc/translator/tcproute.go index 5c97f2dc..ffef927c 100644 --- a/internal/adc/translator/tcproute.go +++ b/internal/adc/translator/tcproute.go @@ -49,7 +49,7 @@ func (t *Translator) TranslateTCPRoute(tctx *provider.TranslateContext, tcpRoute for ruleIndex, rule := range rules { service := adctypes.NewDefaultService() service.Labels = labels - service.Name = adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex)) + service.Name = adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex), "TCP") service.ID = id.GenID(service.Name) var ( upstreams = make([]*adctypes.Upstream, 0) @@ -151,7 +151,7 @@ func (t *Translator) TranslateTCPRoute(tctx *provider.TranslateContext, tcpRoute } } streamRoute := adctypes.NewDefaultStreamRoute() - streamRouteName := adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex)) + streamRouteName := adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex), "TCP") streamRoute.Name = streamRouteName streamRoute.ID = id.GenID(streamRouteName) streamRoute.Labels = labels diff --git a/internal/adc/translator/udproute.go b/internal/adc/translator/udproute.go new file mode 100644 index 00000000..983130cb --- /dev/null +++ b/internal/adc/translator/udproute.go @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package translator + +import ( + "fmt" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + adctypes "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/id" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" +) + +func (t *Translator) TranslateUDPRoute(tctx *provider.TranslateContext, udpRoute *gatewayv1alpha2.UDPRoute) (*TranslateResult, error) { + result := &TranslateResult{} + rules := udpRoute.Spec.Rules + labels := label.GenLabel(udpRoute) + for ruleIndex, rule := range rules { + service := adctypes.NewDefaultService() + service.Labels = labels + service.Name = adctypes.ComposeServiceNameWithStream(udpRoute.Namespace, udpRoute.Name, fmt.Sprintf("%d", ruleIndex), "UDP") + service.ID = id.GenID(service.Name) + var ( + upstreams = make([]*adctypes.Upstream, 0) + weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) + ) + for _, backend := range rule.BackendRefs { + if backend.Namespace == nil { + namespace := gatewayv1.Namespace(udpRoute.Namespace) + backend.Namespace = &namespace + } + upstream := newDefaultUpstreamWithoutScheme() + upNodes, err := t.translateBackendRef(tctx, backend, DefaultEndpointFilter) + if err != nil { + continue + } + if len(upNodes) == 0 { + continue + } + // TODO: Confirm BackendTrafficPolicy attachment with e2e test case. + t.AttachBackendTrafficPolicyToUpstream(backend, tctx.BackendTrafficPolicies, upstream) + upstream.Nodes = upNodes + var ( + kind string + port int32 + ) + if backend.Kind == nil { + kind = types.KindService + } else { + kind = string(*backend.Kind) + } + if backend.Port != nil { + port = int32(*backend.Port) + } + namespace := string(*backend.Namespace) + name := string(backend.Name) + upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) + upstream.Name = upstreamName + upstream.ID = id.GenID(upstreamName) + upstreams = append(upstreams, upstream) + } + + // Handle multiple backends with traffic-split plugin + if len(upstreams) == 0 { + // Create a default upstream if no valid backends + upstream := adctypes.NewDefaultUpstream() + service.Upstream = upstream + } else if len(upstreams) == 1 { + // Single backend - use directly as service upstream + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + } else { + // Multiple backends - use traffic-split plugin + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + + upstreams = upstreams[1:] + + if len(upstreams) > 0 { + service.Upstreams = upstreams + } + + // Set weight in traffic-split for the default upstream + weight := apiv2.DefaultWeight + if rule.BackendRefs[0].Weight != nil { + weight = int(*rule.BackendRefs[0].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // Set other upstreams in traffic-split using upstream_id + for i, upstream := range upstreams { + weight := apiv2.DefaultWeight + // get weight from the backend refs starting from the second backend + if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil { + weight = int(*rule.BackendRefs[i+1].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: upstream.ID, + Weight: weight, + }) + } + + if len(weightedUpstreams) > 0 { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{ + Rules: []adctypes.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } + } + streamRoute := adctypes.NewDefaultStreamRoute() + streamRouteName := adctypes.ComposeStreamRouteName(udpRoute.Namespace, udpRoute.Name, fmt.Sprintf("%d", ruleIndex), "UDP") + streamRoute.Name = streamRouteName + streamRoute.ID = id.GenID(streamRouteName) + streamRoute.Labels = labels + // TODO: support remote_addr, server_addr, sni, server_port + service.StreamRoutes = append(service.StreamRoutes, streamRoute) + result.Services = append(result.Services, service) + } + return result, nil +} diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 386ccd1a..d8b7b61d 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -64,6 +64,7 @@ func SetupIndexer(mgr ctrl.Manager) error { &gatewayv1.HTTPRoute{}: setupHTTPRouteIndexer, &gatewayv1.GRPCRoute{}: setupGRPCRouteIndexer, &gatewayv1alpha2.TCPRoute{}: setupTCPRouteIndexer, + &gatewayv1alpha2.UDPRoute{}: setupUDPRouteIndexer, &gatewayv1.GatewayClass{}: setupGatewayClassIndexer, &v1alpha1.Consumer{}: setupConsumerIndexer, &networkingv1.Ingress{}: setupIngressIndexer, @@ -279,6 +280,28 @@ func setupTCPRouteIndexer(mgr ctrl.Manager) error { } return nil } + +func setupUDPRouteIndexer(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1alpha2.UDPRoute{}, + ParentRefs, + UDPRouteParentRefsIndexFunc, + ); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1alpha2.UDPRoute{}, + ServiceIndexRef, + UDPRouteServiceIndexFunc, + ); err != nil { + return err + } + return nil +} + func setupIngressClassIndexer(mgr ctrl.Manager) error { // create IngressClass index if err := mgr.GetFieldIndexer().IndexField( @@ -578,6 +601,19 @@ func TCPRouteParentRefsIndexFunc(rawObj client.Object) []string { return keys } +func UDPRouteParentRefsIndexFunc(rawObj client.Object) []string { + ur := rawObj.(*gatewayv1alpha2.UDPRoute) + keys := make([]string, 0, len(ur.Spec.ParentRefs)) + for _, ref := range ur.Spec.ParentRefs { + ns := ur.GetNamespace() + if ref.Namespace != nil { + ns = string(*ref.Namespace) + } + keys = append(keys, GenIndexKey(ns, string(ref.Name))) + } + return keys +} + func HTTPRouteServiceIndexFunc(rawObj client.Object) []string { hr := rawObj.(*gatewayv1.HTTPRoute) keys := make([]string, 0, len(hr.Spec.Rules)) @@ -614,6 +650,24 @@ func TCPPRouteServiceIndexFunc(rawObj client.Object) []string { return keys } +func UDPRouteServiceIndexFunc(rawObj client.Object) []string { + ur := rawObj.(*gatewayv1alpha2.UDPRoute) + keys := make([]string, 0, len(ur.Spec.Rules)) + for _, rule := range ur.Spec.Rules { + for _, backend := range rule.BackendRefs { + namespace := ur.GetNamespace() + if backend.Kind != nil && *backend.Kind != internaltypes.KindService { + continue + } + if backend.Namespace != nil { + namespace = string(*backend.Namespace) + } + keys = append(keys, GenIndexKey(namespace, string(backend.Name))) + } + } + return keys +} + func ApisixRouteServiceIndexFunc(cli client.Client) func(client.Object) []string { return func(obj client.Object) (keys []string) { ar := obj.(*apiv2.ApisixRoute) diff --git a/internal/controller/udproute_controller.go b/internal/controller/udproute_controller.go new file mode 100644 index 00000000..88b11367 --- /dev/null +++ b/internal/controller/udproute_controller.go @@ -0,0 +1,505 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// UDPRouteReconciler reconciles a UDPRoute object. +type UDPRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1alpha2.UDPRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesByServiceRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForBackendTrafficPolicy), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGatewayProxy), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindUDPRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *UDPRouteReconciler) listUDPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + udprouteList := []gatewayv1alpha2.UDPRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + udprList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, udprList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list udproutes by service reference", "service", targetRef.Name) + return nil + } + udprouteList = append(udprouteList, udprList.Items...) + } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(udprouteList)) + for _, tr := range udprouteList { + key := k8stypes.NamespacedName{ + Namespace: tr.Namespace, + Name: tr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + } + return requests +} + +func (r *UDPRouteReconciler) listUDPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + udprList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, udprList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list udproutes by gateway", "gateway", gateway.Name) + return nil + } + + requests := make([]reconcile.Request, 0, len(udprList.Items)) + for _, tcr := range udprList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcr.Namespace, + Name: tcr.Name, + }, + }) + } + return requests +} + +// listUDPRoutesForGatewayProxy list all UDPRoute resources that are affected by a given GatewayProxy +func (r *UDPRouteReconciler) listUDPRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all UDPRoute resources that reference it + for _, gateway := range gatewayList.Items { + udpRouteList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, udpRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list udproutes for gateway", "gateway", gateway.Name) + continue + } + + for _, udpRoute := range udpRouteList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: udpRoute.Namespace, + Name: udpRoute.Name, + }, + }) + } + } + + return requests +} + +func (r *UDPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1alpha2.UDPRoute{}, req.NamespacedName) + tr := new(gatewayv1alpha2.UDPRoute) + if err := r.Get(ctx, req.NamespacedName, tr); err != nil { + if client.IgnoreNotFound(err) == nil { + tr.Namespace = req.Namespace + tr.Name = req.Name + + tr.TypeMeta = metav1.TypeMeta{ + Kind: KindUDPRoute, + APIVersion: gatewayv1alpha2.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, tr); err != nil { + r.Log.Error(err, "failed to delete udproute", "udproute", tr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + type ResourceStatus struct { + status bool + msg string + } + + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, tr, tr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = tr.Spec.ParentRefs + rk := utils.NamespacedNameKind(tr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + var backendRefErr error + if err := r.processUDPRoute(tctx, tr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. + // If the backend reference error is because of an invalid kind, use this error first + if err := r.processUDPRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, tr.GetGeneration(), backendRefErr) + + tr.Status.Parents = append(tr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(tr), + Resource: &gatewayv1alpha2.UDPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*gatewayv1alpha2.UDPRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + tCopy := t.DeepCopy() + tCopy.Status = tr.Status + return tCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + if isRouteAccepted(gateways) { + routeToUpdate := tr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *UDPRouteReconciler) processUDPRoute(tctx *provider.TranslateContext, udpRoute *gatewayv1alpha2.UDPRoute) error { + var terror error + for _, rule := range udpRoute.Spec.Rules { + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != KindService { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&udpRoute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +func (r *UDPRouteReconciler) processUDPRouteBackendRefs(tctx *provider.TranslateContext, trNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: trNN.Namespace, + Name: string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != KindService { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between UDPRoute and referenced Service, check ReferenceGrant + if trNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindUDPRoute, + Namespace: v1beta1.Namespace(trNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the UDPRoute %s and no ReferenceGrant allowing reference is configured", targetNN, trNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *UDPRouteReconciler) listUDPRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + grant, ok := obj.(*v1beta1.ReferenceGrant) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant") + return nil + } + + var udpRouteList gatewayv1alpha2.UDPRouteList + if err := r.List(ctx, &udpRouteList); err != nil { + r.Log.Error(err, "failed to list udproutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) + return nil + } + + for _, udpRoute := range udpRouteList.Items { + tr := v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindUDPRoute, + Namespace: v1beta1.Namespace(udpRoute.GetNamespace()), + } + for _, from := range grant.Spec.From { + if from == tr { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: udpRoute.GetNamespace(), + Name: udpRoute.GetName(), + }, + }) + } + } + } + return requests +} + +func (r *UDPRouteReconciler) listUDPRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + trList := &gatewayv1alpha2.UDPRouteList{} + if err := r.List(ctx, trList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list udproutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(trList.Items)) + for _, tr := range trList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + return requests +} diff --git a/internal/controller/utils.go b/internal/controller/utils.go index f02e7326..c2eb03d2 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -65,6 +65,7 @@ const ( KindGateway = "Gateway" KindHTTPRoute = "HTTPRoute" KindTCPRoute = "TCPRoute" + KindUDPRoute = "UDPRoute" KindGRPCRoute = "GRPCRoute" KindGatewayClass = "GatewayClass" KindIngress = "Ingress" @@ -504,8 +505,8 @@ func routeHostnamesIntersectsWithListenerHostname(route client.Object, listener switch r := route.(type) { case *gatewayv1.HTTPRoute: return listenerHostnameIntersectWithRouteHostnames(listener, r.Spec.Hostnames) - case *gatewayv1alpha2.TCPRoute: - return true // TCPRoute doesn't have Hostnames to match + case *gatewayv1alpha2.TCPRoute, *gatewayv1alpha2.UDPRoute: + return true // TCPRoute and UDPRoute don't have Hostnames to match case *gatewayv1.GRPCRoute: return listenerHostnameIntersectWithRouteHostnames(listener, r.Spec.Hostnames) default: @@ -676,6 +677,10 @@ func routeMatchesListenerType(route client.Object, listener gatewayv1.Listener) if listener.Protocol != gatewayv1.TCPProtocolType { return false } + case *gatewayv1alpha2.UDPRoute: + if listener.Protocol != gatewayv1.UDPProtocolType { + return false + } default: return false } diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index e77ca160..48908446 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -88,6 +88,8 @@ import ( // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes/status,verbs=get;update +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes,verbs=get;list;watch +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes,verbs=get;list;watch @@ -153,6 +155,14 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Updater: updater, Readier: readier, }, + &gatewayv1alpha2.UDPRoute{}: &controller.UDPRouteReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName(types.KindUDPRoute), + Provider: pro, + Updater: updater, + Readier: readier, + }, &gatewayv1.GRPCRoute{}: &controller.GRPCRouteReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -305,6 +315,9 @@ func registerGatewayAPIForReadinessGVK(mgr manager.Manager, readier readiness.Re if utils.HasAPIResource(mgr, &gatewayv1alpha2.TCPRoute{}) { gvks = append(gvks, types.GvkOf(&gatewayv1alpha2.TCPRoute{})) } + if utils.HasAPIResource(mgr, &gatewayv1alpha2.UDPRoute{}) { + gvks = append(gvks, types.GvkOf(&gatewayv1alpha2.UDPRoute{})) + } if len(gvks) == 0 { return } diff --git a/internal/provider/api7ee/provider.go b/internal/provider/api7ee/provider.go index a19c50cb..4463f187 100644 --- a/internal/provider/api7ee/provider.go +++ b/internal/provider/api7ee/provider.go @@ -107,6 +107,9 @@ func (d *api7eeProvider) Update(ctx context.Context, tctx *provider.TranslateCon case *gatewayv1alpha2.TCPRoute: result, err = d.translator.TranslateTCPRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeService) + case *gatewayv1alpha2.UDPRoute: + result, err = d.translator.TranslateUDPRoute(tctx, t.DeepCopy()) + resourceTypes = append(resourceTypes, adctypes.TypeService) case *gatewayv1.GRPCRoute: result, err = d.translator.TranslateGRPCRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, "service") @@ -188,7 +191,7 @@ func (d *api7eeProvider) Delete(ctx context.Context, obj client.Object) error { var resourceTypes []string var labels map[string]string switch obj.(type) { - case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute, *gatewayv1alpha2.TCPRoute: + case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute, *gatewayv1alpha2.TCPRoute, *gatewayv1alpha2.UDPRoute: resourceTypes = append(resourceTypes, "service") labels = label.GenLabel(obj) case *gatewayv1.Gateway: diff --git a/internal/provider/api7ee/status.go b/internal/provider/api7ee/status.go index 03b63db4..6f95f315 100644 --- a/internal/provider/api7ee/status.go +++ b/internal/provider/api7ee/status.go @@ -68,6 +68,7 @@ func (d *api7eeProvider) handleStatusUpdate(statusUpdateMap map[types.Namespaced d.statusUpdateMap = statusUpdateMap } +//nolint:gocyclo func (d *api7eeProvider) updateStatus(nnk types.NamespacedNameKind, condition metav1.Condition) { switch nnk.Kind { case types.KindApisixRoute: @@ -144,6 +145,41 @@ func (d *api7eeProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) + case types.KindUDPRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + log.Debugw("updating UDPRoute status", zap.Any("parentRefs", parentRefs)) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1alpha2.UDPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1alpha2.UDPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) case types.KindTCPRoute: parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) log.Debugw("updating TCPRoute status", zap.Any("parentRefs", parentRefs)) diff --git a/internal/provider/apisix/provider.go b/internal/provider/apisix/provider.go index b14df705..bdb39ab0 100644 --- a/internal/provider/apisix/provider.go +++ b/internal/provider/apisix/provider.go @@ -114,6 +114,9 @@ func (d *apisixProvider) Update(ctx context.Context, tctx *provider.TranslateCon case *gatewayv1alpha2.TCPRoute: result, err = d.translator.TranslateTCPRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeService) + case *gatewayv1alpha2.UDPRoute: + result, err = d.translator.TranslateUDPRoute(tctx, t.DeepCopy()) + resourceTypes = append(resourceTypes, adctypes.TypeService) case *gatewayv1.GRPCRoute: result, err = d.translator.TranslateGRPCRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeService) @@ -191,7 +194,7 @@ func (d *apisixProvider) Delete(ctx context.Context, obj client.Object) error { var resourceTypes []string var labels map[string]string switch obj.(type) { - case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute, *gatewayv1alpha2.TCPRoute: + case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute, *gatewayv1alpha2.TCPRoute, *gatewayv1alpha2.UDPRoute: resourceTypes = append(resourceTypes, adctypes.TypeService) labels = label.GenLabel(obj) case *gatewayv1.Gateway: diff --git a/internal/provider/apisix/status.go b/internal/provider/apisix/status.go index 465718c4..7525035e 100644 --- a/internal/provider/apisix/status.go +++ b/internal/provider/apisix/status.go @@ -68,6 +68,7 @@ func (d *apisixProvider) handleStatusUpdate(statusUpdateMap map[types.Namespaced d.statusUpdateMap = statusUpdateMap } +//nolint:gocyclo func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition metav1.Condition) { switch nnk.Kind { case types.KindApisixRoute: @@ -145,6 +146,41 @@ func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) + case types.KindUDPRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + log.Debugw("updating UDPRoute status", zap.Any("parentRefs", parentRefs)) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1alpha2.UDPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1alpha2.UDPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) case types.KindTCPRoute: parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) log.Debugw("updating TCPRoute status", zap.Any("parentRefs", parentRefs)) diff --git a/internal/types/k8s.go b/internal/types/k8s.go index 42e1f7fb..321582c8 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -42,6 +42,7 @@ const ( KindGateway = "Gateway" KindHTTPRoute = "HTTPRoute" KindTCPRoute = "TCPRoute" + KindUDPRoute = "UDPRoute" KindGRPCRoute = "GRPCRoute" KindGatewayClass = "GatewayClass" KindIngress = "Ingress" @@ -68,6 +69,8 @@ func KindOf(obj any) string { return KindGateway case *gatewayv1alpha2.TCPRoute: return KindTCPRoute + case *gatewayv1alpha2.UDPRoute: + return KindUDPRoute case *gatewayv1.HTTPRoute: return KindHTTPRoute case *gatewayv1.GRPCRoute: @@ -136,6 +139,8 @@ func GvkOf(obj any) schema.GroupVersionKind { return gatewayv1.SchemeGroupVersion.WithKind(kind) case *gatewayv1alpha2.TCPRoute: return gatewayv1alpha2.SchemeGroupVersion.WithKind(kind) + case *gatewayv1alpha2.UDPRoute: + return gatewayv1alpha2.SchemeGroupVersion.WithKind(kind) case *gatewayv1beta1.ReferenceGrant: return gatewayv1beta1.SchemeGroupVersion.WithKind(kind) case *netv1.Ingress, *netv1.IngressClass: diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index 349bc049..4df104f9 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -1603,7 +1603,7 @@ spec: It("access third-party service directly", func() { upstreamName := s.Namespace() routeName := s.Namespace() - createApisixUpstream(apiv2.ExternalTypeDomain, "httpbin.org", upstreamName) + createApisixUpstream(apiv2.ExternalTypeDomain, "httpbin-service-e2e-test", upstreamName) createApisixRoute(routeName, upstreamName) verifyAccess() }) @@ -1611,7 +1611,7 @@ spec: It("access third-party service with host rewrite", func() { upstreamName := s.Namespace() routeName := s.Namespace() - createApisixUpstream(apiv2.ExternalTypeDomain, "httpbin.org", upstreamName) + createApisixUpstream(apiv2.ExternalTypeDomain, "httpbin-service-e2e-test", upstreamName) createApisixRouteWithHostRewrite(routeName, "httpbin.org", upstreamName) verifyAccess() }) @@ -1620,7 +1620,7 @@ spec: externalServiceName := s.Namespace() upstreamName := s.Namespace() routeName := s.Namespace() - createExternalService("httpbin.org", externalServiceName) + createExternalService("httpbin-service-e2e-test", externalServiceName) createApisixUpstream(apiv2.ExternalTypeService, externalServiceName, upstreamName) createApisixRoute(routeName, upstreamName) verifyAccess() diff --git a/test/e2e/framework/manifests/dp.yaml b/test/e2e/framework/manifests/dp.yaml index d6806ab0..b34d4363 100644 --- a/test/e2e/framework/manifests/dp.yaml +++ b/test/e2e/framework/manifests/dp.yaml @@ -34,6 +34,8 @@ data: stream_proxy: tcp: - 9100 + udp: + - 9200 nginx_config: worker_processes: 2 error_log_level: debug @@ -217,6 +219,9 @@ spec: - containerPort: 9100 name: stream-route protocol: TCP + - containerPort: 9200 + name: udp + protocol: UDP readinessProbe: failureThreshold: 10 initialDelaySeconds: 3 @@ -277,6 +282,10 @@ spec: - name: tcp port: 9100 protocol: TCP + - name: udp + port: 9200 + protocol: UDP + targetPort: 9200 selector: app.kubernetes.io/instance: api7ee3 app.kubernetes.io/name: apisix diff --git a/test/e2e/framework/manifests/ingress.yaml b/test/e2e/framework/manifests/ingress.yaml index 766bbb31..f854b04d 100644 --- a/test/e2e/framework/manifests/ingress.yaml +++ b/test/e2e/framework/manifests/ingress.yaml @@ -159,6 +159,7 @@ rules: - httproutes/status - referencegrants/status - tcproutes/status + - udproutes/status verbs: - get - update @@ -170,6 +171,7 @@ rules: - httproutes - referencegrants - tcproutes + - udproutes verbs: - get - list diff --git a/test/e2e/gatewayapi/udproute.go b/test/e2e/gatewayapi/udproute.go new file mode 100644 index 00000000..1c2ea7c5 --- /dev/null +++ b/test/e2e/gatewayapi/udproute.go @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gatewayapi + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("UDPRoute E2E Test", Label("networking.k8s.io", "udproute"), func() { + s := scaffold.NewDefaultScaffold() + Context("UDPRoute Base", func() { + + var udpGateway = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: %s +spec: + gatewayClassName: %s + listeners: + - name: udp + protocol: UDP + port: 80 + allowedRoutes: + kinds: + - kind: UDPRoute + infrastructure: + parametersRef: + group: apisix.apache.org + kind: GatewayProxy + name: apisix-proxy-config +` + + var udpRoute = ` +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: UDPRoute +metadata: + name: udp-app-1 +spec: + parentRefs: + - name: %s + sectionName: udp + rules: + - backendRefs: + - name: %s + port: %d +` + + BeforeEach(func() { + Expect(s.CreateResourceFromString(s.GetGatewayProxySpec())).NotTo(HaveOccurred(), "creating GatewayProxy") + + Expect(s.CreateResourceFromString(s.GetGatewayClassYaml())).NotTo(HaveOccurred(), "creating GatewayClass") + + Expect(s.CreateResourceFromString(fmt.Sprintf(udpGateway, s.Namespace(), s.Namespace()))). + NotTo(HaveOccurred(), "creating Gateway") + }) + + It("should route UDP traffic to backend service", func() { + dnsSvc := s.NewCoreDNSService() + gatewayName := s.Namespace() + By("creating UDPRoute") + routeYaml := fmt.Sprintf(udpRoute, gatewayName, dnsSvc.Name, dnsSvc.Spec.Ports[0].Port) + s.ResourceApplied("UDPRoute", "udp-app-1", routeYaml, 1) + + svc := s.GetDataplaneService() + + // test dns query + output, err := s.RunDigDNSClientFromK8s(fmt.Sprintf("@%s", svc.Name), "-p", "9200", "github.com") + Expect(err).NotTo(HaveOccurred(), "dig github.com via apisix udp proxy") + Expect(output).To(ContainSubstring("ADDITIONAL SECTION")) + + time.Sleep(3 * time.Second) + output = s.GetDeploymentLogs(scaffold.CoreDNSDeployment) + Expect(output).To(ContainSubstring("github.com. udp")) + }) + }) +})