Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ GO_LDFLAGS ?= "-X=$(VERSYM)=$(VERSION) -X=$(GITSHASYM)=$(GITSHA) -X=$(BUILDOSSYM
# gateway-api
GATEAY_API_VERSION ?= v1.3.0
## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/pkg/features/httproute.go
SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080,HTTPRouteHostRewrite,HTTPRouteQueryParamMatching,HTTPRoutePathRewrite"
SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080,HTTPRouteHostRewrite,HTTPRouteQueryParamMatching,HTTPRoutePathRewrite,HTTPRouteBackendProtocolWebSocket"
CONFORMANCE_TEST_REPORT_OUTPUT ?= $(DIR)/apisix-ingress-controller-conformance-report.yaml
## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/conformance/utils/suite/profiles.go
CONFORMANCE_PROFILES ?= GATEWAY-HTTP,GATEWAY-GRPC,GATEWAY-TLS
Expand Down
2 changes: 1 addition & 1 deletion api/v2/apisixroute_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type ApisixRouteHTTP struct {

// Websocket enables or disables websocket support for this route.
// +kubebuilder:validation:Optional
Websocket bool `json:"websocket" yaml:"websocket"`
Websocket *bool `json:"websocket" yaml:"websocket"`
// PluginConfigName specifies the name of the plugin config to apply.
PluginConfigName string `json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"`
// PluginConfigNamespace specifies the namespace of the plugin config.
Expand Down
5 changes: 5 additions & 0 deletions api/v2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 62 additions & 26 deletions internal/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"strconv"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -70,9 +70,11 @@ func (t *Translator) translateHTTPRule(tctx *provider.TranslateContext, ar *apiv
return nil, err
}

var enableWebsocket *bool
service := t.buildService(ar, rule, ruleIndex)
t.buildRoute(ar, service, rule, plugins, timeout, vars)
t.buildUpstream(tctx, service, ar, rule, ruleIndex)
// should build upstream before route because route needs to know the enableWebsocket flag
t.buildUpstream(tctx, service, ar, rule, ruleIndex, &enableWebsocket)
t.buildRoute(ar, service, rule, plugins, timeout, vars, &enableWebsocket)

return service, nil
}
Expand Down Expand Up @@ -139,7 +141,7 @@ func (t *Translator) loadRoutePlugins(tctx *provider.TranslateContext, ar *apiv2
}
}

