Skip to content

Commit f28b342

Browse files
authored
feat: support resolve svc.ports[].appProtocol (#2601)
1 parent 3c808e2 commit f28b342

File tree

16 files changed

+699
-164
lines changed

16 files changed

+699
-164
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ GO_LDFLAGS ?= "-X=$(VERSYM)=$(VERSION) -X=$(GITSHASYM)=$(GITSHA) -X=$(BUILDOSSYM
5252
# gateway-api
5353
GATEAY_API_VERSION ?= v1.3.0
5454
## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/pkg/features/httproute.go
55-
SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080,HTTPRouteHostRewrite,HTTPRouteQueryParamMatching,HTTPRoutePathRewrite"
55+
SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080,HTTPRouteHostRewrite,HTTPRouteQueryParamMatching,HTTPRoutePathRewrite,HTTPRouteBackendProtocolWebSocket"
5656
CONFORMANCE_TEST_REPORT_OUTPUT ?= $(DIR)/apisix-ingress-controller-conformance-report.yaml
5757
## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/conformance/utils/suite/profiles.go
5858
CONFORMANCE_PROFILES ?= GATEWAY-HTTP,GATEWAY-GRPC,GATEWAY-TLS

api/v2/apisixroute_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ type ApisixRouteHTTP struct {
100100

101101
// Websocket enables or disables websocket support for this route.
102102
// +kubebuilder:validation:Optional
103-
Websocket bool `json:"websocket" yaml:"websocket"`
103+
Websocket *bool `json:"websocket" yaml:"websocket"`
104104
// PluginConfigName specifies the name of the plugin config to apply.
105105
PluginConfigName string `json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"`
106106
// PluginConfigNamespace specifies the namespace of the plugin config.

api/v2/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/adc/translator/apisixroute.go

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"strconv"
2525

2626
"github.com/pkg/errors"
27-
v1 "k8s.io/api/core/v1"
27+
corev1 "k8s.io/api/core/v1"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/types"
3030
"k8s.io/apimachinery/pkg/util/intstr"
@@ -70,10 +70,10 @@ func (t *Translator) translateHTTPRule(tctx *provider.TranslateContext, ar *apiv
7070
return nil, err
7171
}
7272

73+
var enableWebsocket *bool
7374
service := t.buildService(ar, rule, ruleIndex)
74-
t.buildRoute(ar, service, rule, plugins, timeout, vars)
75-
t.buildUpstream(tctx, service, ar, rule)
76-
75+
t.buildUpstream(tctx, service, ar, rule, &enableWebsocket)
76+
t.buildRoute(ar, service, rule, plugins, timeout, vars, &enableWebsocket)
7777
return service, nil
7878
}
7979

@@ -139,7 +139,7 @@ func (t *Translator) loadRoutePlugins(tctx *provider.TranslateContext, ar *apiv2
139139
}
140140
}
141141

