diff --git a/Makefile b/Makefile index ccaed1542..d462fe27c 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ GATEAY_API_VERSION ?= v1.3.0 SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080" CONFORMANCE_TEST_REPORT_OUTPUT ?= $(DIR)/apisix-ingress-controller-conformance-report.yaml ## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/conformance/utils/suite/profiles.go -CONFORMANCE_PROFILES ?= GATEWAY-HTTP +CONFORMANCE_PROFILES ?= GATEWAY-HTTP,GATEWAY-GRPC # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) diff --git a/api/adc/types.go b/api/adc/types.go index b6e01894c..7ff2d6bc5 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -343,10 +343,6 @@ const ( type Scheme string -const ( - SchemeHTTP = "http" -) - type UpstreamType string const ( @@ -518,11 +514,13 @@ func ComposeServiceNameWithRule(namespace, name string, rule string) string { return buf.String() } -func ComposeServiceNameWithStream(namespace, name string, rule string) string { - // FIXME Use sync.Pool to reuse this buffer if the upstream - // name composing code path is hot. +func ComposeGRPCServiceNameWithRule(namespace, name string, rule string) string { + return ComposeServicesNameWithScheme(namespace, name, rule, "grpc") +} + +func ComposeServicesNameWithScheme(namespace, name string, rule string, scheme string) string { var p []byte - plen := len(namespace) + len(name) + 6 + plen := len(namespace) + len(name) + len(rule) + len(scheme) + 3 p = make([]byte, 0, plen) buf := bytes.NewBuffer(p) @@ -531,11 +529,16 @@ func ComposeServiceNameWithStream(namespace, name string, rule string) string { buf.WriteString(name) buf.WriteByte('_') buf.WriteString(rule) - buf.WriteString("_stream") + buf.WriteByte('_') + buf.WriteString(scheme) return buf.String() } +func ComposeServiceNameWithStream(namespace, name string, rule string) string { + return ComposeServicesNameWithScheme(namespace, name, rule, "stream") +} + func ComposeConsumerName(namespace, name string) string { // FIXME Use sync.Pool to reuse this buffer if the upstream // name composing code path is hot. @@ -568,9 +571,8 @@ func NewDefaultUpstream() *Upstream { "managed-by": "apisix-ingress-controller", }, }, - Nodes: make(UpstreamNodes, 0), - Scheme: SchemeHTTP, - Type: Roundrobin, + Nodes: make(UpstreamNodes, 0), + Type: Roundrobin, } } diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index afd8c1f26..2d5e8c225 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -80,7 +80,6 @@ rules: - gateway.networking.k8s.io resources: - gatewayclasses - - gateways verbs: - get - list @@ -91,6 +90,7 @@ rules: resources: - gatewayclasses/status - gateways/status + - grpcroutes/status - httproutes/status - referencegrants/status verbs: @@ -99,35 +99,22 @@ rules: - apiGroups: - gateway.networking.k8s.io resources: + - gateways + - grpcroutes - httproutes - verbs: - - get - - list - - watch -- apiGroups: - - gateway.networking.k8s.io - resources: - referencegrants verbs: - - list - - update - - watch -- apiGroups: - - networking.k8s.io - resources: - - ingressclasses - verbs: - get - list - watch - apiGroups: - networking.k8s.io resources: + - ingressclasses - ingresses verbs: - get - list - - update - watch - apiGroups: - networking.k8s.io diff --git a/docs/en/latest/concepts/gateway-api.md b/docs/en/latest/concepts/gateway-api.md index ce431a715..c0fcbb752 100644 --- a/docs/en/latest/concepts/gateway-api.md +++ b/docs/en/latest/concepts/gateway-api.md @@ -48,7 +48,7 @@ By supporting Gateway API, the APISIX Ingress controller can realize richer func | GatewayClass | Supported | N/A | Not supported | v1 | | Gateway | Partially supported | Partially supported | Not supported | v1 | | HTTPRoute | Supported | Partially supported | Not supported | v1 | -| GRPCRoute | Not supported | Not supported | Not supported | v1 | +| GRPCRoute | Supported | Supported | Not supported | v1 | | ReferenceGrant | Supported | Not supported | Not supported | v1beta1 | | TLSRoute | Not supported | Not supported | Not supported | v1alpha2 | | TCPRoute | Not supported | Not supported | Not supported | v1alpha2 | diff --git a/go.mod b/go.mod index aa09f673f..3fe939c32 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + google.golang.org/grpc v1.71.1 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.15.4 k8s.io/api v0.32.3 @@ -237,7 +238,6 @@ require ( gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect - google.golang.org/grpc v1.71.1 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect diff --git a/internal/adc/translator/apisixroute.go b/internal/adc/translator/apisixroute.go index 6f61dcef9..87cbe3ee7 100644 --- a/internal/adc/translator/apisixroute.go +++ b/internal/adc/translator/apisixroute.go @@ -240,6 +240,7 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc upstreamName := adc.ComposeUpstreamName(ar.Namespace, ar.Name, fmt.Sprintf("%d", ruleIndex), fmt.Sprintf("%d", backendIndex)) upstream.Name = upstreamName upstream.ID = id.GenID(upstreamName) + upstream.Scheme = cmp.Or(upstream.Scheme, apiv2.SchemeHTTP) upstreams = append(upstreams, upstream) } @@ -265,6 +266,7 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc upstreamName := adc.ComposeExternalUpstreamName(upsNN.Namespace, upsNN.Name) upstream.Name = upstreamName upstream.ID = id.GenID(upstreamName) + upstream.Scheme = cmp.Or(upstream.Scheme, apiv2.SchemeHTTP) upstreams = append(upstreams, upstream) } diff --git a/internal/adc/translator/grpcroute.go b/internal/adc/translator/grpcroute.go new file mode 100644 index 000000000..d24fc8bc7 --- /dev/null +++ b/internal/adc/translator/grpcroute.go @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package translator + +import ( + "cmp" + "fmt" + "strings" + + "k8s.io/utils/ptr" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + adctypes "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/id" + "github.com/apache/apisix-ingress-controller/internal/provider" + internaltypes "github.com/apache/apisix-ingress-controller/internal/types" +) + +func (t *Translator) fillPluginsFromGRPCRouteFilters( + plugins adctypes.Plugins, + namespace string, + filters []gatewayv1.GRPCRouteFilter, + tctx *provider.TranslateContext, +) { + for _, filter := range filters { + switch filter.Type { + case gatewayv1.GRPCRouteFilterRequestHeaderModifier: + t.fillPluginFromHTTPRequestHeaderFilter(plugins, filter.RequestHeaderModifier) + case gatewayv1.GRPCRouteFilterRequestMirror: + t.fillPluginFromHTTPRequestMirrorFilter(plugins, namespace, filter.RequestMirror, apiv2.SchemeGRPC) + case gatewayv1.GRPCRouteFilterResponseHeaderModifier: + t.fillPluginFromHTTPResponseHeaderFilter(plugins, filter.ResponseHeaderModifier) + case gatewayv1.GRPCRouteFilterExtensionRef: + t.fillPluginFromExtensionRef(plugins, namespace, filter.ExtensionRef, tctx) + } + } +} + +func calculateGRPCRoutePriority(match *gatewayv1.GRPCRouteMatch, ruleIndex int, hosts []string) uint64 { + const ( + // PreciseHostnameShiftBits assigns bit 31-38 for the length of hostname(max length=253). + // which has 8 bits, so the max length of hostname is 2^8-1 = 255. + PreciseHostnameShiftBits = 31 + + // HostnameLengthShiftBits assigns bits 23-30 for the length of hostname(max length=253). + // which has 8 bits, so the max length of hostname is 2^8-1 = 255. + HostnameLengthShiftBits = 23 + + // ServiceMatchShiftBits assigns bits 19-22 for the length of service name. + ServiceMatchShiftBits = 19 + + // MethodMatchShiftBits assigns bits 15-18 for the length of method name. + MethodMatchShiftBits = 15 + + // HeaderNumberShiftBits assign bits 10-14 to number of headers. (max number of headers = 16) + HeaderNumberShiftBits = 10 + + // RuleIndexShiftBits assigns bits 5-9 to rule index. (max number of rules = 16) + RuleIndexShiftBits = 5 + ) + + var ( + priority uint64 = 0 + // Handle hostname priority + // 1. Non-wildcard hostname priority + // 2. Hostname length priority + maxNonWildcardLength = 0 + maxHostnameLength = 0 + ) + + for _, host := range hosts { + isNonWildcard := !strings.Contains(host, "*") + + if isNonWildcard && len(host) > maxNonWildcardLength { + maxNonWildcardLength = len(host) + } + + if len(host) > maxHostnameLength { + maxHostnameLength = len(host) + } + } + + // If there is a non-wildcard hostname, set the PreciseHostnameShiftBits bit + if maxNonWildcardLength > 0 { + priority |= (uint64(maxNonWildcardLength) << PreciseHostnameShiftBits) + } + + if maxHostnameLength > 0 { + priority |= (uint64(maxHostnameLength) << HostnameLengthShiftBits) + } + + // Service and Method matching - this is the key difference from HTTPRoute + serviceLength := 0 + methodLength := 0 + + if match.Method != nil { + // Service matching + if match.Method.Service != nil { + serviceLength = len(*match.Method.Service) + priority |= (uint64(serviceLength) << ServiceMatchShiftBits) + } + + // Method matching + if match.Method.Method != nil { + methodLength = len(*match.Method.Method) + priority |= (uint64(methodLength) << MethodMatchShiftBits) + } + } + + // HeaderNumberShiftBits - GRPCRoute also supports header matching + headerCount := 0 + if match.Headers != nil { + headerCount = len(match.Headers) + } + priority |= (uint64(headerCount) << HeaderNumberShiftBits) + + // RuleIndexShiftBits - lower index has higher priority + // We invert the index so that rule 0 gets highest priority (16), rule 1 gets 15, etc. + index := 16 - ruleIndex + if index < 0 { + index = 0 + } + if index > 16 { + index = 16 + } + priority |= (uint64(index) << RuleIndexShiftBits) + + return priority +} + +func (t *Translator) TranslateGRPCRoute(tctx *provider.TranslateContext, grpcRoute *gatewayv1.GRPCRoute) (*TranslateResult, error) { + result := &TranslateResult{} + + hosts := make([]string, 0, len(grpcRoute.Spec.Hostnames)) + for _, hostname := range grpcRoute.Spec.Hostnames { + hosts = append(hosts, string(hostname)) + } + + for _, listener := range tctx.Listeners { + if listener.Hostname != nil { + hosts = append(hosts, string(*listener.Hostname)) + } + } + + rules := grpcRoute.Spec.Rules + + labels := label.GenLabel(grpcRoute) + + for ruleIndex, rule := range rules { + service := adctypes.NewDefaultService() + service.Labels = labels + + service.Name = adctypes.ComposeGRPCServiceNameWithRule(grpcRoute.Namespace, grpcRoute.Name, fmt.Sprintf("%d", ruleIndex)) + service.ID = id.GenID(service.Name) + service.Hosts = hosts + + var ( + upstreams = make([]*adctypes.Upstream, 0) + weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) + backendErr error + ) + + for _, backend := range rule.BackendRefs { + if backend.Namespace == nil { + namespace := gatewayv1.Namespace(grpcRoute.Namespace) + backend.Namespace = &namespace + } + upstream := adctypes.NewDefaultUpstream() + upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter) + if err != nil { + backendErr = err + continue + } + if len(upNodes) == 0 { + continue + } + + t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream) + upstream.Nodes = upNodes + + var ( + kind string + port int32 + ) + if backend.Kind == nil { + kind = internaltypes.KindService + } else { + kind = string(*backend.Kind) + } + if backend.Port != nil { + port = int32(*backend.Port) + } + namespace := string(*backend.Namespace) + name := string(backend.Name) + upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) + upstream.Name = upstreamName + upstream.ID = id.GenID(upstreamName) + upstream.Scheme = cmp.Or(upstream.Scheme, apiv2.SchemeGRPC) + upstreams = append(upstreams, upstream) + } + + // Handle multiple backends with traffic-split plugin + if len(upstreams) == 0 { + // Create a default upstream if no valid backends + upstream := adctypes.NewDefaultUpstream() + upstream.Scheme = apiv2.SchemeGRPC + service.Upstream = upstream + } else { + // Multiple backends - use traffic-split plugin + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + + upstreams = upstreams[1:] + + if len(upstreams) > 0 { + service.Upstreams = upstreams + + // Set weight in traffic-split for the default upstream + weight := apiv2.DefaultWeight + if rule.BackendRefs[0].Weight != nil { + weight = int(*rule.BackendRefs[0].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // Set other upstreams in traffic-split using upstream_id + for i, upstream := range upstreams { + weight := apiv2.DefaultWeight + // get weight from the backend refs starting from the second backend + if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil { + weight = int(*rule.BackendRefs[i+1].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: upstream.ID, + Weight: weight, + }) + } + + if len(weightedUpstreams) > 0 { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{ + Rules: []adctypes.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } + } + } + + if backendErr != nil && (service.Upstream == nil || len(service.Upstream.Nodes) == 0) { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["fault-injection"] = map[string]any{ + "abort": map[string]any{ + "http_status": 500, + "body": "No existing backendRef provided", + }, + } + } + + t.fillPluginsFromGRPCRouteFilters(service.Plugins, grpcRoute.GetNamespace(), rule.Filters, tctx) + + matches := rule.Matches + if len(matches) == 0 { + matches = []gatewayv1.GRPCRouteMatch{{}} + } + + routes := []*adctypes.Route{} + for j, match := range matches { + route, err := t.translateGatewayGRPCRouteMatch(&match) + if err != nil { + return nil, err + } + + name := adctypes.ComposeRouteName(grpcRoute.Namespace, grpcRoute.Name, fmt.Sprintf("%d-%d", ruleIndex, j)) + route.Name = name + route.ID = id.GenID(name) + route.Labels = labels + + // Set the route priority + priority := calculateGRPCRoutePriority(&match, ruleIndex, hosts) + route.Priority = ptr.To(int64(priority)) + + routes = append(routes, route) + } + service.Routes = routes + + result.Services = append(result.Services, service) + } + + return result, nil +} + +func (t *Translator) translateGatewayGRPCRouteMatch(match *gatewayv1.GRPCRouteMatch) (*adctypes.Route, error) { + route := &adctypes.Route{} + + var ( + service string + method string + ) + if match.Method != nil { + service = ptr.Deref(match.Method.Service, "") + method = ptr.Deref(match.Method.Method, "") + matchType := ptr.Deref(match.Method.Type, gatewayv1.GRPCMethodMatchExact) + if matchType == gatewayv1.GRPCMethodMatchExact && + service == "" && method == "" { + return nil, fmt.Errorf("service and method cannot both be empty for exact match type") + } + } + + uri := t.translateGRPCURI(service, method) + route.Uris = append(route.Uris, uri) + + if match.Headers != nil { + for _, header := range match.Headers { + this, err := t.translateGRPCRouteHeaderMatchToVars(header) + if err != nil { + return nil, err + } + route.Vars = append(route.Vars, this) + } + } + return route, nil +} + +func (t *Translator) translateGRPCURI(service, method string) string { + var uri string + if service == "" { + uri = "/*" + } else { + uri = fmt.Sprintf("/%s", service) + } + if method != "" { + uri = uri + fmt.Sprintf("/%s", method) + } else if service != "" { + uri = uri + "/*" + } + return uri +} + +func (t *Translator) translateGRPCRouteHeaderMatchToVars(header gatewayv1.GRPCHeaderMatch) ([]adctypes.StringOrSlice, error) { + var matchType string + if header.Type != nil { + matchType = string(*header.Type) + } + return HeaderMatchToVars(matchType, string(header.Name), header.Value) +} diff --git a/internal/adc/translator/httproute.go b/internal/adc/translator/httproute.go index b33de0e46..ddb5d329e 100644 --- a/internal/adc/translator/httproute.go +++ b/internal/adc/translator/httproute.go @@ -18,6 +18,7 @@ package translator import ( + "cmp" "encoding/json" "fmt" "strings" @@ -54,7 +55,7 @@ func (t *Translator) fillPluginsFromHTTPRouteFilters( case gatewayv1.HTTPRouteFilterRequestRedirect: t.fillPluginFromHTTPRequestRedirectFilter(plugins, filter.RequestRedirect) case gatewayv1.HTTPRouteFilterRequestMirror: - t.fillPluginFromHTTPRequestMirrorFilter(plugins, namespace, filter.RequestMirror) + t.fillPluginFromHTTPRequestMirrorFilter(plugins, namespace, filter.RequestMirror, apiv2.SchemeHTTP) case gatewayv1.HTTPRouteFilterURLRewrite: t.fillPluginFromURLRewriteFilter(plugins, filter.URLRewrite, matches) case gatewayv1.HTTPRouteFilterResponseHeaderModifier: @@ -232,7 +233,7 @@ func (t *Translator) fillPluginFromHTTPResponseHeaderFilter(plugins adctypes.Plu plugin.Headers.Remove = append(plugin.Headers.Remove, respHeaderModifier.Remove...) } -func (t *Translator) fillPluginFromHTTPRequestMirrorFilter(plugins adctypes.Plugins, namespace string, reqMirror *gatewayv1.HTTPRequestMirrorFilter) { +func (t *Translator) fillPluginFromHTTPRequestMirrorFilter(plugins adctypes.Plugins, namespace string, reqMirror *gatewayv1.HTTPRequestMirrorFilter, scheme string) { pluginName := adctypes.PluginProxyMirror obj := plugins[pluginName] @@ -255,7 +256,7 @@ func (t *Translator) fillPluginFromHTTPRequestMirrorFilter(plugins adctypes.Plug ns = string(*reqMirror.BackendRef.Namespace) } - host := fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", reqMirror.BackendRef.Name, ns, port) + host := fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", scheme, reqMirror.BackendRef.Name, ns, port) plugin.Host = host } @@ -561,6 +562,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou name := string(backend.Name) upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) upstream.Name = upstreamName + upstream.Scheme = cmp.Or(upstream.Scheme, apiv2.SchemeHTTP) upstream.ID = id.GenID(upstreamName) upstreams = append(upstreams, upstream) } @@ -722,36 +724,10 @@ func (t *Translator) translateGatewayHTTPRouteMatch(match *gatewayv1.HTTPRouteMa if len(match.Headers) > 0 { for _, header := range match.Headers { - name := strings.ToLower(string(header.Name)) - name = strings.ReplaceAll(name, "-", "_") - - var this []adctypes.StringOrSlice - this = append(this, adctypes.StringOrSlice{ - StrVal: "http_" + name, - }) - - matchType := gatewayv1.HeaderMatchExact - if header.Type != nil { - matchType = *header.Type - } - - switch matchType { - case gatewayv1.HeaderMatchExact: - this = append(this, adctypes.StringOrSlice{ - StrVal: "==", - }) - case gatewayv1.HeaderMatchRegularExpression: - this = append(this, adctypes.StringOrSlice{ - StrVal: "~~", - }) - default: - return nil, errors.New("unknown header match type " + string(matchType)) + this, err := t.translateHTTPRouteHeaderMatchToVars(header) + if err != nil { + return nil, err } - - this = append(this, adctypes.StringOrSlice{ - StrVal: header.Value, - }) - route.Vars = append(route.Vars, this) } } @@ -797,3 +773,39 @@ func (t *Translator) translateGatewayHTTPRouteMatch(match *gatewayv1.HTTPRouteMa return route, nil } + +func HeaderMatchToVars(matchType, name, value string) ([]adctypes.StringOrSlice, error) { + name = strings.ToLower(name) + name = strings.ReplaceAll(name, "-", "_") + + var this []adctypes.StringOrSlice + this = append(this, adctypes.StringOrSlice{ + StrVal: "http_" + name, + }) + + switch matchType { + case string(gatewayv1.HeaderMatchExact): + this = append(this, adctypes.StringOrSlice{ + StrVal: "==", + }) + case string(gatewayv1.HeaderMatchRegularExpression): + this = append(this, adctypes.StringOrSlice{ + StrVal: "~~", + }) + default: + return nil, errors.New("unknown header match type " + matchType) + } + + this = append(this, adctypes.StringOrSlice{ + StrVal: value, + }) + return this, nil +} + +func (t *Translator) translateHTTPRouteHeaderMatchToVars(header gatewayv1.HTTPHeaderMatch) ([]adctypes.StringOrSlice, error) { + var matchType string + if header.Type != nil { + matchType = string(*header.Type) + } + return HeaderMatchToVars(matchType, string(header.Name), header.Value) +} diff --git a/internal/controller/context.go b/internal/controller/context.go index f5d851942..5398f0448 100644 --- a/internal/controller/context.go +++ b/internal/controller/context.go @@ -26,5 +26,7 @@ type RouteParentRefContext struct { Gateway *gatewayv1.Gateway ListenerName string - Conditions []metav1.Condition + Listener *gatewayv1.Listener + + Conditions []metav1.Condition } diff --git a/internal/controller/grpcroute_controller.go b/internal/controller/grpcroute_controller.go new file mode 100644 index 000000000..138011a9e --- /dev/null +++ b/internal/controller/grpcroute_controller.go @@ -0,0 +1,584 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controller + +import ( + "cmp" + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stypes "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// GRPCRouteReconciler reconciles a GatewayClass object. +type GRPCRouteReconciler struct { //nolint:revive + client.Client + Scheme *runtime.Scheme + + Log logr.Logger + + Provider provider.Provider + + genericEvent chan event.GenericEvent + + Updater status.Updater + Readier readiness.ReadinessManager +} + +// SetupWithManager sets up the controller with the Manager. +func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + + bdr := ctrl.NewControllerManagedBy(mgr). + For(&gatewayv1.GRPCRoute{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByServiceRef), + ). + Watches(&v1alpha1.PluginConfig{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesByExtensionRef), + ). + Watches(&gatewayv1.Gateway{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGateway), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + }, + ), + ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForGatewayProxy), + ). + WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRouteForGenericEvent), + ), + ) + + if GetEnableReferenceGrant() { + bdr.Watches(&v1beta1.ReferenceGrant{}, + handler.EnqueueRequestsFromMapFunc(r.listGRPCRoutesForReferenceGrant), + builder.WithPredicates(referenceGrantPredicates(KindGRPCRoute)), + ) + } + + return bdr.Complete(r) +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { + pluginconfig, ok := obj.(*v1alpha1.PluginConfig) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to PluginConfig") + return nil + } + namespace := pluginconfig.GetNamespace() + name := pluginconfig.GetName() + + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ExtensionRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by extension reference", "extension", name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.GRPCRoute{}, req.NamespacedName) + gr := new(gatewayv1.GRPCRoute) + if err := r.Get(ctx, req.NamespacedName, gr); err != nil { + if client.IgnoreNotFound(err) == nil { + gr.Namespace = req.Namespace + gr.Name = req.Name + + gr.TypeMeta = metav1.TypeMeta{ + Kind: KindGRPCRoute, + APIVersion: gatewayv1.GroupVersion.String(), + } + + if err := r.Provider.Delete(ctx, gr); err != nil { + r.Log.Error(err, "failed to delete grpcroute", "grpcroute", gr) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + type ResourceStatus struct { + status bool + msg string + } + + // Only keep acceptStatus since we're using error objects directly now + acceptStatus := ResourceStatus{ + status: true, + msg: "Route is accepted", + } + + gateways, err := ParseRouteParentRefs(ctx, r.Client, gr, gr.Spec.ParentRefs) + if err != nil { + return ctrl.Result{}, err + } + + if len(gateways) == 0 { + return ctrl.Result{}, nil + } + + tctx := provider.NewDefaultTranslateContext(ctx) + + tctx.RouteParentRefs = gr.Spec.ParentRefs + rk := utils.NamespacedNameKind(gr) + for _, gateway := range gateways { + if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + if gateway.Listener != nil { + tctx.Listeners = append(tctx.Listeners, *gateway.Listener) + } + } + + var backendRefErr error + if err := r.processGRPCRoute(tctx, gr); err != nil { + // When encountering a backend reference error, it should not affect the acceptance status + if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) { + backendRefErr = err + } else { + acceptStatus.status = false + acceptStatus.msg = err.Error() + } + } + + // Store the backend reference error for later use. + // If the backend reference error is because of an invalid kind, use this error first + if err := r.processGRPCRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil { + backendRefErr = err + } + + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + + // TODO: diff the old and new status + gr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways)) + for _, gateway := range gateways { + parentStatus := gatewayv1.RouteParentStatus{} + SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace) + for _, condition := range gateway.Conditions { + parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition) + } + SetRouteConditionAccepted(&parentStatus, gr.GetGeneration(), acceptStatus.status, acceptStatus.msg) + SetRouteConditionResolvedRefs(&parentStatus, gr.GetGeneration(), backendRefErr) + + gr.Status.Parents = append(gr.Status.Parents, parentStatus) + } + + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(gr), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.GRPCRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + hCopy := h.DeepCopy() + hCopy.Status = gr.Status + return hCopy + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) + + if isRouteAccepted(gateways) && err == nil { + routeToUpdate := gr + if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func (r *GRPCRouteReconciler) listGRPCRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + return nil + } + namespace := endpointSlice.GetNamespace() + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + + gList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, gList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by service", "service", serviceName) + return nil + } + requests := make([]reconcile.Request, 0, len(gList.Items)) + for _, gr := range gList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + grpcRouteList := []gatewayv1.GRPCRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by service reference", "service", targetRef.Name) + return nil + } + grpcRouteList = append(grpcRouteList, grList.Items...) + } + var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{}) + requests := make([]reconcile.Request, 0, len(grpcRouteList)) + for _, gr := range grpcRouteList { + key := k8stypes.NamespacedName{ + Namespace: gr.Namespace, + Name: gr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { + gateway, ok := obj.(*gatewayv1.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway") + } + grList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes by gateway", "gateway", gateway.Name) + return nil + } + requests := make([]reconcile.Request, 0, len(grList.Items)) + for _, gr := range grList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: gr.Namespace, + Name: gr.Name, + }, + }) + } + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRouteForGenericEvent(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + switch obj.(type) { + case *v1alpha1.BackendTrafficPolicy: + return r.listGRPCRoutesForBackendTrafficPolicy(ctx, obj) + default: + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } +} + +func (r *GRPCRouteReconciler) processGRPCRouteBackendRefs(tctx *provider.TranslateContext, grNN k8stypes.NamespacedName) error { + var terr error + for _, backend := range tctx.BackendRefs { + targetNN := k8stypes.NamespacedName{ + Namespace: grNN.Namespace, + Name: string(backend.Name), + } + if backend.Namespace != nil { + targetNN.Namespace = string(*backend.Namespace) + } + + if backend.Kind != nil && *backend.Kind != "Service" { + terr = types.NewInvalidKindError(*backend.Kind) + continue + } + + if backend.Port == nil { + terr = fmt.Errorf("port is required") + continue + } + + var service corev1.Service + if err := r.Get(tctx, targetNN, &service); err != nil { + terr = err + if client.IgnoreNotFound(err) == nil { + terr = types.ReasonError{ + Reason: string(gatewayv1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Service %s not found", targetNN), + } + } + continue + } + + // if cross namespaces between GRPCRoute and referenced Service, check ReferenceGrant + if grNN.Namespace != targetNN.Namespace { + if permitted := checkReferenceGrant(tctx, + r.Client, + v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindGRPCRoute, + Namespace: v1beta1.Namespace(grNN.Namespace), + }, + gatewayv1.ObjectReference{ + Group: corev1.GroupName, + Kind: KindService, + Name: gatewayv1.ObjectName(targetNN.Name), + Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace), + }, + ); !permitted { + terr = types.ReasonError{ + Reason: string(v1beta1.RouteReasonRefNotPermitted), + Message: fmt.Sprintf("%s is in a different namespace than the GRPCRoute %s and no ReferenceGrant allowing reference is configured", targetNN, grNN), + } + continue + } + } + + if service.Spec.Type == corev1.ServiceTypeExternalName { + tctx.Services[targetNN] = &service + continue + } + + portExists := false + for _, port := range service.Spec.Ports { + if port.Port == int32(*backend.Port) { + portExists = true + break + } + } + if !portExists { + terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name) + continue + } + tctx.Services[targetNN] = &service + + endpointSliceList := new(discoveryv1.EndpointSliceList) + if err := r.List(tctx, endpointSliceList, + client.InNamespace(targetNN.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: targetNN.Name, + }, + ); err != nil { + r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN) + terr = err + continue + } + + tctx.EndpointSlices[targetNN] = endpointSliceList.Items + } + return terr +} + +func (r *GRPCRouteReconciler) processGRPCRoute(tctx *provider.TranslateContext, grpcroute *gatewayv1.GRPCRoute) error { + var terror error + for _, rule := range grpcroute.Spec.Rules { + for _, filter := range rule.Filters { + if filter.Type != gatewayv1.GRPCRouteFilterExtensionRef || filter.ExtensionRef == nil { + continue + } + if filter.ExtensionRef.Kind == "PluginConfig" { + pluginconfig := new(v1alpha1.PluginConfig) + if err := r.Get(context.Background(), client.ObjectKey{ + Namespace: grpcroute.GetNamespace(), + Name: string(filter.ExtensionRef.Name), + }, pluginconfig); err != nil { + terror = err + continue + } + tctx.PluginConfigs[k8stypes.NamespacedName{ + Namespace: grpcroute.GetNamespace(), + Name: string(filter.ExtensionRef.Name), + }] = pluginconfig + } + } + for _, backend := range rule.BackendRefs { + if backend.Kind != nil && *backend.Kind != "Service" { + terror = types.NewInvalidKindError(*backend.Kind) + continue + } + tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{ + BackendObjectReference: gatewayv1.BackendObjectReference{ + Name: backend.Name, + Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&grpcroute.Namespace)), + Port: backend.Port, + }, + }) + } + } + + return terror +} + +// listGRPCRoutesForGatewayProxy list all GRPCRoute resources that are affected by a given GatewayProxy +func (r *GRPCRouteReconciler) listGRPCRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request { + gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy") + return nil + } + + namespace := gatewayProxy.GetNamespace() + name := gatewayProxy.GetName() + + // find all gateways that reference this gateway proxy + gatewayList := &gatewayv1.GatewayList{} + if err := r.List(ctx, gatewayList, client.MatchingFields{ + indexer.ParametersRef: indexer.GenIndexKey(namespace, name), + }); err != nil { + r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName()) + return nil + } + + var requests []reconcile.Request + + // for each gateway, find all GRPCRoute resources that reference it + for _, gateway := range gatewayList.Items { + grpcRouteList := &gatewayv1.GRPCRouteList{} + if err := r.List(ctx, grpcRouteList, client.MatchingFields{ + indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name), + }); err != nil { + r.Log.Error(err, "failed to list grpcroutes for gateway", "gateway", gateway.Name) + continue + } + + for _, grpcRoute := range grpcRouteList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: grpcRoute.Namespace, + Name: grpcRoute.Name, + }, + }) + } + } + + return requests +} + +func (r *GRPCRouteReconciler) listGRPCRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + grant, ok := obj.(*v1beta1.ReferenceGrant) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant") + return nil + } + + var grpcRouteList gatewayv1.GRPCRouteList + if err := r.List(ctx, &grpcRouteList); err != nil { + r.Log.Error(err, "failed to list grpcroutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}) + return nil + } + + for _, grpcRoute := range grpcRouteList.Items { + gr := v1beta1.ReferenceGrantFrom{ + Group: gatewayv1.GroupName, + Kind: KindGRPCRoute, + Namespace: v1beta1.Namespace(grpcRoute.GetNamespace()), + } + for _, from := range grant.Spec.From { + if from == gr { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: grpcRoute.GetNamespace(), + Name: grpcRoute.GetName(), + }, + }) + } + } + } + return requests +} diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index dfaf27db0..84b588d2f 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -94,7 +94,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { // Conditionally watch EndpointSlice or Endpoints based on cluster API support bdr = watchEndpointSliceOrEndpoints(bdr, r.supportsEndpointSlice, - r.listHTTPRoutesByServiceBef, + r.listHTTPRoutesByServiceRef, r.listHTTPRoutesByServiceForEndpoints, r.Log) @@ -280,7 +280,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, nil } -func (r *HTTPRouteReconciler) listHTTPRoutesByServiceBef(ctx context.Context, obj client.Object) []reconcile.Request { +func (r *HTTPRouteReconciler) listHTTPRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request { endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) if !ok { r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") @@ -341,7 +341,7 @@ func (r *HTTPRouteReconciler) listHTTPRoutesByServiceForEndpoints(ctx context.Co func (r *HTTPRouteReconciler) listHTTPRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request { pluginconfig, ok := obj.(*v1alpha1.PluginConfig) if !ok { - r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice") + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to PluginConfig") return nil } namespace := pluginconfig.GetNamespace() diff --git a/internal/controller/indexer/grpcroute.go b/internal/controller/indexer/grpcroute.go new file mode 100644 index 000000000..656acf686 --- /dev/null +++ b/internal/controller/indexer/grpcroute.go @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package indexer + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + internaltypes "github.com/apache/apisix-ingress-controller/internal/types" +) + +func setupGRPCRouteIndexer(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1.GRPCRoute{}, + ParentRefs, + GRPCRouteParentRefsIndexFunc, + ); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1.GRPCRoute{}, + ExtensionRef, + GRPCRouteExtensionIndexFunc, + ); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1.GRPCRoute{}, + ServiceIndexRef, + GRPCRouteServiceIndexFunc, + ); err != nil { + return err + } + + return nil +} + +func GRPCRouteParentRefsIndexFunc(rawObj client.Object) []string { + gr := rawObj.(*gatewayv1.GRPCRoute) + keys := make([]string, 0, len(gr.Spec.ParentRefs)) + for _, ref := range gr.Spec.ParentRefs { + ns := gr.GetNamespace() + if ref.Namespace != nil { + ns = string(*ref.Namespace) + } + keys = append(keys, GenIndexKey(ns, string(ref.Name))) + } + return keys +} + +func GRPCRouteServiceIndexFunc(rawObj client.Object) []string { + gr := rawObj.(*gatewayv1.GRPCRoute) + keys := make([]string, 0, len(gr.Spec.Rules)) + for _, rule := range gr.Spec.Rules { + for _, backend := range rule.BackendRefs { + namespace := gr.GetNamespace() + if backend.Kind != nil && *backend.Kind != internaltypes.KindService { + continue + } + if backend.Namespace != nil { + namespace = string(*backend.Namespace) + } + keys = append(keys, GenIndexKey(namespace, string(backend.Name))) + } + } + return keys +} + +func GRPCRouteExtensionIndexFunc(rawObj client.Object) []string { + gr := rawObj.(*gatewayv1.GRPCRoute) + keys := make([]string, 0, len(gr.Spec.Rules)) + for _, rule := range gr.Spec.Rules { + for _, filter := range rule.Filters { + if filter.Type != gatewayv1.GRPCRouteFilterExtensionRef || filter.ExtensionRef == nil { + continue + } + if filter.ExtensionRef.Kind == internaltypes.KindPluginConfig { + keys = append(keys, GenIndexKey(gr.GetNamespace(), string(filter.ExtensionRef.Name))) + } + } + } + return keys +} diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 764bc9d49..f94faafc6 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -61,6 +61,7 @@ func SetupIndexer(mgr ctrl.Manager) error { for resource, setup := range map[client.Object]func(ctrl.Manager) error{ &gatewayv1.Gateway{}: setupGatewayIndexer, &gatewayv1.HTTPRoute{}: setupHTTPRouteIndexer, + &gatewayv1.GRPCRoute{}: setupGRPCRouteIndexer, &gatewayv1.GatewayClass{}: setupGatewayClassIndexer, &v1alpha1.Consumer{}: setupConsumerIndexer, &networkingv1.Ingress{}: setupIngressIndexer, diff --git a/internal/controller/utils.go b/internal/controller/utils.go index cd24041ea..72b8ab59c 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -64,6 +64,7 @@ import ( const ( KindGateway = "Gateway" KindHTTPRoute = "HTTPRoute" + KindGRPCRoute = "GRPCRoute" KindGatewayClass = "GatewayClass" KindIngress = "Ingress" KindIngressClass = "IngressClass" @@ -366,6 +367,7 @@ func ParseRouteParentRefs( matched := false reason := gatewayv1.RouteReasonNoMatchingParent var listenerName string + var matchedListener gatewayv1.Listener for _, listener := range gateway.Spec.Listeners { if parentRef.SectionName != nil { @@ -407,6 +409,7 @@ func ParseRouteParentRefs( // TODO: check if the listener status is programmed matched = true + matchedListener = listener break } @@ -414,6 +417,7 @@ func ParseRouteParentRefs( gateways = append(gateways, RouteParentRefContext{ Gateway: &gateway, ListenerName: listenerName, + Listener: &matchedListener, Conditions: []metav1.Condition{{ Type: string(gatewayv1.RouteConditionAccepted), Status: metav1.ConditionTrue, @@ -425,6 +429,7 @@ func ParseRouteParentRefs( gateways = append(gateways, RouteParentRefContext{ Gateway: &gateway, ListenerName: listenerName, + Listener: &matchedListener, Conditions: []metav1.Condition{{ Type: string(gatewayv1.RouteConditionAccepted), Status: metav1.ConditionFalse, @@ -498,6 +503,8 @@ func routeHostnamesIntersectsWithListenerHostname(route client.Object, listener switch r := route.(type) { case *gatewayv1.HTTPRoute: return listenerHostnameIntersectWithRouteHostnames(listener, r.Spec.Hostnames) + case *gatewayv1.GRPCRoute: + return listenerHostnameIntersectWithRouteHostnames(listener, r.Spec.Hostnames) default: return false } @@ -648,7 +655,7 @@ func isRouteNamespaceAllowed( func routeMatchesListenerType(route client.Object, listener gatewayv1.Listener) bool { switch route.(type) { - case *gatewayv1.HTTPRoute: + case *gatewayv1.HTTPRoute, *gatewayv1.GRPCRoute: if listener.Protocol != gatewayv1.HTTPProtocolType && listener.Protocol != gatewayv1.HTTPSProtocolType { return false } diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 7fbb5cdb6..9241849b3 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -81,15 +81,17 @@ import ( // GatewayAPI // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses,verbs=get;list;watch;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/status,verbs=get;update -// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;update +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update -// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=list;watch;update +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=get;list;watch // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants/status,verbs=get;update +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes,verbs=get;list;watch +// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes/status,verbs=get;update // Networking -// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;update +// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch // +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses/status,verbs=get;update // +kubebuilder:rbac:groups=networking.k8s.io,resources=ingressclasses,verbs=get;list;watch @@ -140,6 +142,14 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Updater: updater, Readier: readier, }, + &gatewayv1.GRPCRoute{}: &controller.GRPCRouteReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName(types.KindGRPCRoute), + Provider: pro, + Updater: updater, + Readier: readier, + }, &v1alpha1.Consumer{}: &controller.ConsumerReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -278,6 +288,9 @@ func registerGatewayAPIForReadinessGVK(mgr manager.Manager, readier readiness.Re if utils.HasAPIResource(mgr, &gatewayv1.HTTPRoute{}) { gvks = append(gvks, types.GvkOf(&gatewayv1.HTTPRoute{})) } + if utils.HasAPIResource(mgr, &gatewayv1.GRPCRoute{}) { + gvks = append(gvks, types.GvkOf(&gatewayv1.GRPCRoute{})) + } if len(gvks) == 0 { return } diff --git a/internal/provider/api7ee/provider.go b/internal/provider/api7ee/provider.go index 3c3784a00..14e3cc45c 100644 --- a/internal/provider/api7ee/provider.go +++ b/internal/provider/api7ee/provider.go @@ -98,6 +98,9 @@ func (d *api7eeProvider) Update(ctx context.Context, tctx *provider.TranslateCon case *gatewayv1.HTTPRoute: result, err = d.translator.TranslateHTTPRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, "service") + case *gatewayv1.GRPCRoute: + result, err = d.translator.TranslateGRPCRoute(tctx, t.DeepCopy()) + resourceTypes = append(resourceTypes, "service") case *gatewayv1.Gateway: result, err = d.translator.TranslateGateway(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, "global_rule", "ssl", "plugin_metadata") @@ -176,7 +179,7 @@ func (d *api7eeProvider) Delete(ctx context.Context, obj client.Object) error { var resourceTypes []string var labels map[string]string switch obj.(type) { - case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute: + case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute: resourceTypes = append(resourceTypes, "service") labels = label.GenLabel(obj) case *gatewayv1.Gateway: diff --git a/internal/provider/api7ee/status.go b/internal/provider/api7ee/status.go index f4d090849..fd9ab2027 100644 --- a/internal/provider/api7ee/status.go +++ b/internal/provider/api7ee/status.go @@ -143,6 +143,41 @@ func (d *api7eeProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) + case types.KindGRPCRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + log.Debugw("updating GRPCRoute status", zap.Any("parentRefs", parentRefs)) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1.GRPCRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) } } diff --git a/internal/provider/apisix/provider.go b/internal/provider/apisix/provider.go index c45b515a5..8f7a8d4c9 100644 --- a/internal/provider/apisix/provider.go +++ b/internal/provider/apisix/provider.go @@ -105,6 +105,9 @@ func (d *apisixProvider) Update(ctx context.Context, tctx *provider.TranslateCon case *gatewayv1.HTTPRoute: result, err = d.translator.TranslateHTTPRoute(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeService) + case *gatewayv1.GRPCRoute: + result, err = d.translator.TranslateGRPCRoute(tctx, t.DeepCopy()) + resourceTypes = append(resourceTypes, adctypes.TypeService) case *gatewayv1.Gateway: result, err = d.translator.TranslateGateway(tctx, t.DeepCopy()) resourceTypes = append(resourceTypes, adctypes.TypeGlobalRule, adctypes.TypeSSL, adctypes.TypePluginMetadata) @@ -179,7 +182,7 @@ func (d *apisixProvider) Delete(ctx context.Context, obj client.Object) error { var resourceTypes []string var labels map[string]string switch obj.(type) { - case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute: + case *gatewayv1.HTTPRoute, *apiv2.ApisixRoute, *gatewayv1.GRPCRoute: resourceTypes = append(resourceTypes, adctypes.TypeService) labels = label.GenLabel(obj) case *gatewayv1.Gateway: diff --git a/internal/provider/apisix/status.go b/internal/provider/apisix/status.go index 150079ee9..d4b1e8993 100644 --- a/internal/provider/apisix/status.go +++ b/internal/provider/apisix/status.go @@ -144,6 +144,41 @@ func (d *apisixProvider) updateStatus(nnk types.NamespacedNameKind, condition me return cp }), }) + case types.KindGRPCRoute: + parentRefs := d.client.ConfigManager.GetConfigRefsByResourceKey(nnk) + log.Debugw("updating GRPCRoute status", zap.Any("parentRefs", parentRefs)) + gatewayRefs := map[types.NamespacedNameKind]struct{}{} + for _, parentRef := range parentRefs { + if parentRef.Kind == types.KindGateway { + gatewayRefs[parentRef] = struct{}{} + } + } + d.updater.Update(status.Update{ + NamespacedName: nnk.NamespacedName(), + Resource: &gatewayv1.GRPCRoute{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*gatewayv1.GRPCRoute).DeepCopy() + gatewayNs := cp.GetNamespace() + for i, ref := range cp.Status.Parents { + ns := gatewayNs + if ref.ParentRef.Namespace != nil { + ns = string(*ref.ParentRef.Namespace) + } + if ref.ParentRef.Kind == nil || *ref.ParentRef.Kind == types.KindGateway { + nnk := types.NamespacedNameKind{ + Name: string(ref.ParentRef.Name), + Namespace: ns, + Kind: types.KindGateway, + } + if _, ok := gatewayRefs[nnk]; ok { + ref.Conditions = cutils.MergeCondition(ref.Conditions, condition) + cp.Status.Parents[i] = ref + } + } + } + return cp + }), + }) } } diff --git a/internal/provider/provider.go b/internal/provider/provider.go index ef93de541..ecab37eb7 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -45,6 +45,7 @@ type TranslateContext struct { BackendRefs []gatewayv1.BackendRef GatewayTLSConfig []gatewayv1.GatewayTLSConfig Credentials []v1alpha1.Credential + Listeners []gatewayv1.Listener EndpointSlices map[k8stypes.NamespacedName][]discoveryv1.EndpointSlice Secrets map[k8stypes.NamespacedName]*corev1.Secret diff --git a/internal/types/k8s.go b/internal/types/k8s.go index 39a9e2ffb..047fd2a8e 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -25,6 +25,7 @@ import ( kschema "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" v2 "github.com/apache/apisix-ingress-controller/api/v2" @@ -35,6 +36,7 @@ const DefaultIngressClassAnnotation = "ingressclass.kubernetes.io/is-default-cla const ( KindGateway = "Gateway" KindHTTPRoute = "HTTPRoute" + KindGRPCRoute = "GRPCRoute" KindGatewayClass = "GatewayClass" KindIngress = "Ingress" KindIngressClass = "IngressClass" @@ -60,6 +62,8 @@ func KindOf(obj any) string { return KindGateway case *gatewayv1.HTTPRoute: return KindHTTPRoute + case *gatewayv1.GRPCRoute: + return KindGRPCRoute case *gatewayv1.GatewayClass: return KindGatewayClass case *netv1.Ingress: @@ -120,8 +124,10 @@ func TypeList(gvk schema.GroupVersionKind) client.ObjectList { func GvkOf(obj any) schema.GroupVersionKind { kind := KindOf(obj) switch obj.(type) { - case *gatewayv1.Gateway, *gatewayv1.HTTPRoute, *gatewayv1.GatewayClass: + case *gatewayv1.Gateway, *gatewayv1.HTTPRoute, *gatewayv1.GatewayClass, *gatewayv1.GRPCRoute: return gatewayv1.SchemeGroupVersion.WithKind(kind) + case *gatewayv1beta1.ReferenceGrant: + return gatewayv1beta1.SchemeGroupVersion.WithKind(kind) case *netv1.Ingress, *netv1.IngressClass: return netv1.SchemeGroupVersion.WithKind(kind) case *netv1beta1.IngressClass: diff --git a/test/conformance/api7ee/conformance_test.go b/test/conformance/api7ee/conformance_test.go index 6ae824752..544363d7c 100644 --- a/test/conformance/api7ee/conformance_test.go +++ b/test/conformance/api7ee/conformance_test.go @@ -22,7 +22,6 @@ import ( "testing" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/gateway-api/conformance" conformancev1 "sigs.k8s.io/gateway-api/conformance/apis/v1" "sigs.k8s.io/gateway-api/conformance/tests" @@ -51,7 +50,6 @@ func TestGatewayAPIConformance(t *testing.T) { URL: "https://github.com/apache/apisix-ingress-controller.git", Version: "v2.0.0", } - opts.ConformanceProfiles = sets.New(suite.GatewayHTTPConformanceProfileName) cSuite, err := suite.NewConformanceTestSuite(opts) require.NoError(t, err) diff --git a/test/e2e/framework/grpc.go b/test/e2e/framework/grpc.go new file mode 100644 index 000000000..a064a0754 --- /dev/null +++ b/test/e2e/framework/grpc.go @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package framework + +import ( + "bytes" + _ "embed" + "text/template" + "time" + + "github.com/gruntwork-io/terratest/modules/k8s" + "github.com/stretchr/testify/assert" +) + +var ( + //go:embed manifests/grpc-backend.yaml + _grpcBackendDeployment string + grpcBackendTpl *template.Template +) + +type GRPCBackendOpts struct { + KubectlOptions *k8s.KubectlOptions +} + +func init() { + tpl, err := template.New("grpc-backend").Parse(_grpcBackendDeployment) + if err != nil { + panic(err) + } + grpcBackendTpl = tpl +} + +func (f *Framework) DeployGRPCBackend(opts GRPCBackendOpts) { + if opts.KubectlOptions == nil { + opts.KubectlOptions = f.kubectlOpts + } + buf := bytes.NewBuffer(nil) + + err := grpcBackendTpl.Execute(buf, opts) + assert.Nil(f.GinkgoT, err, "rendering grpc backend spec") + + k8s.KubectlApplyFromString(f.GinkgoT, opts.KubectlOptions, buf.String()) + + k8s.WaitUntilDeploymentAvailable(f.GinkgoT, opts.KubectlOptions, "grpc-infra-backend-v1", 10, 10*time.Second) +} diff --git a/test/e2e/framework/manifests/grpc-backend.yaml b/test/e2e/framework/manifests/grpc-backend.yaml new file mode 100644 index 000000000..953c75b03 --- /dev/null +++ b/test/e2e/framework/manifests/grpc-backend.yaml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: v1 +kind: Service +metadata: + name: grpc-infra-backend-v1 +spec: + selector: + app: grpc-infra-backend-v1 + ports: + - protocol: TCP + port: 8080 + targetPort: 3000 + appProtocol: kubernetes.io/h2c +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: grpc-infra-backend-v1 + labels: + app: grpc-infra-backend-v1 +spec: + replicas: 1 + selector: + matchLabels: + app: grpc-infra-backend-v1 + template: + metadata: + labels: + app: grpc-infra-backend-v1 + spec: + containers: + - name: grpc-infra-backend-v1 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: GRPC_ECHO_SERVER + value: "1" + resources: + requests: + cpu: 10m diff --git a/test/e2e/framework/manifests/ingress.yaml b/test/e2e/framework/manifests/ingress.yaml index 94972cc8b..fa24398ed 100644 --- a/test/e2e/framework/manifests/ingress.yaml +++ b/test/e2e/framework/manifests/ingress.yaml @@ -81,51 +81,9 @@ rules: - "" resources: - namespaces - verbs: - - get - - list - - watch -- apiGroups: - - "" - resources: - - endpoints - verbs: - - get - - list - - watch -- apiGroups: - - "" - resources: + - pods - secrets - verbs: - - get - - list - - watch -- apiGroups: - - "" - resources: - services - - pods - verbs: - - get - - list - - watch -- apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - discovery.k8s.io - resources: - - endpointslices verbs: - get - list @@ -164,70 +122,61 @@ rules: - get - update - apiGroups: - - gateway.networking.k8s.io + - coordination.k8s.io resources: - - gatewayclasses + - leases verbs: + - create + - delete - get - list + - patch - update - watch - apiGroups: - - gateway.networking.k8s.io - resources: - - gatewayclasses/status - verbs: - - get - - update -- apiGroups: - - gateway.networking.k8s.io + - discovery.k8s.io resources: - - gateways + - endpointslices verbs: - get - list - - update - watch - apiGroups: - gateway.networking.k8s.io resources: - - gateways/status - verbs: - - get - - update -- apiGroups: - - gateway.networking.k8s.io - resources: - - httproutes + - gatewayclasses verbs: - get - list + - update - watch - apiGroups: - gateway.networking.k8s.io resources: + - gatewayclasses/status + - gateways/status + - grpcroutes/status - httproutes/status + - referencegrants/status verbs: - get - update - apiGroups: - gateway.networking.k8s.io resources: + - gateways + - grpcroutes + - httproutes - referencegrants verbs: - get - list - watch -- apiGroups: - - gateway.networking.k8s.io - resources: - - referencegrants/status - verbs: - - get - apiGroups: - networking.k8s.io resources: - ingressclasses + - ingresses verbs: - get - list @@ -235,19 +184,18 @@ rules: - apiGroups: - networking.k8s.io resources: - - ingresses + - ingresses/status verbs: - get - - list - update - - watch - apiGroups: - - networking.k8s.io + - "" resources: - - ingresses/status + - endpoints verbs: - get - - update + - list + - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/test/e2e/gatewayapi/grpcroute.go b/test/e2e/gatewayapi/grpcroute.go new file mode 100644 index 000000000..e4485fe08 --- /dev/null +++ b/test/e2e/gatewayapi/grpcroute.go @@ -0,0 +1,289 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gatewayapi + +import ( + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/grpc/metadata" + pb "sigs.k8s.io/gateway-api/conformance/echo-basic/grpcechoserver" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("Test GRPCRoute", Label("networking.k8s.io", "grpcroute"), func() { + s := scaffold.NewDefaultScaffold() + + BeforeEach(func() { + By("deploy grpc backend") + s.DeployGRPCBackend() + + By("create GatewayProxy") + Expect(s.CreateResourceFromString(s.GetGatewayProxySpec())).NotTo(HaveOccurred(), "creating GatewayProxy") + + By("create GatewayClass") + Expect(s.CreateResourceFromString(s.GetGatewayClassYaml())).NotTo(HaveOccurred(), "creating GatewayClass") + + s.RetryAssertion(func() string { + gcyaml, _ := s.GetResourceYaml("GatewayClass", s.Namespace()) + return gcyaml + }).Should( + And( + ContainSubstring(`status: "True"`), + ContainSubstring("message: the gatewayclass has been accepted by the apisix-ingress-controller"), + ), + "check GatewayClass condition", + ) + + By("create Gateway") + Expect(s.CreateResourceFromString(s.GetGatewayYaml())).NotTo(HaveOccurred(), "creating Gateway") + + s.RetryAssertion(func() string { + gcyaml, _ := s.GetResourceYaml("Gateway", s.Namespace()) + return gcyaml + }).Should( + And( + ContainSubstring(`status: "True"`), + ContainSubstring("message: the gateway has been accepted by the apisix-ingress-controlle"), + ), + "check Gateway condition status", + ) + }) + + Context("GRPCRoute Filters", func() { + var reqHeaderModifyWithAdd = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: GRPCRoute +metadata: + name: req-header-modify +spec: + parentRefs: + - name: %s + rules: + - matches: + filters: + - type: RequestHeaderModifier + requestHeaderModifier: + add: + - name: X-Req-Add + value: "plugin-req-add" + set: + - name: X-Req-Set + value: "plugin-req-set" + remove: + - X-Req-Removed + backendRefs: + - name: grpc-infra-backend-v1 + port: 8080 +` + var respHeaderModifyWithAdd = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: GRPCRoute +metadata: + name: resp-header-modify +spec: + parentRefs: + - name: %s + rules: + - matches: + filters: + - type: ResponseHeaderModifier + responseHeaderModifier: + add: + - name: X-Resp-Add + value: "plugin-resp-add" + backendRefs: + - name: grpc-infra-backend-v1 + port: 8080 +` + It("GRPCRoute RequestHeaderModifier", func() { + By("create GRPCRoute") + s.ResourceApplied("GRPCRoute", "req-header-modify", fmt.Sprintf(reqHeaderModifyWithAdd, s.Namespace()), 1) + + testCases := []scaffold.ExpectedResponse{ + { + EchoRequest: &pb.EchoRequest{}, + }, + { + EchoRequest: &pb.EchoRequest{}, + Headers: map[string]string{ + "X-Req-Add": "plugin-req-add", + }, + }, + { + EchoRequest: &pb.EchoRequest{}, + RequestMetadata: &scaffold.RequestMetadata{ + Metadata: map[string]string{ + "X-Req-Set": "test-set", + }, + }, + Headers: map[string]string{ + "X-Req-Set": "plugin-req-set", + }, + }, + { + EchoRequest: &pb.EchoRequest{}, + RequestMetadata: &scaffold.RequestMetadata{ + Metadata: map[string]string{ + "X-Req-Removed": "to-be-removed", + }, + }, + Headers: map[string]string{ + "X-Req-Removed": "", + }, + }, + } + + for i := range testCases { + tc := testCases[i] + s.RetryAssertion(func() error { + return s.RequestEchoBackend(tc) + }).ShouldNot(HaveOccurred(), "request grpc backend") + } + }) + + It("GRPCRoute ResponseHeaderModifier", func() { + By("create GRPCRoute") + s.ResourceApplied("GRPCRoute", "resp-header-modify", fmt.Sprintf(respHeaderModifyWithAdd, s.Namespace()), 1) + + testCases := []scaffold.ExpectedResponse{ + { + EchoRequest: &pb.EchoRequest{}, + }, + { + EchoRequest: &pb.EchoRequest{}, + EchoResponse: scaffold.EchoResponse{ + Headers: &metadata.MD{ + "X-Resp-Add": []string{"plugin-resp-add"}, + }, + }, + }, + } + + for i := range testCases { + tc := testCases[i] + s.RetryAssertion(func() error { + return s.RequestEchoBackend(tc) + }).ShouldNot(HaveOccurred(), "request grpc backend") + } + }) + + It("GRPCRoute ExtensionRef", func() { + var rewritePlugin = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: PluginConfig +metadata: + name: rewrite +spec: + plugins: + - name: proxy-rewrite + config: + headers: + add: + x-req-add: "plugin-req-add" +` + var rewritePluginUpdate = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: PluginConfig +metadata: + name: rewrite +spec: + plugins: + - name: proxy-rewrite + config: + headers: + add: + x-req-add: "plugin-req-add-v2" +` + var extensionRefRewritePlugin = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: GRPCRoute +metadata: + name: rewrite +spec: + parentRefs: + - name: %s + rules: + - matches: + filters: + - type: ExtensionRef + extensionRef: + group: apisix.apache.org + kind: PluginConfig + name: rewrite + backendRefs: + - name: grpc-infra-backend-v1 + port: 8080 +` + Expect(s.CreateResourceFromString(rewritePlugin)).NotTo(HaveOccurred(), "creating PluginConfig") + s.ResourceApplied("GRPCRoute", "rewrite", fmt.Sprintf(extensionRefRewritePlugin, s.Namespace()), 1) + + testCases := []struct { + scaffold.ExpectedResponse + Helper func() + }{ + { + ExpectedResponse: scaffold.ExpectedResponse{ + EchoRequest: &pb.EchoRequest{}, + }, + }, + { + ExpectedResponse: scaffold.ExpectedResponse{ + EchoRequest: &pb.EchoRequest{}, + Headers: map[string]string{ + "x-req-add": "plugin-req-add", + }, + }, + }, + { + ExpectedResponse: scaffold.ExpectedResponse{ + EchoRequest: &pb.EchoRequest{}, + Headers: map[string]string{ + "x-req-add": "plugin-req-add-v2", + }, + }, + Helper: func() { + Expect(s.CreateResourceFromString(rewritePluginUpdate)).NotTo(HaveOccurred(), "updating PluginConfig") + }, + }, + } + + for i := range testCases { + if testCases[i].Helper != nil { + testCases[i].Helper() + } + tc := testCases[i].ExpectedResponse + s.RetryAssertion(func() error { + return s.RequestEchoBackend(tc) + }).ShouldNot(HaveOccurred(), "request grpc backend") + } + }) + + // TODO: add GRPCRoute RequestMirror test + /* + It("GRPCRoute RequestMirror", func() {}) + */ + }) + + // TODO: add BackendTrafficPolicy test + /* + Context("GRPCRoute With BackendTrafficPolicy", func() {}) + */ +}) diff --git a/test/e2e/scaffold/grpc.go b/test/e2e/scaffold/grpc.go new file mode 100644 index 000000000..5c11bad9f --- /dev/null +++ b/test/e2e/scaffold/grpc.go @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package scaffold + +import ( + "context" + _ "embed" + "fmt" + "strings" + "time" + + "github.com/apache/apisix-ingress-controller/test/e2e/framework" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + pb "sigs.k8s.io/gateway-api/conformance/echo-basic/grpcechoserver" +) + +type RequestMetadata struct { + // The :authority pseudoheader to set on the outgoing request. + Authority string + + // Outgoing metadata pairs to add to the request. + Metadata map[string]string +} + +type ExpectedResponse struct { + EchoRequest *pb.EchoRequest + EchoTwoRequest *pb.EchoRequest + EchoThreeRequest *pb.EchoRequest + + RequestMetadata *RequestMetadata + + Headers map[string]string + + EchoResponse EchoResponse +} + +type EchoResponse struct { + Code codes.Code + Headers *metadata.MD + Trailers *metadata.MD + Response *pb.EchoResponse +} + +func (s *Scaffold) DeployGRPCBackend() { + s.Framework.DeployGRPCBackend(framework.GRPCBackendOpts{ + KubectlOptions: s.kubectlOptions, + }) +} + +func (s *Scaffold) RequestEchoBackend(exp ExpectedResponse) error { + endpoint := s.apisixTunnels.HTTP.Endpoint() + if framework.ProviderType == framework.ProviderTypeAPI7EE { + endpoint = s.apisixTunnels.HTTP2.Endpoint() + } + + endpoint = strings.Replace(endpoint, "localhost", "127.0.0.1", 1) + + dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + if exp.RequestMetadata != nil && exp.RequestMetadata.Authority != "" { + dialOpts = append(dialOpts, grpc.WithAuthority(exp.RequestMetadata.Authority)) + } + conn, err := grpc.NewClient(endpoint, dialOpts...) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if exp.RequestMetadata != nil && len(exp.RequestMetadata.Metadata) > 0 { + ctx = metadata.NewOutgoingContext(ctx, metadata.New(exp.RequestMetadata.Metadata)) + } + + var ( + resp = &EchoResponse{ + Headers: &metadata.MD{}, + Trailers: &metadata.MD{}, + } + ) + + client := pb.NewGrpcEchoClient(conn) + switch { + case exp.EchoRequest != nil: + resp.Response, err = client.Echo(ctx, exp.EchoRequest, grpc.Header(resp.Headers), grpc.Trailer(resp.Trailers)) + case exp.EchoTwoRequest != nil: + resp.Response, err = client.EchoTwo(ctx, exp.EchoTwoRequest, grpc.Header(resp.Headers), grpc.Trailer(resp.Trailers)) + case exp.EchoThreeRequest != nil: + resp.Response, err = client.EchoThree(ctx, exp.EchoThreeRequest, grpc.Header(resp.Headers), grpc.Trailer(resp.Trailers)) + } + if err != nil { + resp.Code = status.Code(err) + fmt.Printf("RPC finished with error: %v\n", err) + } else { + resp.Code = codes.OK + } + if err := expectEchoResponses(&exp, resp); err != nil { + return err + } + return nil +} + +func expectEchoResponses(expected *ExpectedResponse, actual *EchoResponse) error { + if expected.EchoResponse.Code != actual.Code { + return fmt.Errorf("expected status code to be %s (%d), but got %s (%d)", + expected.EchoResponse.Code.String(), + expected.EchoResponse.Code, + actual.Code.String(), + actual.Code, + ) + } + if expected.EchoResponse.Headers != nil { + for key, values := range *expected.EchoResponse.Headers { + actualValues := actual.Headers.Get(key) + if len(values) != len(actualValues) { + return fmt.Errorf("expected header %q to have %d values, but got %d", key, len(values), len(actualValues)) + } + for i, v := range values { + if actualValues[i] != v { + return fmt.Errorf("expected header %q to have value %q, but got %q", key, v, actualValues[i]) + } + } + } + } + if len(expected.Headers) > 0 { + msgHeaders := actual.Response.GetAssertions().GetHeaders() + + kv := make(map[string]string) + for _, header := range msgHeaders { + kv[header.GetKey()] = header.GetValue() + } + for key, value := range expected.Headers { + actualValue, ok := kv[strings.ToLower(key)] + if !ok { + if value != "" { + return fmt.Errorf("expected header %q to be present, but not found", key) + } + continue + } + if actualValue != value { + return fmt.Errorf("expected header %q to be %q, but got %q", key, value, actualValue) + } + } + } + return nil +} diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index 0d5239a4d..cb47afd0f 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -85,6 +85,7 @@ type Tunnels struct { HTTP *k8s.Tunnel HTTPS *k8s.Tunnel TCP *k8s.Tunnel + HTTP2 *k8s.Tunnel } func (t *Tunnels) Close() { @@ -100,6 +101,10 @@ func (t *Tunnels) Close() { t.safeClose(t.TCP.Close) t.TCP = nil } + if t.HTTP2 != nil { + t.safeClose(t.HTTP2.Close) + t.HTTP2 = nil + } } func (t *Tunnels) safeClose(close func()) { @@ -349,6 +354,7 @@ func (s *Scaffold) createDataplaneTunnels( httpPort int httpsPort int tcpPort int + http2Port int ) for _, port := range svc.Spec.Ports { @@ -359,6 +365,8 @@ func (s *Scaffold) createDataplaneTunnels( httpsPort = int(port.Port) case apiv2.SchemeTCP: tcpPort = int(port.Port) + case "http2": + http2Port = int(port.Port) } } @@ -371,6 +379,8 @@ func (s *Scaffold) createDataplaneTunnels( 0, httpsPort) tcpTunnel := k8s.NewTunnel(kubectlOpts, k8s.ResourceTypeService, serviceName, 0, tcpPort) + http2Tunnel := k8s.NewTunnel(kubectlOpts, k8s.ResourceTypeService, serviceName, + 0, http2Port) if err := httpTunnel.ForwardPortE(s.t); err != nil { return nil, err @@ -387,6 +397,13 @@ func (s *Scaffold) createDataplaneTunnels( } tunnels.TCP = tcpTunnel + if http2Port != 0 { + if err := http2Tunnel.ForwardPortE(s.t); err != nil { + return nil, err + } + tunnels.HTTP2 = http2Tunnel + } + return tunnels, nil }