Skip to content

Commit c61331c

Browse files
committed
Add TCPRoute and UDPRoute Support for L4 Load Balancing
1 parent e4eed2d commit c61331c

21 files changed

+846
-34
lines changed

internal/controller/manager.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,18 @@ func registerControllers(
536536
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
537537
},
538538
},
539+
{
540+
objectType: &gatewayv1alpha2.TCPRoute{},
541+
options: []controller.Option{
542+
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
543+
},
544+
},
545+
{
546+
objectType: &gatewayv1alpha2.UDPRoute{},
547+
options: []controller.Option{
548+
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
549+
},
550+
},
539551
}
540552
controllerRegCfgs = append(controllerRegCfgs, gwExpFeatures...)
541553
}
@@ -774,6 +786,8 @@ func prepareFirstEventBatchPreparerArgs(cfg config.Config) ([]client.Object, []c
774786
&gatewayv1alpha3.BackendTLSPolicyList{},
775787
&apiv1.ConfigMapList{},
776788
&gatewayv1alpha2.TLSRouteList{},
789+
&gatewayv1alpha2.TCPRouteList{},
790+
&gatewayv1alpha2.UDPRouteList{},
777791
)
778792
}
779793

internal/controller/nginx/config/stream/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ type Server struct {
1414
RewriteClientIP shared.RewriteClientIPSettings
1515
SSLPreread bool
1616
IsSocket bool
17+
Protocol string
18+
UDPConfig *UDPConfig
19+
}
20+
21+
type UDPConfig struct {
22+
ProxyTimeout string
1723
}
1824

1925
// Upstream holds all configuration for a stream upstream.

internal/controller/nginx/config/stream_servers.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,24 @@ func (g GeneratorImpl) executeStreamServers(conf dataplane.Configuration) []exec
3333
}
3434

3535
func createStreamServers(conf dataplane.Configuration) []stream.Server {
36-
if len(conf.TLSPassthroughServers) == 0 {
36+
totalServers := len(conf.TLSPassthroughServers) + len(conf.TCPServers) + len(conf.UDPServers)
37+
if totalServers == 0 {
3738
return nil
3839
}
3940

40-
streamServers := make([]stream.Server, 0, len(conf.TLSPassthroughServers)*2)
41+
streamServers := make([]stream.Server, 0, totalServers*2)
4142
portSet := make(map[int32]struct{})
4243
upstreams := make(map[string]dataplane.Upstream)
4344

4445
for _, u := range conf.StreamUpstreams {
4546
upstreams[u.Name] = u
4647
}
48+
for _, u := range conf.TCPUpstreams {
49+
upstreams[u.Name] = u
50+
}
51+
for _, u := range conf.UDPUpstreams {
52+
upstreams[u.Name] = u
53+
}
4754

4855
for _, server := range conf.TLSPassthroughServers {
4956
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" {
@@ -77,6 +84,47 @@ func createStreamServers(conf dataplane.Configuration) []stream.Server {
7784
}
7885
streamServers = append(streamServers, streamServer)
7986
}
87+
88+
// Process TCP servers
89+
for i, server := range conf.TCPServers {
90+
if _, inPortSet := portSet[server.Port]; inPortSet {
91+
continue // Skip if port already in use
92+
}
93+
94+
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
95+
streamServer := stream.Server{
96+
Listen: fmt.Sprint(server.Port),
97+
StatusZone: fmt.Sprintf("tcp_%d", server.Port),
98+
ProxyPass: server.UpstreamName,
99+
}
100+
streamServers = append(streamServers, streamServer)
101+
portSet[server.Port] = struct{}{}
102+
} else {
103+
fmt.Printf("DEBUG: createStreamServers - TCP Server %d: Skipped - upstream not found or no endpoints\n", i)
104+
}
105+
}
106+
107+
// Process UDP servers
108+
for _, server := range conf.UDPServers {
109+
if _, inPortSet := portSet[server.Port]; inPortSet {
110+
continue // Skip if port already in use
111+
}
112+
113+
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
114+
streamServer := stream.Server{
115+
Listen: fmt.Sprintf("%d udp", server.Port),
116+
StatusZone: fmt.Sprintf("udp_%d", server.Port),
117+
ProxyPass: server.UpstreamName,
118+
Protocol: "udp",
119+
UDPConfig: &stream.UDPConfig{
120+
ProxyTimeout: "1s",
121+
},
122+
}
123+
streamServers = append(streamServers, streamServer)
124+
portSet[server.Port] = struct{}{}
125+
}
126+
}
127+
80128
return streamServers
81129
}
82130