142-
func (t *Translator) buildPluginConfig(plugin apiv2.ApisixRoutePlugin, namespace string, secrets map[types.NamespacedName]*v1.Secret) map[string]any {
142+
func (t *Translator) buildPluginConfig(plugin apiv2.ApisixRoutePlugin, namespace string, secrets map[types.NamespacedName]*corev1.Secret) map[string]any {
143143
config := make(map[string]any)
144144
if len(plugin.Config.Raw) > 0 {
145145
if err := json.Unmarshal(plugin.Config.Raw, &config); err != nil {
@@ -179,13 +179,16 @@ func (t *Translator) addAuthenticationPlugins(rule apiv2.ApisixRouteHTTP, plugin
179179
}
180180
}
181181

182-
func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars) {
182+
func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars, enableWebsocket **bool) {
183183
route := adc.NewDefaultRoute()
184184
route.Name = adc.ComposeRouteName(ar.Namespace, ar.Name, rule.Name)
185185
route.ID = id.GenID(route.Name)
186186
route.Desc = "Created by apisix-ingress-controller, DO NOT modify it manually"
187187
route.Labels = label.GenLabel(ar)
188-
route.EnableWebsocket = ptr.To(rule.Websocket)
188+
route.EnableWebsocket = rule.Websocket
189+
if route.EnableWebsocket == nil && *enableWebsocket != nil {
190+
route.EnableWebsocket = *enableWebsocket
191+
}
189192
route.FilterFunc = rule.Match.FilterFunc
190193
route.Hosts = rule.Match.Hosts
191194
route.Methods = rule.Match.Methods
@@ -202,7 +205,7 @@ func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rul
202205
service.Routes = []*adc.Route{route}
203206
}
204207

205-
func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP) {
208+
func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, enableWebsocket **bool) {
206209
var (
207210
upstreams = make([]*adc.Upstream, 0)
208211
weightedUpstreams = make([]adc.TrafficSplitConfigRuleWeightedUpstream, 0)
@@ -211,7 +214,7 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc
211214
for _, backend := range rule.Backends {
212215
// try to get the apisixupstream with the same name as the backend service to be upstream config.
213216
// err is ignored because it does not care about the externalNodes of the apisixupstream.
214-
upstream, err := t.translateApisixRouteHTTPBackend(tctx, ar, backend)
217+
upstream, err := t.translateApisixRouteHTTPBackend(tctx, ar, backend, enableWebsocket)
215218
if err != nil {
216219
t.Log.Error(err, "failed to translate ApisixRoute backend", "backend", backend)
217220
continue
@@ -310,7 +313,7 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH
310313
return service
311314
}
312315

313-
func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int32, error) {
316+
func getPortFromService(svc *corev1.Service, backendSvcPort intstr.IntOrString) (int32, error) {
314317
var port int32
315318
if backendSvcPort.Type == intstr.Int {
316319
port = int32(backendSvcPort.IntValue())
@@ -330,7 +333,31 @@ func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int
330333
return port, nil
331334
}
332335

333-
func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend) (*adc.Upstream, error) {
336+
func findMatchingServicePort(svc *corev1.Service, backendSvcPort intstr.IntOrString) (*corev1.ServicePort, error) {
337+
var servicePort *corev1.ServicePort
338+
var portNumber int32 = -1
339+
var servicePortName string
340+
switch backendSvcPort.Type {
341+
case intstr.Int:
342+
portNumber = backendSvcPort.IntVal
343+
case intstr.String:
344+
servicePortName = backendSvcPort.StrVal
345+
}
346+
for _, svcPort := range svc.Spec.Ports {
347+
p := svcPort
348+
if p.Port == portNumber || (p.Name != "" && p.Name == servicePortName) {
349+
servicePort = &p
350+
break
351+
}
352+
}
353+
if servicePort == nil {
354+
return nil, errors.Errorf("service port %s not found in service %s", backendSvcPort.String(), svc.Name)
355+
}
356+
357+
return servicePort, nil
358+
}
359+
360+
func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend, enableWebsocket **bool) (*adc.Upstream, error) {
334361
auNN := types.NamespacedName{
335362
Namespace: ar.Namespace,
336363
Name: backend.ServiceName,
@@ -352,50 +379,57 @@ func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateCon
352379
upstream = u
353380
}
354381
var (
355-
err error
356-
nodes adc.UpstreamNodes
382+
err error
383+
nodes adc.UpstreamNodes
384+
protocol string
357385
)
358386
if backend.ResolveGranularity == apiv2.ResolveGranularityService {
359-
nodes, err = t.translateApisixRouteBackendResolveGranularityService(tctx, auNN, backend)
387+
nodes, protocol, err = t.translateApisixRouteBackendResolveGranularityService(tctx, auNN, backend)
360388
} else {
361-
nodes, err = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, auNN, backend)
389+
nodes, protocol, err = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, auNN, backend)
362390
}
363391
if err != nil {
364392
return nil, err
365393
}
366394
upstream.Nodes = nodes
395+
if upstream.Scheme == "" {
396+
upstream.Scheme = appProtocolToUpstreamScheme(protocol)
397+
}
398+
if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS {
399+
*enableWebsocket = ptr.To(true)
400+
}
367401
if backend.Weight != nil {
368402
upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*backend.Weight), 10)
369403
}
370404
return upstream, nil
371405
}
372406

373-
func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
407+
func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, string, error) {
374408
serviceNN := types.NamespacedName{
375409
Namespace: arNN.Namespace,
376410
Name: backend.ServiceName,
377411
}
378412
svc, ok := tctx.Services[serviceNN]
379413
if !ok {
380-
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
414+
return nil, "", errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
381415
}
382416
if svc.Spec.ClusterIP == "" {
383-
return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
417+
return nil, "", errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN)
384418
}
385-
port, err := getPortFromService(svc, backend.ServicePort)
419+
port, err := findMatchingServicePort(svc, backend.ServicePort)
386420
if err != nil {
387-
return nil, err
421+
return nil, "", err
388422
}
389423
return adc.UpstreamNodes{
390424
{
391425
Host: svc.Spec.ClusterIP,
392-
Port: int(port),
426+
Port: int(port.Port),
393427
Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)),
394428
},
395-
}, nil
429+
}, ptr.Deref(port.AppProtocol, ""), nil
396430
}
397431

398-
func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, error) {
432+
func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, string, error) {
399433
tsBackend := apiv2.ApisixRouteHTTPBackend{
400434
ServiceName: backend.ServiceName,
401435
ServicePort: backend.ServicePort,
@@ -409,18 +443,18 @@ func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *p
409443
}
410444
}
411445

412-
func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
446+
func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, string, error) {
413447
serviceNN := types.NamespacedName{
414448
Namespace: arNN.Namespace,
415449
Name: backend.ServiceName,
416450
}
417451
svc, ok := tctx.Services[serviceNN]
418452
if !ok {
419-
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
453+
return nil, "", errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN)
420454
}
421455
port, err := getPortFromService(svc, backend.ServicePort)
422456
if err != nil {
423-
return nil, err
457+
return nil, "", err
424458
}
425459
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
426460
backendRef := gatewayv1.BackendRef{
@@ -482,7 +516,7 @@ func (t *Translator) translateApisixRouteStreamBackend(tctx *provider.TranslateC
482516
}
483517
upstream = u
484518
}
485-
nodes, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), backend)
519+
nodes, _, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), backend)
486520
if err != nil {
487521
return nil, err
488522
}