func (t *Translator) buildPluginConfig(plugin apiv2.ApisixRoutePlugin, namespace string, secrets map[types.NamespacedName]*v1.Secret) map[string]any {
func (t *Translator) buildPluginConfig(plugin apiv2.ApisixRoutePlugin, namespace string, secrets map[types.NamespacedName]*corev1.Secret) map[string]any {
config := make(map[string]any)
if len(plugin.Config.Raw) > 0 {
if err := json.Unmarshal(plugin.Config.Raw, &config); err != nil {
Expand Down Expand Up @@ -179,13 +181,16 @@ func (t *Translator) addAuthenticationPlugins(rule apiv2.ApisixRouteHTTP, plugin
}
}

func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars) {
func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars, enableWebsocket **bool) {
route := adc.NewDefaultRoute()
route.Name = adc.ComposeRouteName(ar.Namespace, ar.Name, rule.Name)
route.ID = id.GenID(route.Name)
route.Desc = "Created by apisix-ingress-controller, DO NOT modify it manually"
route.Labels = label.GenLabel(ar)
route.EnableWebsocket = ptr.To(rule.Websocket)
route.EnableWebsocket = rule.Websocket
if route.EnableWebsocket == nil && *enableWebsocket != nil {
route.EnableWebsocket = *enableWebsocket
}
route.FilterFunc = rule.Match.FilterFunc
route.Hosts = rule.Match.Hosts
route.Methods = rule.Match.Methods
Expand All @@ -202,7 +207,7 @@ func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rul
service.Routes = []*adc.Route{route}
}

func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, ruleIndex int) {
func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, ruleIndex int, enableWebsocket **bool) {
var (
upstreams = make([]*adc.Upstream, 0)
weightedUpstreams = make([]adc.TrafficSplitConfigRuleWeightedUpstream, 0)
Expand All @@ -211,7 +216,7 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc
for backendIndex, backend := range rule.Backends {
// try to get the apisixupstream with the same name as the backend service to be upstream config.
// err is ignored because it does not care about the externalNodes of the apisixupstream.
upstream, err := t.translateApisixRouteHTTPBackend(tctx, ar, backend)
upstream, err := t.translateApisixRouteHTTPBackend(tctx, ar, backend, enableWebsocket)
if err != nil {
t.Log.Error(err, "failed to translate ApisixRoute backend", "backend", backend)
continue
Expand Down Expand Up @@ -312,7 +317,7 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH
return service
}

func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int32, error) {
func getPortFromService(svc *corev1.Service, backendSvcPort intstr.IntOrString) (int32, error) {
var port int32
if backendSvcPort.Type == intstr.Int {
port = int32(backendSvcPort.IntValue())
Expand All @@ -332,7 +337,31 @@ func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int
return port, nil
}

func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend) (*adc.Upstream, error) {
func findMatchingServicePort(svc *corev1.Service, backendSvcPort intstr.IntOrString) (*corev1.ServicePort, error) {
var servicePort *corev1.ServicePort
var portNumber int32 = -1
var servicePortName string
switch backendSvcPort.Type {
case intstr.Int:
portNumber = backendSvcPort.IntVal
case intstr.String:
servicePortName = backendSvcPort.StrVal
}
for _, svcPort := range svc.Spec.Ports {
p := svcPort
if p.Port == portNumber || (p.Name != "" && p.Name == servicePortName) {
servicePort = &p
break
}
}
if servicePort == nil {
return nil, errors.Errorf("service port %s not found in service %s", backendSvcPort.String(), svc.Name)
}

return servicePort, nil
}

func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend, enableWebsocket **bool) (*adc.Upstream, error) {
auNN := types.NamespacedName{
Namespace: ar.Namespace,
Name: backend.ServiceName,
Expand All @@ -354,50 +383,57 @@ func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateCon
upstream = u
}
var (
err error
nodes adc.UpstreamNodes
err error
nodes adc.UpstreamNodes
protocol string
)
if backend.ResolveGranularity == apiv2.ResolveGranularityService {
nodes, err = t.translateApisixRouteBackendResolveGranularityService(tctx, auNN, backend)
nodes, protocol, err = t.translateApisixRouteBackendResolveGranularityService(tctx, auNN, backend)
} else {
nodes, err = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, auNN, backend)
nodes, protocol, err = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, auNN, backend)
}
if err != nil {
return nil, err
}
upstream.Nodes = nodes
if upstream.Scheme == "" {
upstream.Scheme = appProtocolToUpstreamScheme(protocol)
}
if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS {
*enableWebsocket = ptr.To(true)
}
if backend.Weight != nil {
upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*backend.Weight), 10)
}
return upstream, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, string, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Name: backend.ServiceName,
}
svc, ok := tctx.Services[serviceNN]
if !ok {
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
return nil, "", errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
if svc.Spec.ClusterIP == "" {
return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
return nil, "", errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
port, err := getPortFromService(svc, backend.ServicePort)
port, err := findMatchingServicePort(svc, backend.ServicePort)
if err != nil {
return nil, err
return nil, "", err
}
return adc.UpstreamNodes{
{
Host: svc.Spec.ClusterIP,
Port: int(port),
Port: int(port.Port),
Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)),
},
}, nil
}, ptr.Deref(port.AppProtocol, ""), nil
}

func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, error) {
func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, string, error) {
tsBackend := apiv2.ApisixRouteHTTPBackend{
ServiceName: backend.ServiceName,
ServicePort: backend.ServicePort,
Expand All @@ -411,18 +447,18 @@ func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *p
}
}

