Skip to content

Commit 7a6c74d

Browse files
committed
Add TCPRoute and UDPRoute Support for L4 Load Balancing
1 parent f04edcd commit 7a6c74d

21 files changed

+846
-34
lines changed

internal/controller/manager.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,18 @@ func registerControllers(
528528
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
529529
},
530530
},
531+
{
532+
objectType: &gatewayv1alpha2.TCPRoute{},
533+
options: []controller.Option{
534+
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
535+
},
536+
},
537+
{
538+
objectType: &gatewayv1alpha2.UDPRoute{},
539+
options: []controller.Option{
540+
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
541+
},
542+
},
531543
}
532544
controllerRegCfgs = append(controllerRegCfgs, gwExpFeatures...)
533545
}
@@ -754,6 +766,8 @@ func prepareFirstEventBatchPreparerArgs(cfg config.Config) ([]client.Object, []c
754766
&gatewayv1alpha3.BackendTLSPolicyList{},
755767
&apiv1.ConfigMapList{},
756768
&gatewayv1alpha2.TLSRouteList{},
769+
&gatewayv1alpha2.TCPRouteList{},
770+
&gatewayv1alpha2.UDPRouteList{},
757771
)
758772
}
759773

internal/controller/nginx/config/stream/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ type Server struct {
1111
RewriteClientIP shared.RewriteClientIPSettings
1212
SSLPreread bool
1313
IsSocket bool
14+
Protocol string
15+
UDPConfig *UDPConfig
16+
}
17+
18+
type UDPConfig struct {
19+
ProxyTimeout string
1420
}
1521

1622
// Upstream holds all configuration for a stream upstream.

internal/controller/nginx/config/stream_servers.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,24 @@ func (g GeneratorImpl) executeStreamServers(conf dataplane.Configuration) []exec
3232
}
3333

3434
func createStreamServers(conf dataplane.Configuration) []stream.Server {
35-
if len(conf.TLSPassthroughServers) == 0 {
35+
totalServers := len(conf.TLSPassthroughServers) + len(conf.TCPServers) + len(conf.UDPServers)
36+
if totalServers == 0 {
3637
return nil
3738
}
3839

39-
streamServers := make([]stream.Server, 0, len(conf.TLSPassthroughServers)*2)
40+
streamServers := make([]stream.Server, 0, totalServers*2)
4041
portSet := make(map[int32]struct{})
4142
upstreams := make(map[string]dataplane.Upstream)
4243

4344
for _, u := range conf.StreamUpstreams {
4445
upstreams[u.Name] = u
4546
}
47+
for _, u := range conf.TCPUpstreams {
48+
upstreams[u.Name] = u
49+
}
50+
for _, u := range conf.UDPUpstreams {
51+
upstreams[u.Name] = u
52+
}
4653

4754
for _, server := range conf.TLSPassthroughServers {
4855
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" {
@@ -76,6 +83,47 @@ func createStreamServers(conf dataplane.Configuration) []stream.Server {
7683
}
7784
streamServers = append(streamServers, streamServer)
7885
}
86+
87+
// Process TCP servers
88+
for i, server := range conf.TCPServers {
89+
if _, inPortSet := portSet[server.Port]; inPortSet {
90+
continue // Skip if port already in use
91+
}
92+
93+
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
94+
streamServer := stream.Server{
95+
Listen: fmt.Sprint(server.Port),
96+
StatusZone: fmt.Sprintf("tcp_%d", server.Port),
97+
ProxyPass: server.UpstreamName,
98+
}
99+
streamServers = append(streamServers, streamServer)
100+
portSet[server.Port] = struct{}{}
101+
} else {
102+
fmt.Printf("DEBUG: createStreamServers - TCP Server %d: Skipped - upstream not found or no endpoints\n", i)
103+
}
104+
}
105+
106+
// Process UDP servers
107+
for _, server := range conf.UDPServers {
108+
if _, inPortSet := portSet[server.Port]; inPortSet {
109+
continue // Skip if port already in use
110+
}
111+
112+
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
113+
streamServer := stream.Server{
114+
Listen: fmt.Sprintf("%d udp", server.Port),
115+
StatusZone: fmt.Sprintf("udp_%d", server.Port),
116+
ProxyPass: server.UpstreamName,
117+
Protocol: "udp",
118+
UDPConfig: &stream.UDPConfig{
119+
ProxyTimeout: "1s",
120+
},
121+
}
122+
streamServers = append(streamServers, streamServer)
123+
portSet[server.Port] = struct{}{}
124+
}
125+
}
126+
79127
return streamServers
80128
}
81129

