diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 2d5e8c225..71b8a94b1 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -93,6 +93,7 @@ rules: - grpcroutes/status - httproutes/status - referencegrants/status + - tcproutes/status verbs: - get - update @@ -102,6 +103,7 @@ rules: - gateways - grpcroutes - httproutes + - tcproutes - referencegrants verbs: - get diff --git a/docs/en/latest/concepts/gateway-api.md b/docs/en/latest/concepts/gateway-api.md index c0fcbb752..20240f102 100644 --- a/docs/en/latest/concepts/gateway-api.md +++ b/docs/en/latest/concepts/gateway-api.md @@ -51,7 +51,7 @@ By supporting Gateway API, the APISIX Ingress controller can realize richer func | GRPCRoute | Supported | Supported | Not supported | v1 | | ReferenceGrant | Supported | Not supported | Not supported | v1beta1 | | TLSRoute | Not supported | Not supported | Not supported | v1alpha2 | -| TCPRoute | Not supported | Not supported | Not supported | v1alpha2 | +| TCPRoute | Supported | Supported | Not supported | v1alpha2 | | UDPRoute | Not supported | Not supported | Not supported | v1alpha2 | | BackendTLSPolicy | Not supported | Not supported | Not supported | v1alpha3 | diff --git a/examples/httpbin/tcproute.yaml b/examples/httpbin/tcproute.yaml new file mode 100644 index 000000000..0d68cc071 --- /dev/null +++ b/examples/httpbin/tcproute.yaml @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: apisix +spec: + controllerName: "apisix.apache.org/apisix-ingress-controller" + +--- + +apiVersion: apisix.apache.org/v1alpha1 +kind: GatewayProxy +metadata: + name: apisix-proxy-config +spec: + provider: + type: ControlPlane + controlPlane: + endpoints: + - ${ADMIN_ENDPOINT} # https://127.0.0.1:7443 + auth: + type: AdminKey + adminKey: + value: "${ADMIN_KEY}" + +--- + +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: apisix +spec: + gatewayClassName: apisix + listeners: + - name: foo + protocol: TCP + port: 80 + allowedRoutes: + kinds: + - kind: TCPRoute + infrastructure: + parametersRef: + group: apisix.apache.org + kind: GatewayProxy + name: apisix-proxy-config +--- + +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: TCPRoute +metadata: + name: tcp-app-1 +spec: + parentRefs: + - name: apisix + sectionName: foo + rules: + - backendRefs: + - name: httpbin + port: 80 diff --git a/internal/adc/translator/tcproute.go b/internal/adc/translator/tcproute.go new file mode 100644 index 000000000..5c97f2dcc --- /dev/null +++ b/internal/adc/translator/tcproute.go @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package translator + +import ( + "fmt" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + adctypes "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/id" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" +) + +func newDefaultUpstreamWithoutScheme() *adctypes.Upstream { + return &adctypes.Upstream{ + Metadata: adctypes.Metadata{ + Labels: map[string]string{ + "managed-by": "apisix-ingress-controller", + }, + }, + Nodes: make(adctypes.UpstreamNodes, 0), + } +} + +func (t *Translator) TranslateTCPRoute(tctx *provider.TranslateContext, tcpRoute *gatewayv1alpha2.TCPRoute) (*TranslateResult, error) { + result := &TranslateResult{} + rules := tcpRoute.Spec.Rules + labels := label.GenLabel(tcpRoute) + for ruleIndex, rule := range rules { + service := adctypes.NewDefaultService() + service.Labels = labels + service.Name = adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex)) + service.ID = id.GenID(service.Name) + var ( + upstreams = make([]*adctypes.Upstream, 0) + weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) + ) + for _, backend := range rule.BackendRefs { + if backend.Namespace == nil { + namespace := gatewayv1.Namespace(tcpRoute.Namespace) + backend.Namespace = &namespace + } + upstream := newDefaultUpstreamWithoutScheme() + upNodes, err := t.translateBackendRef(tctx, backend, DefaultEndpointFilter) + if err != nil { + continue + } + if len(upNodes) == 0 { + continue + } + // TODO: Confirm BackendTrafficPolicy attachment with e2e test case. + t.AttachBackendTrafficPolicyToUpstream(backend, tctx.BackendTrafficPolicies, upstream) + upstream.Nodes = upNodes + var ( + kind string + port int32 + ) + if backend.Kind == nil { + kind = types.KindService + } else { + kind = string(*backend.Kind) + } + if backend.Port != nil { + port = int32(*backend.Port) + } + namespace := string(*backend.Namespace) + name := string(backend.Name) + upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) + upstream.Name = upstreamName + upstream.ID = id.GenID(upstreamName) + upstreams = append(upstreams, upstream) + } + + // Handle multiple backends with traffic-split plugin + if len(upstreams) == 0 { + // Create a default upstream if no valid backends + upstream := adctypes.NewDefaultUpstream() + service.Upstream = upstream + } else if len(upstreams) == 1 { + // Single backend - use directly as service upstream + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + } else { + // Multiple backends - use traffic-split plugin + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + + upstreams = upstreams[1:] + + if len(upstreams) > 0 { + service.Upstreams = upstreams + } + + // Set weight in traffic-split for the default upstream + weight := apiv2.DefaultWeight + if rule.BackendRefs[0].Weight != nil { + weight = int(*rule.BackendRefs[0].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // Set other upstreams in traffic-split using upstream_id + for i, upstream := range upstreams { + weight := apiv2.DefaultWeight + // get weight from the backend refs starting from the second backend + if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil { + weight = int(*rule.BackendRefs[i+1].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: upstream.ID, + Weight: weight, + }) + } + + if len(weightedUpstreams) > 0 { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{ + Rules: []adctypes.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } + } + streamRoute := adctypes.NewDefaultStreamRoute() + streamRouteName := adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex)) + streamRoute.Name = streamRouteName + streamRoute.ID = id.GenID(streamRouteName) + streamRoute.Labels = labels + // TODO: support remote_addr, server_addr, sni, server_port + service.StreamRoutes = append(service.StreamRoutes, streamRoute) + result.Services = append(result.Services, service) + } + return result, nil +} diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index f94faafc6..bd175f622 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -29,6 +29,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/api/v1alpha1" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" @@ -62,6 +63,7 @@ func SetupIndexer(mgr ctrl.Manager) error { &gatewayv1.Gateway{}: setupGatewayIndexer, &gatewayv1.HTTPRoute{}: setupHTTPRouteIndexer, &gatewayv1.GRPCRoute{}: setupGRPCRouteIndexer, + &gatewayv1alpha2.TCPRoute{}: setupTCPRouteIndexer, &gatewayv1.GatewayClass{}: setupGatewayClassIndexer, &v1alpha1.Consumer{}: setupConsumerIndexer, &networkingv1.Ingress{}: setupIngressIndexer, @@ -257,6 +259,26 @@ func setupHTTPRouteIndexer(mgr ctrl.Manager) error { return nil } +func setupTCPRouteIndexer(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1alpha2.TCPRoute{}, + ParentRefs, + TCPRouteParentRefsIndexFunc, + ); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1alpha2.TCPRoute{}, + ServiceIndexRef, + TCPPRouteServiceIndexFunc, + ); err != nil { + return err + } + return nil +} func setupIngressClassIndexer(mgr ctrl.Manager) error { // create IngressClass index if err := mgr.GetFieldIndexer().IndexField( @@ -542,6 +564,19 @@ func HTTPRouteParentRefsIndexFunc(rawObj client.Object) []string { return keys } +func TCPRouteParentRefsIndexFunc(rawObj client.Object) []string { + tr := rawObj.(*gatewayv1alpha2.TCPRoute) + keys := make([]string, 0, len(tr.Spec.ParentRefs)) + for _, ref := range tr.Spec.ParentRefs { + ns := tr.GetNamespace() + if ref.Namespace != nil { + ns = string(*ref.Namespace) + } + keys = append(keys, GenIndexKey(ns, string(ref.Name))) + } + return keys +} + func HTTPRouteServiceIndexFunc(rawObj client.Object) []string { hr := rawObj.(*gatewayv1.HTTPRoute) keys := make([]string, 0, len(hr.Spec.Rules)) @@ -560,6 +595,24 @@ func HTTPRouteServiceIndexFunc(rawObj client.Object) []string { return keys } +func TCPPRouteServiceIndexFunc(rawObj client.Object) []string { + tr := rawObj.(*gatewayv1alpha2.TCPRoute) + keys := make([]string, 0, len(tr.Spec.Rules)) + for _, rule := range tr.Spec.Rules { + for _, backend := range rule.BackendRefs { + namespace := tr.GetNamespace() + if backend.Kind != nil && *backend.Kind != internaltypes.KindService { + continue + } + if backend.Namespace != nil { + namespace = string(*backend.Namespace) + } + keys = append(keys, GenIndexKey(namespace, string(backend.Name))) + } + } + return keys +} + func ApisixRouteServiceIndexFunc(cli client.Client) func(client.Object) []string { return func(obj client.Object) (keys []string) { ar := obj.(*apiv2.ApisixRoute) diff --git a/internal/controller/tcproute_controller.go b/internal/controller/tcproute_controller.go new file mode 100644 index 000000000..271198a6d --- /dev/null +++ b/internal/controller/tcproute_controller.go @@ -0,0 +1,505 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// TCPRouteReconciler reconciles a TCPRoute object. +type TCPRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1alpha2.TCPRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesByServiceRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForBackendTrafficPolicy), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGatewayProxy), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindTCPRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *TCPRouteReconciler) listTCPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + tcprouteList := []gatewayv1alpha2.TCPRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + tcprList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, tcprList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes by service reference", "service", targetRef.Name) + return nil + } + tcprouteList = append(tcprouteList, tcprList.Items...) + } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(tcprouteList)) + for _, tr := range tcprouteList { + key := k8stypes.NamespacedName{ + Namespace: tr.Namespace, + Name: tr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + } + return requests +} + +func (r *TCPRouteReconciler) listTCPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + tcprList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, tcprList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes by gateway", "gateway", gateway.Name) + return nil + } + + requests := make([]reconcile.Request, 0, len(tcprList.Items)) + for _, tcr := range tcprList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcr.Namespace, + Name: tcr.Name, + }, + }) + } + return requests +} + +// listTCPRoutesForGatewayProxy list all TCPRoute resources that are affected by a given GatewayProxy +func (r *TCPRouteReconciler) listTCPRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all TCPRoute resources that reference it + for _, gateway := range gatewayList.Items { + tcpRouteList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, tcpRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes for gateway", "gateway", gateway.Name) + continue + } + + for _, tcpRoute := range tcpRouteList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcpRoute.Namespace, + Name: tcpRoute.Name, + }, + }) + } + } + + return requests +} + +func (r *TCPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1alpha2.TCPRoute{}, req.NamespacedName) + tr := new(gatewayv1alpha2.TCPRoute) + if err := r.Get(ctx, req.NamespacedName, tr); err != nil { + if client.IgnoreNotFound(err) == nil { + tr.Namespace = req.Namespace + tr.Name = req.Name + + tr.TypeMeta = metav1.TypeMeta{ + Kind: KindTCPRoute, + APIVersion: gatewayv1alpha2.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, tr); err != nil { + r.Log.Error(err, "failed to delete tcproute", "tcproute", tr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + type ResourceStatus struct { + status bool + msg string + } + + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, tr, tr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = tr.Spec.ParentRefs + rk := utils.NamespacedNameKind(tr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + var backendRefErr error + if err := r.processTCPRoute(tctx, tr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. + // If the backend reference error is because of an invalid kind, use this error first + if err := r.processTCPRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, tr.GetGeneration(), backendRefErr) + + tr.Status.Parents = append(tr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(tr), + Resource: &gatewayv1alpha2.TCPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*gatewayv1alpha2.TCPRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + tCopy := t.DeepCopy() + tCopy.Status = tr.Status + return tCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + if isRouteAccepted(gateways) { + routeToUpdate := tr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *TCPRouteReconciler) processTCPRoute(tctx *provider.TranslateContext, tcpRoute *gatewayv1alpha2.TCPRoute) error { + var terror error + for _, rule := range tcpRoute.Spec.Rules { + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != KindService { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&tcpRoute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +func (r *TCPRouteReconciler) processTCPRouteBackendRefs(tctx *provider.TranslateContext, trNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: trNN.Namespace, + Name: string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != KindService { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between TCPRoute and referenced Service, check ReferenceGrant + if trNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindTCPRoute, + Namespace: v1beta1.Namespace(trNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the TCPRoute %s and no ReferenceGrant allowing reference is configured", targetNN, trNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *TCPRouteReconciler) listTCPRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + grant, ok := obj.(*v1beta1.ReferenceGrant) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant") + return nil + } + + var tcpRouteList gatewayv1alpha2.TCPRouteList + if err := r.List(ctx, &tcpRouteList); err != nil { + r.Log.Error(err, "failed to list tcproutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) + return nil + } + + for _, tcpRoute := range tcpRouteList.Items { + tr := v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindTCPRoute, + Namespace: v1beta1.Namespace(tcpRoute.GetNamespace()), + } + for _, from := range grant.Spec.From { + if from == tr { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tcpRoute.GetNamespace(), + Name: tcpRoute.GetName(), + }, + }) + } + } + } + return requests +} + +func (r *TCPRouteReconciler) listTCPRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + trList := &gatewayv1alpha2.TCPRouteList{} + if err := r.List(ctx, trList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list tcproutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(trList.Items)) + for _, tr := range trList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: tr.Namespace, + Name: tr.Name, + }, + }) + } + return requests +} diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 72b8ab59c..8e8be6e55 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -49,6 +49,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" @@ -64,6 +65,7 @@ import ( const ( KindGateway = "Gateway" KindHTTPRoute = "HTTPRoute" + KindTCPRoute = "TCPRoute" KindGRPCRoute = "GRPCRoute" KindGatewayClass = "GatewayClass" KindIngress = "Ingress" @@ -503,6 +505,8 @@ func routeHostnamesIntersectsWithListenerHostname(route client.Object, listener switch r := route.(type) { case *gatewayv1.HTTPRoute: return listenerHostnameIntersectWithRouteHostnames(listener, r.Spec.Hostnames) + case *gatewayv1alpha2.TCPRoute: + return true // TCPRoute doesn't have Hostnames to match case *gatewayv1.GRPCRoute: return listenerHostnameIntersectWithRouteHostnames(listener, r.Spec.Hostnames) default: @@ -669,6 +673,10 @@ func routeMatchesListenerType(route client.Object, listener gatewayv1.Listener) return false } } + case *gatewayv1alpha2.TCPRoute: + if listener.Protocol != gatewayv1.TCPProtocolType { + return false + } default: return false } diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 9241849b3..e77ca1608 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/api/v1alpha1" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" @@ -85,6 +86,8 @@ import ( // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes,verbs=get;list;watch @@ -142,6 +145,14 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Updater: updater, Readier: readier, }, + &gatewayv1alpha2.TCPRoute{}: &controller.TCPRouteReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName(types.KindTCPRoute), + Provider: pro, + Updater: updater, + Readier: readier, + }, &gatewayv1.GRPCRoute{}: &controller.GRPCRouteReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -291,6 +302,9 @@ func registerGatewayAPIForReadinessGVK(mgr manager.Manager, readier readiness.Re if utils.HasAPIResource(mgr, &gatewayv1.GRPCRoute{}) { gvks = append(gvks, types.GvkOf(&gatewayv1.GRPCRoute{})) } + if utils.HasAPIResource(mgr, &gatewayv1alpha2.TCPRoute{}) { + gvks = append(gvks, types.GvkOf(&gatewayv1alpha2.TCPRoute{})) + } if len(gvks) == 0 { return } diff --git a/internal/manager/run.go b/internal/manager/run.go index de64f62a2..d996939e6 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -35,6 +35,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" @@ -59,6 +60,9 @@ func init() { if err := gatewayv1.Install(scheme); err != nil { panic(err) } + if err := gatewayv1alpha2.Install(scheme); err != nil { + panic(err) + } if err := v1alpha1.AddToScheme(scheme); err != nil { panic(err) } diff --git a/internal/provider/api7ee/provider.go b/internal/provider/api7ee/provider.go index ab3687fc4..a19c50cba 100644 --- a/internal/provider/api7ee/provider.go +++ b/internal/provider/api7ee/provider.go @@ -29,6 +29,7 @@ import ( networkingv1beta1 "k8s.io/api/networking/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" adctypes "github.com/apache/apisix-ingress-controller/api/adc" "github.com/apache/apisix-ingress-controller/api/v1alpha1" @@ -103,6 +104,9 @@ func (d *api7eeProvider) Update(ctx context.Context, tctx *provider.TranslateCon case *gatewayv1.HTTPRoute: result, err = d.translator.TranslateHTTPRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, "service") + case *gatewayv1alpha2.TCPRoute: + result, err = d.translator.TranslateTCPRoute(tctx, t.DeepCopy()) + resourceTypes = append(resourceTypes, adctypes.TypeService) case *gatewayv1.GRPCRoute: result, err = d.translator.TranslateGRPCRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, "service") @@ -184,7 +188,7 @@ func (d *api7eeProvider) Delete(ctx context.Context, obj client.Object) error { var resourceTypes []string var labels map[string]string switch obj.(type) { - case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute: + case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute, *gatewayv1alpha2.TCPRoute: resourceTypes = append(resourceTypes, "service") labels = label.GenLabel(obj) case *gatewayv1.Gateway: diff --git a/internal/provider/api7ee/status.go b/internal/provider/api7ee/status.go index fd9ab2027..03b63db4f 100644 --- a/internal/provider/api7ee/status.go +++ b/internal/provider/api7ee/status.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" @@ -143,6 +144,41 @@ func (d *api7eeProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) + case types.KindTCPRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + log.Debugw("updating TCPRoute status", zap.Any("parentRefs", parentRefs)) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1alpha2.TCPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1alpha2.TCPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) case types.KindGRPCRoute: parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) log.Debugw("updating GRPCRoute status", zap.Any("parentRefs", parentRefs)) diff --git a/internal/provider/apisix/provider.go b/internal/provider/apisix/provider.go index 7f06fdbd1..b14df7054 100644 --- a/internal/provider/apisix/provider.go +++ b/internal/provider/apisix/provider.go @@ -29,6 +29,7 @@ import ( networkingv1beta1 "k8s.io/api/networking/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" adctypes "github.com/apache/apisix-ingress-controller/api/adc" "github.com/apache/apisix-ingress-controller/api/v1alpha1" @@ -110,6 +111,9 @@ func (d *apisixProvider) Update(ctx context.Context, tctx *provider.TranslateCon case *gatewayv1.HTTPRoute: result, err = d.translator.TranslateHTTPRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeService) + case *gatewayv1alpha2.TCPRoute: + result, err = d.translator.TranslateTCPRoute(tctx, t.DeepCopy()) + resourceTypes = append(resourceTypes, adctypes.TypeService) case *gatewayv1.GRPCRoute: result, err = d.translator.TranslateGRPCRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeService) @@ -187,7 +191,7 @@ func (d *apisixProvider) Delete(ctx context.Context, obj client.Object) error { var resourceTypes []string var labels map[string]string switch obj.(type) { - case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute: + case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute, *gatewayv1alpha2.TCPRoute: resourceTypes = append(resourceTypes, adctypes.TypeService) labels = label.GenLabel(obj) case *gatewayv1.Gateway: diff --git a/internal/provider/apisix/status.go b/internal/provider/apisix/status.go index d4b1e8993..465718c4a 100644 --- a/internal/provider/apisix/status.go +++ b/internal/provider/apisix/status.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" @@ -144,6 +145,41 @@ func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) + case types.KindTCPRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + log.Debugw("updating TCPRoute status", zap.Any("parentRefs", parentRefs)) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1alpha2.TCPRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1alpha2.TCPRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) case types.KindGRPCRoute: parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) log.Debugw("updating GRPCRoute status", zap.Any("parentRefs", parentRefs)) diff --git a/internal/types/k8s.go b/internal/types/k8s.go index 047fd2a8e..00580325e 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -25,6 +25,7 @@ import ( kschema "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" @@ -36,6 +37,7 @@ const DefaultIngressClassAnnotation = "ingressclass.kubernetes.io/is-default-cla const ( KindGateway = "Gateway" KindHTTPRoute = "HTTPRoute" + KindTCPRoute = "TCPRoute" KindGRPCRoute = "GRPCRoute" KindGatewayClass = "GatewayClass" KindIngress = "Ingress" @@ -60,6 +62,8 @@ func KindOf(obj any) string { switch obj.(type) { case *gatewayv1.Gateway: return KindGateway + case *gatewayv1alpha2.TCPRoute: + return KindTCPRoute case *gatewayv1.HTTPRoute: return KindHTTPRoute case *gatewayv1.GRPCRoute: @@ -126,6 +130,8 @@ func GvkOf(obj any) schema.GroupVersionKind { switch obj.(type) { case *gatewayv1.Gateway, *gatewayv1.HTTPRoute, *gatewayv1.GatewayClass, *gatewayv1.GRPCRoute: return gatewayv1.SchemeGroupVersion.WithKind(kind) + case *gatewayv1alpha2.TCPRoute: + return gatewayv1alpha2.SchemeGroupVersion.WithKind(kind) case *gatewayv1beta1.ReferenceGrant: return gatewayv1beta1.SchemeGroupVersion.WithKind(kind) case *netv1.Ingress, *netv1.IngressClass: diff --git a/test/e2e/framework/manifests/ingress.yaml b/test/e2e/framework/manifests/ingress.yaml index fa24398ed..72709a2d1 100644 --- a/test/e2e/framework/manifests/ingress.yaml +++ b/test/e2e/framework/manifests/ingress.yaml @@ -167,6 +167,22 @@ rules: - gateways - grpcroutes - httproutes + - tcproutes + verbs: + - get + - list + - watch +- apiGroups: + - gateway.networking.k8s.io + resources: + - httproutes/status + - tcproutes/status + verbs: + - get + - update +- apiGroups: + - gateway.networking.k8s.io + resources: - referencegrants verbs: - get diff --git a/test/e2e/gatewayapi/httproute.go b/test/e2e/gatewayapi/httproute.go index 5b4a262a8..806c92926 100644 --- a/test/e2e/gatewayapi/httproute.go +++ b/test/e2e/gatewayapi/httproute.go @@ -108,7 +108,7 @@ spec: }).Should( And( ContainSubstring(`status: "True"`), - ContainSubstring("message: the gateway has been accepted by the apisix-ingress-controlle"), + ContainSubstring("message: the gateway has been accepted by the apisix-ingress-controller"), ), "check Gateway condition status", ) @@ -146,7 +146,7 @@ spec: }).Should( And( ContainSubstring(`status: "True"`), - ContainSubstring("message: the gateway has been accepted by the apisix-ingress-controlle"), + ContainSubstring("message: the gateway has been accepted by the apisix-ingress-controller"), ), "check Gateway condition status", ) diff --git a/test/e2e/gatewayapi/tcproute.go b/test/e2e/gatewayapi/tcproute.go new file mode 100644 index 000000000..7f9e73602 --- /dev/null +++ b/test/e2e/gatewayapi/tcproute.go @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gatewayapi + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("TCPRoute E2E Test", Label("networking.k8s.io", "tcproute"), func() { + s := scaffold.NewDefaultScaffold() + Context("TCPRoute Base", func() { + var tcpGateway = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: %s +spec: + gatewayClassName: %s + listeners: + - name: tcp + protocol: TCP + port: 80 + allowedRoutes: + kinds: + - kind: TCPRoute + infrastructure: + parametersRef: + group: apisix.apache.org + kind: GatewayProxy + name: apisix-proxy-config +` + + var tcpRoute = ` +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: TCPRoute +metadata: + name: tcp-app-1 +spec: + parentRefs: + - name: %s + sectionName: tcp + rules: + - backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + + BeforeEach(func() { + // Create GatewayProxy + Expect(s.CreateResourceFromString(s.GetGatewayProxySpec())).NotTo(HaveOccurred(), "creating GatewayProxy") + + // Create GatewayClass + Expect(s.CreateResourceFromString(s.GetGatewayClassYaml())).NotTo(HaveOccurred(), "creating GatewayClass") + + // Create Gateway with TCP listener + Expect(s.CreateResourceFromString(fmt.Sprintf(tcpGateway, s.Namespace(), s.Namespace()))). + NotTo(HaveOccurred(), "creating Gateway") + }) + + It("should route TCP traffic to backend service", func() { + gatewayName := s.Namespace() + By("creating TCPRoute") + Expect(s.CreateResourceFromString(fmt.Sprintf(tcpRoute, gatewayName))). + NotTo(HaveOccurred(), "creating TCPRoute") + time.Sleep(2 * time.Second) + + By("verifying TCPRoute is functional") + s.HTTPOverTCPConnectAssert(true, time.Minute*5) // should be able to connect + By("sending TCP traffic to verify routing") + s.RequestAssert(&scaffold.RequestAssert{ + Client: s.NewAPISIXClientOnTCPPort(), + Method: "GET", + Path: "/get", + Check: scaffold.WithExpectedStatus(200), + Timeout: time.Second * 60, + Interval: time.Second * 2, + }) + + By("deleting TCPRoute") + Expect(s.DeleteResource("TCPRoute", "tcp-app-1")). + NotTo(HaveOccurred(), "deleting TCPRoute") + + s.HTTPOverTCPConnectAssert(false, time.Minute*5) + }) + }) +}) diff --git a/test/e2e/scaffold/assertion.go b/test/e2e/scaffold/assertion.go index ec0d2f210..612e2c817 100644 --- a/test/e2e/scaffold/assertion.go +++ b/test/e2e/scaffold/assertion.go @@ -19,6 +19,8 @@ package scaffold import ( "fmt" + "io" + "net" "net/http" "strings" "time" @@ -189,6 +191,46 @@ func WithExpectedNotHeaders(unexpectedHeaders []string) ResponseCheckFunc { } } +func (s *Scaffold) HTTPOverTCPConnectAssert(shouldRespond bool, timeout time.Duration) { + EventuallyWithOffset(1, func() error { + conn, err := net.DialTimeout("tcp", s.GetAPISIXTCPEndpoint(), 3*time.Second) + if err != nil { + return fmt.Errorf("failed to connect: %v", err) + } + defer func() { + _ = conn.Close() + }() + _, _ = fmt.Fprintf(conn, "GET /get HTTP/1.1\r\nHost: localhost\r\n\r\n") + + // Read response + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + buf := make([]byte, 1024) + n, err := conn.Read(buf) + + if shouldRespond { + // Should get a response (HTTP 200 from httpbin) + if err != nil || n == 0 { + return fmt.Errorf("expected response but got error: %v or empty response", err) + } + // Check if we got a valid HTTP response + response := string(buf[:n]) + if !strings.Contains(response, "HTTP/1.1") { + return fmt.Errorf("expected HTTP response but got: %s", response) + } + } else { + // Should get no response or connection reset + if err == nil && n > 0 { + return fmt.Errorf("expected no response but got: %s", string(buf[:n])) + } + // EOF or timeout is expected when no route is configured + if err != io.EOF && !strings.Contains(err.Error(), "timeout") { + return fmt.Errorf("expected EOF or timeout but got: %v", err) + } + } + return nil + }).WithTimeout(timeout).WithPolling(2 * time.Second).Should(Succeed()) +} + func (s *Scaffold) RequestAssert(r *RequestAssert) bool { if r.Client == nil { r.Client = s.NewAPISIXClient() diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index cb47afd0f..1468b11fa 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -205,6 +205,25 @@ func (s *Scaffold) NewAPISIXClient() *httpexpect.Expect { }) } +func (s *Scaffold) NewAPISIXClientOnTCPPort() *httpexpect.Expect { + u := url.URL{ + Scheme: "http", + Host: s.apisixTunnels.TCP.Endpoint(), + } + return httpexpect.WithConfig(httpexpect.Config{ + BaseURL: u.String(), + Client: &http.Client{ + Transport: &http.Transport{}, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + }, + Reporter: httpexpect.NewAssertReporter( + httpexpect.NewAssertReporter(GinkgoT()), + ), + }) +} + func (s *Scaffold) ApisixHTTPEndpoint() string { return s.apisixTunnels.HTTP.Endpoint() }