func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, string, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Name: backend.ServiceName,
}
svc, ok := tctx.Services[serviceNN]
if !ok {
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
return nil, "", errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
}
port, err := getPortFromService(svc, backend.ServicePort)
if err != nil {
return nil, err
return nil, "", err
}
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
backendRef := gatewayv1.BackendRef{
Expand Down Expand Up @@ -484,7 +520,7 @@ func (t *Translator) translateApisixRouteStreamBackend(tctx *provider.TranslateC
}
upstream = u
}
nodes, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), backend)
nodes, _, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), backend)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/adc/translator/gatewayproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte
if endpoint == nil {
return nil, nil
}
upstreamNodes, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
upstreamNodes, _, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace),
Expand Down
2 changes: 1 addition & 1 deletion internal/adc/translator/grpcroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (t *Translator) TranslateGRPCRoute(tctx *provider.TranslateContext, grpcRou
backend.Namespace = &namespace
}
upstream := adctypes.NewDefaultUpstream()
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
upNodes, _, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
if err != nil {
backendErr = err
continue
Expand Down
43 changes: 33 additions & 10 deletions internal/adc/translator/httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,15 @@ func DefaultEndpointFilter(endpoint *discoveryv1.Endpoint) bool {
return true
}

func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, string, error) {
return t.translateBackendRef(tctx, ref, endpointFilter)
}

func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, string, error) {
nodes := adctypes.UpstreamNodes{}
var protocol string
if ref.Kind != nil && *ref.Kind != internaltypes.KindService {
return adctypes.UpstreamNodes{}, fmt.Errorf("kind %s is not supported", *ref.Kind)
return nodes, protocol, fmt.Errorf("kind %s is not supported", *ref.Kind)
}

key := types.NamespacedName{
Expand All @@ -407,7 +409,7 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga
}
service, ok := tctx.Services[key]
if !ok {
return adctypes.UpstreamNodes{}, fmt.Errorf("service %s not found", key)
return nodes, protocol, fmt.Errorf("service %s not found", key)
}

weight := 1
Expand All @@ -426,24 +428,26 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga
Port: port,
Weight: weight,
},
}, nil
}, protocol, nil
}

var portName *string
if ref.Port != nil {
for _, p := range service.Spec.Ports {
if int(p.Port) == int(*ref.Port) {
portName = ptr.To(p.Name)
protocol = ptr.Deref(p.AppProtocol, "")
break
}
}
if portName == nil {
return adctypes.UpstreamNodes{}, nil
return adctypes.UpstreamNodes{}, protocol, nil
}
}

endpointSlices := tctx.EndpointSlices[key]
return t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter), nil
nodes = t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter)
return nodes, protocol, nil
}

// calculateHTTPRoutePriority calculates the priority of the HTTP route.
Expand Down Expand Up @@ -545,6 +549,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
upstreams = make([]*adctypes.Upstream, 0)
weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0)
backendErr error
enableWebsocket *bool
)

for _, backend := range rule.BackendRefs {
Expand All @@ -553,18 +558,21 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
backend.Namespace = &namespace
}
upstream := adctypes.NewDefaultUpstream()
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
upNodes, protocol, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
if err != nil {
backendErr = err
continue
}
if len(upNodes) == 0 {
continue
}
if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS {
enableWebsocket = ptr.To(true)
}

t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream)
upstream.Nodes = upNodes

upstream.Scheme = appProtocolToUpstreamScheme(protocol)
var (
kind string
port int32
Expand Down Expand Up @@ -685,7 +693,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
route.Name = name
route.ID = id.GenID(name)
route.Labels = labels
route.EnableWebsocket = ptr.To(true)
route.EnableWebsocket = enableWebsocket

// Set the route priority
priority := calculateHTTPRoutePriority(&match, ruleIndex, hosts)
Expand Down Expand Up @@ -827,3 +835,18 @@ func (t *Translator) translateHTTPRouteHeaderMatchToVars(header gatewayv1.HTTPHe
}
return HeaderMatchToVars(matchType, string(header.Name), header.Value)
}

func appProtocolToUpstreamScheme(appProtocol string) string {
switch appProtocol {
case internaltypes.AppProtocolHTTP:
return apiv2.SchemeHTTP
case internaltypes.AppProtocolHTTPS:
return apiv2.SchemeHTTPS
case internaltypes.AppProtocolWS:
return apiv2.SchemeHTTP
case internaltypes.AppProtocolWSS:
return apiv2.SchemeHTTPS
default:
return ""
}
}
Loading
Loading