Skip to content

Commit 53267ff

Browse files
authored
feat: support stream_route for ApisixRoute (#2551)
1 parent ebaed22 commit 53267ff

File tree

19 files changed

+799
-190
lines changed

19 files changed

+799
-190
lines changed

api/adc/types.go

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,13 @@ type Timeout struct {
195195

196196
// +k8s:deepcopy-gen=true
197197
type StreamRoute struct {
198-
Description string `json:"description,omitempty"`
199-
ID string `json:"id,omitempty"`
200-
Labels map[string]string `json:"labels,omitempty"`
201-
Name string `json:"name"`
202-
Plugins Plugins `json:"plugins,omitempty"`
203-
RemoteAddr string `json:"remote_addr,omitempty"`
204-
ServerAddr string `json:"server_addr,omitempty"`
205-
ServerPort *int64 `json:"server_port,omitempty"`
206-
Sni string `json:"sni,omitempty"`
198+
Metadata `json:",inline" yaml:",inline"`
199+
200+
Plugins Plugins `json:"plugins,omitempty"`
201+
RemoteAddr string `json:"remote_addr,omitempty"`
202+
ServerAddr string `json:"server_addr,omitempty"`
203+
ServerPort int32 `json:"server_port,omitempty"`
204+
SNI string `json:"sni,omitempty"`
207205
}
208206

209207
// +k8s:deepcopy-gen=true
@@ -537,6 +535,24 @@ func ComposeRouteName(namespace, name string, rule string) string {
537535
return buf.String()
538536
}
539537

538+
// ComposeStreamRouteName uses namespace, name and rule name to compose
539+
// the stream_route name.
540+
func ComposeStreamRouteName(namespace, name string, rule string) string {
541+
// FIXME Use sync.Pool to reuse this buffer if the upstream
542+
// name composing code path is hot.
543+
p := make([]byte, 0, len(namespace)+len(name)+len(rule)+6)
544+
buf := bytes.NewBuffer(p)
545+
546+
buf.WriteString(namespace)
547+
buf.WriteByte('_')
548+
buf.WriteString(name)
549+
buf.WriteByte('_')
550+
buf.WriteString(rule)
551+
buf.WriteString("_tcp")
552+
553+
return buf.String()
554+
}
555+
540556
func ComposeServiceNameWithRule(namespace, name string, rule string) string {
541557
// FIXME Use sync.Pool to reuse this buffer if the upstream
542558
// name composing code path is hot.
@@ -554,6 +570,24 @@ func ComposeServiceNameWithRule(namespace, name string, rule string) string {
554570
return buf.String()
555571
}
556572

573+
func ComposeServiceNameWithStream(namespace, name string, rule string) string {
574+
// FIXME Use sync.Pool to reuse this buffer if the upstream
575+
// name composing code path is hot.
576+
var p []byte
577+
plen := len(namespace) + len(name) + 6
578+
579+
p = make([]byte, 0, plen)
580+
buf := bytes.NewBuffer(p)
581+
buf.WriteString(namespace)
582+
buf.WriteByte('_')
583+
buf.WriteString(name)
584+
buf.WriteByte('_')
585+
buf.WriteString(rule)
586+
buf.WriteString("_stream")
587+
588+
return buf.String()
589+
}
590+
557591
func ComposeConsumerName(namespace, name string) string {
558592
// FIXME Use sync.Pool to reuse this buffer if the upstream
559593
// name composing code path is hot.
@@ -604,6 +638,18 @@ func NewDefaultRoute() *Route {
604638
}
605639
}
606640

641+
// NewDefaultStreamRoute returns an empty StreamRoute with default values.
642+
func NewDefaultStreamRoute() *StreamRoute {
643+
return &StreamRoute{
644+
Metadata: Metadata{
645+
Desc: "Created by apisix-ingress-controller, DO NOT modify it manually",
646+
Labels: map[string]string{
647+
"managed-by": "apisix-ingress-controller",
648+
},
649+
},
650+
}
651+
}
652+
607653
const (
608654
PluginProxyRewrite string = "proxy-rewrite"
609655
PluginRedirect string = "redirect"

api/adc/zz_generated.deepcopy.go

Lines changed: 1 addition & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2/apisixroute_types.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ type ApisixRouteSpec struct {
3939
HTTP []ApisixRouteHTTP `json:"http,omitempty" yaml:"http,omitempty"`
4040
// Stream defines a list of stream route rules.
4141
// Each rule specifies conditions to match TCP/UDP traffic and how to forward them.
42-
// Stream is currently not supported.
4342
Stream []ApisixRouteStream `json:"stream,omitempty" yaml:"stream,omitempty"`
4443
}
4544

@@ -111,7 +110,9 @@ type ApisixRouteHTTP struct {
111110
type ApisixRouteStream struct {
112111
// Name is a unique identifier for the route. This field must not be empty.
113112
Name string `json:"name" yaml:"name"`
114-
// Protocol specifies the L4 protocol to match. Can be `tcp` or `udp`.
113+
// Protocol specifies the L4 protocol to match. Can be `TCP` or `UDP`.
114+
//
115+
// +kubebuilder:validation:Enum=TCP;UDP
115116
Protocol string `json:"protocol" yaml:"protocol"`
116117
// Match defines the criteria used to match incoming TCP or UDP connections.
117118
Match ApisixRouteStreamMatch `json:"match" yaml:"match"`
@@ -225,6 +226,9 @@ type ApisixRouteAuthentication struct {
225226
type ApisixRouteStreamMatch struct {
226227
// IngressPort is the port on which the APISIX Ingress proxy server listens.
227228
// This must be a statically configured port, as APISIX does not support dynamic port binding.
229+
//
230+
// +kubebuilder:validation:Minimum=0
231+
// +kubebuilder:validation:Maximum=65535
228232
IngressPort int32 `json:"ingressPort" yaml:"ingressPort"`
229233
// Host is the destination host address used to match the incoming TCP/UDP traffic.
230234
Host string `json:"host,omitempty" yaml:"host,omitempty"`
@@ -240,9 +244,12 @@ type ApisixRouteStreamBackend struct {
240244
// This can be either the port name or port number.
241245
ServicePort intstr.IntOrString `json:"servicePort" yaml:"servicePort"`
242246
// ResolveGranularity determines how the backend service is resolved.
243-
// Valid values are `endpoints` and `service`. When set to `endpoints`,
247+
// Valid values are `endpoint` and `service`. When set to `endpoint`,
244248
// individual pod IPs will be used; otherwise, the Service's ClusterIP or ExternalIP is used.
245-
// The default is `endpoints`.
249+
// The default is `endpoint`.
250+
//
251+
// +kubebuilder:default=endpoint
252+
// +kubebuilder:validation:Enum=endpoint;service
246253
ResolveGranularity string `json:"resolveGranularity,omitempty" yaml:"resolveGranularity,omitempty"`
247254
// Subset specifies a named subset of the target Service.
248255
// The subset must be pre-defined in the corresponding ApisixUpstream resource.

api/v2/shared_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ const (
4545
DefaultWeight = 100
4646
)
4747

48+
const (
49+
ResolveGranularityService = "service"
50+
ResolveGranularityEndpoint = "endpoint"
51+
)
52+
4853
const (
4954
// OpEqual means the equal ("==") operator in nginxVars.
5055
OpEqual = "Equal"

config/crd/bases/apisix.apache.org_apisixroutes.yaml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,6 @@ spec:
355355
description: |-
356356
Stream defines a list of stream route rules.
357357
Each rule specifies conditions to match TCP/UDP traffic and how to forward them.
358-
Stream is currently not supported.
359358
items:
360359
description: ApisixRouteStream defines the configuration for a Layer
361360
4 (TCP/UDP) route. Currently not supported.
@@ -365,11 +364,15 @@ spec:
365364
traffic should be forwarded.
366365
properties:
367366
resolveGranularity:
367+
default: endpoint
368368
description: |-
369369
ResolveGranularity determines how the backend service is resolved.
370-
Valid values are `endpoints` and `service`. When set to `endpoints`,
370+
Valid values are `endpoint` and `service`. When set to `endpoint`,
371371
individual pod IPs will be used; otherwise, the Service's ClusterIP or ExternalIP is used.
372-
The default is `endpoints`.
372+
The default is `endpoint`.
373+
enum:
374+
- endpoint
375+
- service
373376
type: string
374377
serviceName:
375378
description: |-
@@ -407,6 +410,8 @@ spec:
407410
IngressPort is the port on which the APISIX Ingress proxy server listens.
408411
This must be a statically configured port, as APISIX does not support dynamic port binding.
409412
format: int32
413+
maximum: 65535
414+
minimum: 0
410415
type: integer
411416
required:
412417
- ingressPort
@@ -442,7 +447,10 @@ spec:
442447
type: array
443448
protocol:
444449
description: Protocol specifies the L4 protocol to match. Can
445-
be `tcp` or `udp`.
450+
be `TCP` or `UDP`.
451+
enum:
452+
- TCP
453+
- UDP
446454
type: string
447455
required:
448456
- backend

docs/en/latest/reference/api-reference.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,7 +1178,7 @@ It defines routing rules for both HTTP and stream traffic.
11781178
| --- | --- |
11791179
| `ingressClassName` _string_ | IngressClassName is the name of the IngressClass this route belongs to. It allows multiple controllers to watch and reconcile different routes. |
11801180
| `http` _[ApisixRouteHTTP](#apisixroutehttp) array_ | HTTP defines a list of HTTP route rules. Each rule specifies conditions to match HTTP requests and how to forward them. |
1181-
| `stream` _[ApisixRouteStream](#apisixroutestream) array_ | Stream defines a list of stream route rules. Each rule specifies conditions to match TCP/UDP traffic and how to forward them. Stream is currently not supported. |
1181+
| `stream` _[ApisixRouteStream](#apisixroutestream) array_ | Stream defines a list of stream route rules. Each rule specifies conditions to match TCP/UDP traffic and how to forward them. |
11821182

11831183

11841184
_Appears in:_
@@ -1194,7 +1194,7 @@ ApisixRouteStream defines the configuration for a Layer 4 (TCP/UDP) route. Curre
11941194
| Field | Description |
11951195
| --- | --- |
11961196
| `name` _string_ | Name is a unique identifier for the route. This field must not be empty. |
1197-
| `protocol` _string_ | Protocol specifies the L4 protocol to match. Can be `tcp` or `udp`. |
1197+
| `protocol` _string_ | Protocol specifies the L4 protocol to match. Can be `TCP` or `UDP`. |
11981198
| `match` _[ApisixRouteStreamMatch](#apisixroutestreammatch)_ | Match defines the criteria used to match incoming TCP or UDP connections. |
11991199
| `backend` _[ApisixRouteStreamBackend](#apisixroutestreambackend)_ | Backend specifies the destination service to which traffic should be forwarded. |
12001200
| `plugins` _[ApisixRoutePlugin](#apisixrouteplugin) array_ | Plugins defines a list of plugins to apply to this route. |
@@ -1214,7 +1214,7 @@ ApisixRouteStreamBackend represents the backend service for a TCP or UDP stream
12141214
| --- | --- |
12151215
| `serviceName` _string_ | ServiceName is the name of the Kubernetes Service. Cross-namespace references are not supported—ensure the ApisixRoute and the Service are in the same namespace. |
12161216
| `servicePort` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#intorstring-intstr-util)_ | ServicePort is the port of the Kubernetes Service. This can be either the port name or port number. |
1217-
| `resolveGranularity` _string_ | ResolveGranularity determines how the backend service is resolved. Valid values are `endpoints` and `service`. When set to `endpoints`, individual pod IPs will be used; otherwise, the Service's ClusterIP or ExternalIP is used. The default is `endpoints`. |
1217+
| `resolveGranularity` _string_ | ResolveGranularity determines how the backend service is resolved. Valid values are `endpoint` and `service`. When set to `endpoint`, individual pod IPs will be used; otherwise, the Service's ClusterIP or ExternalIP is used. The default is `endpoint`. |
12181218
| `subset` _string_ | Subset specifies a named subset of the target Service. The subset must be pre-defined in the corresponding ApisixUpstream resource. |
12191219

12201220

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ toolchain go1.24.7
77
require (
88
github.com/Masterminds/sprig/v3 v3.2.3
99
github.com/api7/gopkg v0.2.1-0.20230601092738-0f3730f9b57a
10+
github.com/eclipse/paho.mqtt.golang v1.5.0
1011
github.com/gavv/httpexpect/v2 v2.16.0
1112
github.com/go-logr/logr v1.4.2
1213
github.com/go-logr/zapr v1.3.0
1314
github.com/google/go-cmp v0.7.0
1415
github.com/google/uuid v1.6.0
15-
github.com/gorilla/websocket v1.5.1
16+
github.com/gorilla/websocket v1.5.3
1617
github.com/gruntwork-io/terratest v0.50.0
1718
github.com/hashicorp/go-memdb v1.3.4
1819
github.com/incubator4/go-resty-expr v0.1.1

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
121121
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
122122
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
123123
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
124+
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
125+
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
124126
github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk=
125127
github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
126128
github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls=
@@ -198,8 +200,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
198200
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
199201
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
200202
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
201-
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
202-
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
203+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
204+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
203205
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
204206
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
205207
github.com/gruntwork-io/go-commons v0.8.0 h1:k/yypwrPqSeYHevLlEDmvmgQzcyTwrlZGRaxEM6G0ro=

internal/adc/translator/apisixroute.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a
5050
}
5151
result.Services = append(result.Services, service)
5252
}
53+
54+
for _, part := range ar.Spec.Stream {
55+
service, err := t.translateStreamRule(tctx, ar, part)
56+
if err != nil {
57+
return nil, err
58+
}
59+
result.Services = append(result.Services, service)
60+
}
5361
return result, nil
5462
}
5563

@@ -88,7 +96,7 @@ func (t *Translator) buildPlugins(tctx *provider.TranslateContext, ar *apiv2.Api
8896
t.loadPluginConfigPlugins(tctx, ar, rule, plugins)
8997

9098
// Apply plugins from the route itself
91-
t.loadRoutePlugins(tctx, ar, rule, plugins)
99+
t.loadRoutePlugins(tctx, ar, rule.Plugins, plugins)
92100

93101
// Add authentication plugins
94102
t.addAuthenticationPlugins(rule, plugins)
@@ -121,8 +129,8 @@ func (t *Translator) loadPluginConfigPlugins(tctx *provider.TranslateContext, ar
121129
}
122130
}
123131

124-
func (t *Translator) loadRoutePlugins(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins) {
125-
for _, plugin := range rule.Plugins {
132+
func (t *Translator) loadRoutePlugins(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, routePlugins []apiv2.ApisixRoutePlugin, plugins adc.Plugins) {
133+
for _, plugin := range routePlugins {
126134
if !plugin.Enable {
127135
continue
128136
}
@@ -210,7 +218,7 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc
210218
upstream, _ = t.translateApisixUpstream(tctx, au)
211219
}
212220

213-
if backend.ResolveGranularity == "service" {
221+
if backend.ResolveGranularity == apiv2.ResolveGranularityService {
214222
upstream.Nodes, backendErr = t.translateApisixRouteBackendResolveGranularityService(tctx, utils.NamespacedName(ar), backend)
215223
if backendErr != nil {
216224
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Service")
@@ -341,6 +349,20 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *
341349
}, nil
342350
}
343351

352+
func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, error) {
353+
tsBackend := apiv2.ApisixRouteHTTPBackend{
354+
ServiceName: backend.ServiceName,
355+
ServicePort: backend.ServicePort,
356+
ResolveGranularity: backend.ResolveGranularity,
357+
Subset: backend.Subset,
358+
}
359+
if backend.ResolveGranularity == apiv2.ResolveGranularityService {
360+
return t.translateApisixRouteBackendResolveGranularityService(tctx, arNN, tsBackend)
361+
} else {
362+
return t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, arNN, tsBackend)
363+
}
364+
}
365+
344366
func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
345367
weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)))
346368
backendRef := gatewayv1.BackendRef{
@@ -355,3 +377,37 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx
355377
}
356378
return t.translateBackendRef(tctx, backendRef, DefaultEndpointFilter)
357379
}
380+
381+
func (t *Translator) translateStreamRule(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, part apiv2.ApisixRouteStream) (*adc.Service, error) {
382+
// add stream route plugins
383+
plugins := make(adc.Plugins)
384+
t.loadRoutePlugins(tctx, ar, part.Plugins, plugins)
385+
386+
sr := adc.NewDefaultStreamRoute()
387+
sr.Name = adc.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
388+
sr.ID = id.GenID(sr.Name)
389+
sr.ServerPort = part.Match.IngressPort
390+
sr.SNI = part.Match.Host
391+
sr.Plugins = plugins
392+
393+
svc := adc.NewDefaultService()
394+
svc.Name = adc.ComposeServiceNameWithStream(ar.Namespace, ar.Name, part.Name)
395+
svc.ID = id.GenID(svc.Name)
396+
svc.StreamRoutes = append(svc.StreamRoutes, sr)
397+
398+
auNN := types.NamespacedName{Namespace: ar.GetNamespace(), Name: part.Backend.ServiceName}
399+
upstream := adc.NewDefaultUpstream()
400+
if au, ok := tctx.Upstreams[auNN]; ok {
401+
upstream, _ = t.translateApisixUpstream(tctx, au)
402+
}
403+
nodes, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), part.Backend)
404+
if err != nil {
405+
return nil, err
406+
}
407+
upstream.Nodes = nodes
408+
upstream.ID = ""
409+
upstream.Name = ""
410+
411+
svc.Upstream = upstream
412+
return svc, nil
413+
}

0 commit comments

Comments
 (0)