internal/controller/nginx/config/stream_servers_template.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ server {
2626
{{- if $s.SSLPreread }}
2727
ssl_preread on;
2828
{{- end }}
29+
30+
{{- if and (eq $s.Protocol "udp") $s.UDPConfig }}
31+
proxy_timeout {{ $s.UDPConfig.ProxyTimeout }};
32+
{{- end }}
2933
}
3034
{{- end }}
3135

internal/controller/nginx/config/upstreams.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,13 @@ func executeUpstreams(upstreams []http.Upstream) []executeResult {
6969
}
7070

7171
func (g GeneratorImpl) executeStreamUpstreams(conf dataplane.Configuration) []executeResult {
72-
upstreams := g.createStreamUpstreams(conf.StreamUpstreams)
72+
// Combine all stream upstreams: TLS, TCP, and UDP
73+
allUpstreams := make([]dataplane.Upstream, 0, len(conf.StreamUpstreams)+len(conf.TCPUpstreams)+len(conf.UDPUpstreams))
74+
allUpstreams = append(allUpstreams, conf.StreamUpstreams...)
75+
allUpstreams = append(allUpstreams, conf.TCPUpstreams...)
76+
allUpstreams = append(allUpstreams, conf.UDPUpstreams...)
77+
78+
upstreams := g.createStreamUpstreams(allUpstreams)
7379

7480
result := executeResult{
7581
dest: streamConfigFile,

internal/controller/provisioner/objects.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ const (
3939
defaultImagePullPolicy = corev1.PullIfNotPresent
4040
)
4141

42+
type PortInfo struct {
43+
Port int32
44+
Protocol corev1.Protocol
45+
}
46+
4247
var emptyDirVolumeSource = corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}
4348

4449
func (p *NginxProvisioner) buildNginxResourceObjects(
@@ -127,9 +132,18 @@ func (p *NginxProvisioner) buildNginxResourceObjects(
127132
openshiftObjs = p.buildOpenshiftObjects(objectMeta)
128133
}
129134

130-
ports := make(map[int32]struct{})
135+
ports := make(map[int32]PortInfo)
131136
for _, listener := range gateway.Spec.Listeners {
132-
ports[int32(listener.Port)] = struct{}{}
137+
var protocol corev1.Protocol
138+
switch listener.Protocol {
139+
case gatewayv1.TCPProtocolType:
140+
protocol = corev1.ProtocolTCP
141+
case gatewayv1.UDPProtocolType:
142+
protocol = corev1.ProtocolUDP
143+
default:
144+
protocol = corev1.ProtocolTCP
145+
}
146+
ports[int32(listener.Port)] = PortInfo{Port: int32(listener.Port), Protocol: protocol}
133147
}
134148

135149
service := buildNginxService(objectMeta, nProxyCfg, ports, selectorLabels)
@@ -418,7 +432,7 @@ func (p *NginxProvisioner) buildOpenshiftObjects(objectMeta metav1.ObjectMeta) [
418432
func buildNginxService(
419433
objectMeta metav1.ObjectMeta,
420434
nProxyCfg *graph.EffectiveNginxProxy,
421-
ports map[int32]struct{},
435+
ports map[int32]PortInfo,
422436
selectorLabels map[string]string,
423437
) *corev1.Service {
424438
var serviceCfg ngfAPIv1alpha2.ServiceSpec
@@ -440,16 +454,17 @@ func buildNginxService(
440454
}
441455

442456
servicePorts := make([]corev1.ServicePort, 0, len(ports))
443-
for port := range ports {
457+
for _, portInfo := range ports {
444458
servicePort := corev1.ServicePort{
445-
Name: fmt.Sprintf("port-%d", port),
446-
Port: port,
447-
TargetPort: intstr.FromInt32(port),
459+
Name: fmt.Sprintf("port-%d", portInfo.Port),
460+
Port: portInfo.Port,
461+
TargetPort: intstr.FromInt32(portInfo.Port),
462+
Protocol: portInfo.Protocol,
448463
}
449464

450465
if serviceType != corev1.ServiceTypeClusterIP {
451466
for _, nodePort := range serviceCfg.NodePorts {
452-
if nodePort.ListenerPort == port {
467+
if nodePort.ListenerPort == portInfo.Port {
453468
servicePort.NodePort = nodePort.Port
454469
}
455470
}
@@ -506,7 +521,7 @@ func (p *NginxProvisioner) buildNginxDeployment(
506521
nProxyCfg *graph.EffectiveNginxProxy,
507522
ngxIncludesConfigMapName string,
508523
ngxAgentConfigMapName string,
509-
ports map[int32]struct{},
524+
ports map[int32]PortInfo,
510525
selectorLabels map[string]string,
511526
agentTLSSecretName string,
512527
dockerSecretNames map[string]string,
@@ -567,18 +582,19 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec(
567582
nProxyCfg *graph.EffectiveNginxProxy,
568583
ngxIncludesConfigMapName string,
569584
ngxAgentConfigMapName string,
570-
ports map[int32]struct{},
585+
ports map[int32]PortInfo,
571586
agentTLSSecretName string,
572587
dockerSecretNames map[string]string,
573588
jwtSecretName string,
574589
caSecretName string,
575590
clientSSLSecretName string,
576591
) corev1.PodTemplateSpec {
577592
containerPorts := make([]corev1.ContainerPort, 0, len(ports))
578-
for port := range ports {
593+
for _, portInfo := range ports {
579594
containerPort := corev1.ContainerPort{
580-
Name: fmt.Sprintf("port-%d", port),
581-
ContainerPort: port,
595+
Name: fmt.Sprintf("port-%d", portInfo.Port),
596+
ContainerPort: portInfo.Port,
597+
Protocol: portInfo.Protocol,
582598
}
583599
containerPorts = append(containerPorts, containerPort)
584600
}

internal/controller/state/change_processor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
9696
NginxProxies: make(map[types.NamespacedName]*ngfAPIv1alpha2.NginxProxy),
9797
GRPCRoutes: make(map[types.NamespacedName]*v1.GRPCRoute),
9898
TLSRoutes: make(map[types.NamespacedName]*v1alpha2.TLSRoute),
99+
TCPRoutes: make(map[types.NamespacedName]*v1alpha2.TCPRoute),
100+
UDPRoutes: make(map[types.NamespacedName]*v1alpha2.UDPRoute),
99101
NGFPolicies: make(map[graph.PolicyKey]policies.Policy),
100102
SnippetsFilters: make(map[types.NamespacedName]*ngfAPIv1alpha1.SnippetsFilter),
101103
}
@@ -211,6 +213,16 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
211213
store: newObjectStoreMapAdapter(clusterStore.TLSRoutes),
212214
predicate: nil,
213215
},
216+
{
217+
gvk: cfg.MustExtractGVK(&v1alpha2.TCPRoute{}),
218+
store: newObjectStoreMapAdapter(clusterStore.TCPRoutes),
219+
predicate: nil,
220+
},
221+
{
222+
gvk: cfg.MustExtractGVK(&v1alpha2.UDPRoute{}),
223+
store: newObjectStoreMapAdapter(clusterStore.UDPRoutes),
224+
predicate: nil,
225+
},
214226
{
215227
gvk: cfg.MustExtractGVK(&ngfAPIv1alpha1.SnippetsFilter{}),
216228
store: newObjectStoreMapAdapter(clusterStore.SnippetsFilters),

internal/controller/state/change_processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3776,7 +3776,7 @@ var _ = Describe("ChangeProcessor", func() {
37763776
},
37773777
Entry(
37783778
"an unsupported resource",
3779-
&v1alpha2.TCPRoute{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "tcp"}},
3779+
&apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}},
37803780
),
37813781
Entry(
37823782
"nil resource",
@@ -3794,8 +3794,8 @@ var _ = Describe("ChangeProcessor", func() {
37943794
},
37953795
Entry(
37963796
"an unsupported resource",
3797-
&v1alpha2.TCPRoute{},
3798-
types.NamespacedName{Namespace: "test", Name: "tcp"},
3797+
&apiv1.Pod{},
3798+
types.NamespacedName{Namespace: "test", Name: "pod"},
37993799
),
38003800
Entry(
38013801
"nil resource type",

0 commit comments

Comments
 (0)