internal/adc/translator/gatewayproxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte
9898
if endpoint == nil {
9999
return nil, nil
100100
}
101-
upstreamNodes, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
101+
upstreamNodes, _, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
102102
BackendObjectReference: gatewayv1.BackendObjectReference{
103103
Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
104104
Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace),

internal/adc/translator/grpcroute.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (t *Translator) TranslateGRPCRoute(tctx *provider.TranslateContext, grpcRou
183183
backend.Namespace = &namespace
184184
}
185185
upstream := adctypes.NewDefaultUpstream()
186-
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
186+
upNodes, _, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
187187
if err != nil {
188188
backendErr = err
189189
continue

internal/adc/translator/httproute.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -391,13 +391,15 @@ func DefaultEndpointFilter(endpoint *discoveryv1.Endpoint) bool {
391391
return true
392392
}
393393

394-
func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
394+
func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, string, error) {
395395
return t.translateBackendRef(tctx, ref, endpointFilter)
396396
}
397397

398-
func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
398+
func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, string, error) {
399+
nodes := adctypes.UpstreamNodes{}
400+
var protocol string
399401
if ref.Kind != nil && *ref.Kind != internaltypes.KindService {
400-
return adctypes.UpstreamNodes{}, fmt.Errorf("kind %s is not supported", *ref.Kind)
402+
return nodes, protocol, fmt.Errorf("kind %s is not supported", *ref.Kind)
401403
}
402404

403405
key := types.NamespacedName{
@@ -406,7 +408,7 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga
406408
}
407409
service, ok := tctx.Services[key]
408410
if !ok {
409-
return adctypes.UpstreamNodes{}, fmt.Errorf("service %s not found", key)
411+
return nodes, protocol, fmt.Errorf("service %s not found", key)
410412
}
411413

412414
weight := 1
@@ -425,24 +427,26 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga
425427
Port: port,
426428
Weight: weight,
427429
},
428-
}, nil
430+
}, protocol, nil
429431
}
430432

431433
var portName *string
432434
if ref.Port != nil {
433435
for _, p := range service.Spec.Ports {
434436
if int(p.Port) == int(*ref.Port) {
435437
portName = ptr.To(p.Name)
438+
protocol = ptr.Deref(p.AppProtocol, "")
436439
break
437440
}
438441
}
439442
if portName == nil {
440-
return adctypes.UpstreamNodes{}, nil
443+
return adctypes.UpstreamNodes{}, protocol, nil
441444
}
442445
}
443446

444447
endpointSlices := tctx.EndpointSlices[key]
445-
return t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter), nil
448+
nodes = t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter)
449+
return nodes, protocol, nil
446450
}
447451

448452
// calculateHTTPRoutePriority calculates the priority of the HTTP route.
@@ -544,6 +548,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
544548
upstreams = make([]*adctypes.Upstream, 0)
545549
weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0)
546550
backendErr error
551+
enableWebsocket *bool
547552
)
548553

549554
for _, backend := range rule.BackendRefs {
@@ -552,18 +557,21 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
552557
backend.Namespace = &namespace
553558
}
554559
upstream := adctypes.NewDefaultUpstream()
555-
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
560+
upNodes, protocol, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
556561
if err != nil {
557562
backendErr = err
558563
continue
559564
}
560565
if len(upNodes) == 0 {
561566
continue
562567
}
568+
if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS {
569+
enableWebsocket = ptr.To(true)
570+
}
563571

564572
t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream)
565573
upstream.Nodes = upNodes
566-
574+
upstream.Scheme = appProtocolToUpstreamScheme(protocol)
567575
var (
568576
kind string
569577
port int32
@@ -683,7 +691,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
683691
route.Name = name
684692
route.ID = id.GenID(name)
685693
route.Labels = labels
686-
route.EnableWebsocket = ptr.To(true)
694+
route.EnableWebsocket = enableWebsocket
687695

688696
// Set the route priority
689697
priority := calculateHTTPRoutePriority(&match, ruleIndex, hosts)
@@ -825,3 +833,18 @@ func (t *Translator) translateHTTPRouteHeaderMatchToVars(header gatewayv1.HTTPHe
825833
}
826834
return HeaderMatchToVars(matchType, string(header.Name), header.Value)
827835
}
836+
837+
func appProtocolToUpstreamScheme(appProtocol string) string {
838+
switch appProtocol {
839+
case internaltypes.AppProtocolHTTP:
840+
return apiv2.SchemeHTTP
841+
case internaltypes.AppProtocolHTTPS:
842+
return apiv2.SchemeHTTPS
843+
case internaltypes.AppProtocolWS:
844+
return apiv2.SchemeHTTP
845+
case internaltypes.AppProtocolWSS:
846+
return apiv2.SchemeHTTPS
847+
default:
848+
return ""
849+
}
850+
}

0 commit comments

Comments
 (0)