internal/controller/nginx/config/stream_servers_template.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ server {
3535
{{- if $s.SSLPreread }}
3636
ssl_preread on;
3737
{{- end }}
38+
39+
{{- if and (eq $s.Protocol "udp") $s.UDPConfig }}
40+
proxy_timeout {{ $s.UDPConfig.ProxyTimeout }};
41+
{{- end }}
3842
}
3943
{{- end }}
4044

internal/controller/nginx/config/upstreams.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,13 @@ func executeUpstreams(upstreams []http.Upstream) []executeResult {
6969
}
7070

7171
func (g GeneratorImpl) executeStreamUpstreams(conf dataplane.Configuration) []executeResult {
72-
upstreams := g.createStreamUpstreams(conf.StreamUpstreams)
72+
// Combine all stream upstreams: TLS, TCP, and UDP
73+
allUpstreams := make([]dataplane.Upstream, 0, len(conf.StreamUpstreams)+len(conf.TCPUpstreams)+len(conf.UDPUpstreams))
74+
allUpstreams = append(allUpstreams, conf.StreamUpstreams...)
75+
allUpstreams = append(allUpstreams, conf.TCPUpstreams...)
76+
allUpstreams = append(allUpstreams, conf.UDPUpstreams...)
77+
78+
upstreams := g.createStreamUpstreams(allUpstreams)
7379

7480
result := executeResult{
7581
dest: streamConfigFile,

internal/controller/provisioner/objects.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ const (
4444
defaultInitialDelaySeconds = int32(3)
4545
)
4646

47+
type PortInfo struct {
48+
Port int32
49+
Protocol corev1.Protocol
50+
}
51+
4752
var emptyDirVolumeSource = corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}
4853

4954
//nolint:gocyclo // will refactor at some point
@@ -147,9 +152,18 @@ func (p *NginxProvisioner) buildNginxResourceObjects(
147152
openshiftObjs = p.buildOpenshiftObjects(objectMeta)
148153
}
149154

150-
ports := make(map[int32]struct{})
155+
ports := make(map[int32]PortInfo)
151156
for _, listener := range gateway.Spec.Listeners {
152-
ports[int32(listener.Port)] = struct{}{}
157+
var protocol corev1.Protocol
158+
switch listener.Protocol {
159+
case gatewayv1.TCPProtocolType:
160+
protocol = corev1.ProtocolTCP
161+
case gatewayv1.UDPProtocolType:
162+
protocol = corev1.ProtocolUDP
163+
default:
164+
protocol = corev1.ProtocolTCP
165+
}
166+
ports[int32(listener.Port)] = PortInfo{Port: int32(listener.Port), Protocol: protocol}
153167
}
154168

155169
// Create separate copies of objectMeta for service and deployment to avoid shared map references
@@ -515,7 +529,7 @@ func (p *NginxProvisioner) buildOpenshiftObjects(objectMeta metav1.ObjectMeta) [
515529
func buildNginxService(
516530
objectMeta metav1.ObjectMeta,
517531
nProxyCfg *graph.EffectiveNginxProxy,
518-
ports map[int32]struct{},
532+
ports map[int32]PortInfo,
519533
selectorLabels map[string]string,
520534
addresses []gatewayv1.GatewaySpecAddress,
521535
) (*corev1.Service, error) {
@@ -538,16 +552,17 @@ func buildNginxService(
538552
}
539553

540554
servicePorts := make([]corev1.ServicePort, 0, len(ports))
541-
for port := range ports {
555+
for _, portInfo := range ports {
542556
servicePort := corev1.ServicePort{
543-
Name: fmt.Sprintf("port-%d", port),
544-
Port: port,
545-
TargetPort: intstr.FromInt32(port),
557+
Name: fmt.Sprintf("port-%d", portInfo.Port),
558+
Port: portInfo.Port,
559+
TargetPort: intstr.FromInt32(portInfo.Port),
560+
Protocol: portInfo.Protocol,
546561
}
547562

548563
if serviceType != corev1.ServiceTypeClusterIP {
549564
for _, nodePort := range serviceCfg.NodePorts {
550-
if nodePort.ListenerPort == port {
565+
if nodePort.ListenerPort == portInfo.Port {
551566
servicePort.NodePort = nodePort.Port
552567
}
553568
}
@@ -625,7 +640,7 @@ func (p *NginxProvisioner) buildNginxDeployment(
625640
nProxyCfg *graph.EffectiveNginxProxy,
626641
ngxIncludesConfigMapName string,
627642
ngxAgentConfigMapName string,
628-
ports map[int32]struct{},
643+
ports map[int32]PortInfo,
629644
selectorLabels map[string]string,
630645
agentTLSSecretName string,
631646
dockerSecretNames map[string]string,
@@ -779,7 +794,7 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec(
779794
nProxyCfg *graph.EffectiveNginxProxy,
780795
ngxIncludesConfigMapName string,
781796
ngxAgentConfigMapName string,
782-
ports map[int32]struct{},
797+
ports map[int32]PortInfo,
783798
agentTLSSecretName string,
784799
dockerSecretNames map[string]string,
785800
jwtSecretName string,
@@ -788,10 +803,11 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec(
788803
dataplaneKeySecretName string,
789804
) corev1.PodTemplateSpec {
790805
containerPorts := make([]corev1.ContainerPort, 0, len(ports))
791-
for port := range ports {
806+
for _, portInfo := range ports {
792807
containerPort := corev1.ContainerPort{
793-
Name: fmt.Sprintf("port-%d", port),
794-
ContainerPort: port,
808+
Name: fmt.Sprintf("port-%d", portInfo.Port),
809+
ContainerPort: portInfo.Port,
810+
Protocol: portInfo.Protocol,
795811
}
796812
containerPorts = append(containerPorts, containerPort)
797813
}

internal/controller/state/change_processor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
9797
NginxProxies: make(map[types.NamespacedName]*ngfAPIv1alpha2.NginxProxy),
9898
GRPCRoutes: make(map[types.NamespacedName]*v1.GRPCRoute),
9999
TLSRoutes: make(map[types.NamespacedName]*v1alpha2.TLSRoute),
100+
TCPRoutes: make(map[types.NamespacedName]*v1alpha2.TCPRoute),
101+
UDPRoutes: make(map[types.NamespacedName]*v1alpha2.UDPRoute),
100102
NGFPolicies: make(map[graph.PolicyKey]policies.Policy),
101103
SnippetsFilters: make(map[types.NamespacedName]*ngfAPIv1alpha1.SnippetsFilter),
102104
InferencePools: make(map[types.NamespacedName]*inference.InferencePool),
@@ -218,6 +220,16 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
218220
store: newObjectStoreMapAdapter(clusterStore.TLSRoutes),
219221
predicate: nil,
220222
},
223+
{
224+
gvk: cfg.MustExtractGVK(&v1alpha2.TCPRoute{}),
225+
store: newObjectStoreMapAdapter(clusterStore.TCPRoutes),
226+
predicate: nil,
227+
},
228+
{
229+
gvk: cfg.MustExtractGVK(&v1alpha2.UDPRoute{}),
230+
store: newObjectStoreMapAdapter(clusterStore.UDPRoutes),
231+
predicate: nil,
232+
},
221233
{
222234
gvk: cfg.MustExtractGVK(&ngfAPIv1alpha1.SnippetsFilter{}),
223235
store: newObjectStoreMapAdapter(clusterStore.SnippetsFilters),

internal/controller/state/change_processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3778,7 +3778,7 @@ var _ = Describe("ChangeProcessor", func() {
37783778
},
37793779
Entry(
37803780
"an unsupported resource",
3781-
&v1alpha2.TCPRoute{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "tcp"}},
3781+
&apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}},
37823782
),
37833783
Entry(
37843784
"nil resource",
@@ -3796,8 +3796,8 @@ var _ = Describe("ChangeProcessor", func() {
37963796
},
37973797
Entry(
37983798
"an unsupported resource",
3799-
&v1alpha2.TCPRoute{},
3800-
types.NamespacedName{Namespace: "test", Name: "tcp"},
3799+
&apiv1.Pod{},
3800+
types.NamespacedName{Namespace: "test", Name: "pod"},
38013801
),
38023802
Entry(
38033803
"nil resource type",

0 commit comments

Comments
 (0)