diff --git a/internal/controller/manager.go b/internal/controller/manager.go index bb95a3660f..a69d7e9627 100644 --- a/internal/controller/manager.go +++ b/internal/controller/manager.go @@ -528,6 +528,18 @@ func registerControllers( controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}), }, }, + { + objectType: &gatewayv1alpha2.TCPRoute{}, + options: []controller.Option{ + controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}), + }, + }, + { + objectType: &gatewayv1alpha2.UDPRoute{}, + options: []controller.Option{ + controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}), + }, + }, } controllerRegCfgs = append(controllerRegCfgs, gwExpFeatures...) } @@ -754,6 +766,8 @@ func prepareFirstEventBatchPreparerArgs(cfg config.Config) ([]client.Object, []c &gatewayv1alpha3.BackendTLSPolicyList{}, &apiv1.ConfigMapList{}, &gatewayv1alpha2.TLSRouteList{}, + &gatewayv1alpha2.TCPRouteList{}, + &gatewayv1alpha2.UDPRouteList{}, ) } diff --git a/internal/controller/nginx/config/stream/config.go b/internal/controller/nginx/config/stream/config.go index 21a42c505d..58827c7be7 100644 --- a/internal/controller/nginx/config/stream/config.go +++ b/internal/controller/nginx/config/stream/config.go @@ -11,6 +11,12 @@ type Server struct { RewriteClientIP shared.RewriteClientIPSettings SSLPreread bool IsSocket bool + Protocol string + UDPConfig *UDPConfig +} + +type UDPConfig struct { + ProxyTimeout string } // Upstream holds all configuration for a stream upstream. diff --git a/internal/controller/nginx/config/stream_servers.go b/internal/controller/nginx/config/stream_servers.go index 52c32e19f6..ccff1227c9 100644 --- a/internal/controller/nginx/config/stream_servers.go +++ b/internal/controller/nginx/config/stream_servers.go @@ -32,17 +32,24 @@ func (g GeneratorImpl) executeStreamServers(conf dataplane.Configuration) []exec } func createStreamServers(conf dataplane.Configuration) []stream.Server { - if len(conf.TLSPassthroughServers) == 0 { + totalServers := len(conf.TLSPassthroughServers) + len(conf.TCPServers) + len(conf.UDPServers) + if totalServers == 0 { return nil } - streamServers := make([]stream.Server, 0, len(conf.TLSPassthroughServers)*2) + streamServers := make([]stream.Server, 0, totalServers*2) portSet := make(map[int32]struct{}) upstreams := make(map[string]dataplane.Upstream) for _, u := range conf.StreamUpstreams { upstreams[u.Name] = u } + for _, u := range conf.TCPUpstreams { + upstreams[u.Name] = u + } + for _, u := range conf.UDPUpstreams { + upstreams[u.Name] = u + } for _, server := range conf.TLSPassthroughServers { if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" { @@ -76,6 +83,47 @@ func createStreamServers(conf dataplane.Configuration) []stream.Server { } streamServers = append(streamServers, streamServer) } + + // Process TCP servers + for i, server := range conf.TCPServers { + if _, inPortSet := portSet[server.Port]; inPortSet { + continue // Skip if port already in use + } + + if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 { + streamServer := stream.Server{ + Listen: fmt.Sprint(server.Port), + StatusZone: fmt.Sprintf("tcp_%d", server.Port), + ProxyPass: server.UpstreamName, + } + streamServers = append(streamServers, streamServer) + portSet[server.Port] = struct{}{} + } else { + fmt.Printf("DEBUG: createStreamServers - TCP Server %d: Skipped - upstream not found or no endpoints\n", i) + } + } + + // Process UDP servers + for _, server := range conf.UDPServers { + if _, inPortSet := portSet[server.Port]; inPortSet { + continue // Skip if port already in use + } + + if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 { + streamServer := stream.Server{ + Listen: fmt.Sprintf("%d udp", server.Port), + StatusZone: fmt.Sprintf("udp_%d", server.Port), + ProxyPass: server.UpstreamName, + Protocol: "udp", + UDPConfig: &stream.UDPConfig{ + ProxyTimeout: "1s", + }, + } + streamServers = append(streamServers, streamServer) + portSet[server.Port] = struct{}{} + } + } + return streamServers } diff --git a/internal/controller/nginx/config/stream_servers_template.go b/internal/controller/nginx/config/stream_servers_template.go index 75868bc0de..00c5eac0cb 100644 --- a/internal/controller/nginx/config/stream_servers_template.go +++ b/internal/controller/nginx/config/stream_servers_template.go @@ -26,6 +26,10 @@ server { {{- if $s.SSLPreread }} ssl_preread on; {{- end }} + + {{- if and (eq $s.Protocol "udp") $s.UDPConfig }} + proxy_timeout {{ $s.UDPConfig.ProxyTimeout }}; + {{- end }} } {{- end }} diff --git a/internal/controller/nginx/config/upstreams.go b/internal/controller/nginx/config/upstreams.go index 5a792606bb..63e470b81f 100644 --- a/internal/controller/nginx/config/upstreams.go +++ b/internal/controller/nginx/config/upstreams.go @@ -69,7 +69,13 @@ func executeUpstreams(upstreams []http.Upstream) []executeResult { } func (g GeneratorImpl) executeStreamUpstreams(conf dataplane.Configuration) []executeResult { - upstreams := g.createStreamUpstreams(conf.StreamUpstreams) + // Combine all stream upstreams: TLS, TCP, and UDP + allUpstreams := make([]dataplane.Upstream, 0, len(conf.StreamUpstreams)+len(conf.TCPUpstreams)+len(conf.UDPUpstreams)) + allUpstreams = append(allUpstreams, conf.StreamUpstreams...) + allUpstreams = append(allUpstreams, conf.TCPUpstreams...) + allUpstreams = append(allUpstreams, conf.UDPUpstreams...) + + upstreams := g.createStreamUpstreams(allUpstreams) result := executeResult{ dest: streamConfigFile, diff --git a/internal/controller/provisioner/objects.go b/internal/controller/provisioner/objects.go index 47cd221256..de22ced5c9 100644 --- a/internal/controller/provisioner/objects.go +++ b/internal/controller/provisioner/objects.go @@ -43,6 +43,11 @@ const ( defaultInitialDelaySeconds = int32(3) ) +type PortInfo struct { + Port int32 + Protocol corev1.Protocol +} + var emptyDirVolumeSource = corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}} func (p *NginxProvisioner) buildNginxResourceObjects( @@ -136,9 +141,18 @@ func (p *NginxProvisioner) buildNginxResourceObjects( openshiftObjs = p.buildOpenshiftObjects(objectMeta) } - ports := make(map[int32]struct{}) + ports := make(map[int32]PortInfo) for _, listener := range gateway.Spec.Listeners { - ports[int32(listener.Port)] = struct{}{} + var protocol corev1.Protocol + switch listener.Protocol { + case gatewayv1.TCPProtocolType: + protocol = corev1.ProtocolTCP + case gatewayv1.UDPProtocolType: + protocol = corev1.ProtocolUDP + default: + protocol = corev1.ProtocolTCP + } + ports[int32(listener.Port)] = PortInfo{Port: int32(listener.Port), Protocol: protocol} } service, err := buildNginxService(objectMeta, nProxyCfg, ports, selectorLabels) @@ -434,7 +448,7 @@ func (p *NginxProvisioner) buildOpenshiftObjects(objectMeta metav1.ObjectMeta) [ func buildNginxService( objectMeta metav1.ObjectMeta, nProxyCfg *graph.EffectiveNginxProxy, - ports map[int32]struct{}, + ports map[int32]PortInfo, selectorLabels map[string]string, ) (*corev1.Service, error) { var serviceCfg ngfAPIv1alpha2.ServiceSpec @@ -456,16 +470,17 @@ func buildNginxService( } servicePorts := make([]corev1.ServicePort, 0, len(ports)) - for port := range ports { + for _, portInfo := range ports { servicePort := corev1.ServicePort{ - Name: fmt.Sprintf("port-%d", port), - Port: port, - TargetPort: intstr.FromInt32(port), + Name: fmt.Sprintf("port-%d", portInfo.Port), + Port: portInfo.Port, + TargetPort: intstr.FromInt32(portInfo.Port), + Protocol: portInfo.Protocol, } if serviceType != corev1.ServiceTypeClusterIP { for _, nodePort := range serviceCfg.NodePorts { - if nodePort.ListenerPort == port { + if nodePort.ListenerPort == portInfo.Port { servicePort.NodePort = nodePort.Port } } @@ -533,7 +548,7 @@ func (p *NginxProvisioner) buildNginxDeployment( nProxyCfg *graph.EffectiveNginxProxy, ngxIncludesConfigMapName string, ngxAgentConfigMapName string, - ports map[int32]struct{}, + ports map[int32]PortInfo, selectorLabels map[string]string, agentTLSSecretName string, dockerSecretNames map[string]string, @@ -665,7 +680,7 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec( nProxyCfg *graph.EffectiveNginxProxy, ngxIncludesConfigMapName string, ngxAgentConfigMapName string, - ports map[int32]struct{}, + ports map[int32]PortInfo, agentTLSSecretName string, dockerSecretNames map[string]string, jwtSecretName string, @@ -673,10 +688,11 @@ func (p *NginxProvisioner) buildNginxPodTemplateSpec( clientSSLSecretName string, ) corev1.PodTemplateSpec { containerPorts := make([]corev1.ContainerPort, 0, len(ports)) - for port := range ports { + for _, portInfo := range ports { containerPort := corev1.ContainerPort{ - Name: fmt.Sprintf("port-%d", port), - ContainerPort: port, + Name: fmt.Sprintf("port-%d", portInfo.Port), + ContainerPort: portInfo.Port, + Protocol: portInfo.Protocol, } containerPorts = append(containerPorts, containerPort) } diff --git a/internal/controller/state/change_processor.go b/internal/controller/state/change_processor.go index a89d956198..87757b5699 100644 --- a/internal/controller/state/change_processor.go +++ b/internal/controller/state/change_processor.go @@ -96,6 +96,8 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { NginxProxies: make(map[types.NamespacedName]*ngfAPIv1alpha2.NginxProxy), GRPCRoutes: make(map[types.NamespacedName]*v1.GRPCRoute), TLSRoutes: make(map[types.NamespacedName]*v1alpha2.TLSRoute), + TCPRoutes: make(map[types.NamespacedName]*v1alpha2.TCPRoute), + UDPRoutes: make(map[types.NamespacedName]*v1alpha2.UDPRoute), NGFPolicies: make(map[graph.PolicyKey]policies.Policy), SnippetsFilters: make(map[types.NamespacedName]*ngfAPIv1alpha1.SnippetsFilter), } @@ -211,6 +213,16 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { store: newObjectStoreMapAdapter(clusterStore.TLSRoutes), predicate: nil, }, + { + gvk: cfg.MustExtractGVK(&v1alpha2.TCPRoute{}), + store: newObjectStoreMapAdapter(clusterStore.TCPRoutes), + predicate: nil, + }, + { + gvk: cfg.MustExtractGVK(&v1alpha2.UDPRoute{}), + store: newObjectStoreMapAdapter(clusterStore.UDPRoutes), + predicate: nil, + }, { gvk: cfg.MustExtractGVK(&ngfAPIv1alpha1.SnippetsFilter{}), store: newObjectStoreMapAdapter(clusterStore.SnippetsFilters), diff --git a/internal/controller/state/change_processor_test.go b/internal/controller/state/change_processor_test.go index 2d17e6f6e9..ef90638cca 100644 --- a/internal/controller/state/change_processor_test.go +++ b/internal/controller/state/change_processor_test.go @@ -3776,7 +3776,7 @@ var _ = Describe("ChangeProcessor", func() { }, Entry( "an unsupported resource", - &v1alpha2.TCPRoute{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "tcp"}}, + &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}}, ), Entry( "nil resource", @@ -3794,8 +3794,8 @@ var _ = Describe("ChangeProcessor", func() { }, Entry( "an unsupported resource", - &v1alpha2.TCPRoute{}, - types.NamespacedName{Namespace: "test", Name: "tcp"}, + &apiv1.Pod{}, + types.NamespacedName{Namespace: "test", Name: "pod"}, ), Entry( "nil resource type", diff --git a/internal/controller/state/dataplane/configuration.go b/internal/controller/state/dataplane/configuration.go index 84b520f053..73e787c80d 100644 --- a/internal/controller/state/dataplane/configuration.go +++ b/internal/controller/state/dataplane/configuration.go @@ -73,6 +73,8 @@ func BuildConfiguration( HTTPServers: httpServers, SSLServers: sslServers, TLSPassthroughServers: buildPassthroughServers(gateway), + TCPServers: buildTCPServers(gateway), + UDPServers: buildUDPServers(gateway), Upstreams: upstreams, StreamUpstreams: buildStreamUpstreams( ctx, @@ -80,8 +82,10 @@ func BuildConfiguration( gateway, serviceResolver, baseHTTPConfig.IPFamily), - BackendGroups: backendGroups, - SSLKeyPairs: buildSSLKeyPairs(g.ReferencedSecrets, gateway.Listeners), + TCPUpstreams: buildTCPUpstreams(ctx, gateway, serviceResolver, baseHTTPConfig.IPFamily), + UDPUpstreams: buildUDPUpstreams(ctx, gateway, serviceResolver, baseHTTPConfig.IPFamily), + BackendGroups: backendGroups, + SSLKeyPairs: buildSSLKeyPairs(g.ReferencedSecrets, gateway.Listeners), CertBundles: buildCertBundles( buildRefCertificateBundles(g.ReferencedSecrets, g.ReferencedCaCertConfigMaps), backendGroups, @@ -168,6 +172,79 @@ func buildPassthroughServers(gateway *graph.Gateway) []Layer4VirtualServer { return passthroughServers } +// buildTCPServers builds TCPServers from TCPRoutes attached to listeners. +func buildTCPServers(gateway *graph.Gateway) []Layer4VirtualServer { + var tcpServers []Layer4VirtualServer + + for _, l := range gateway.Listeners { + if !l.Valid || l.Source.Protocol != v1.TCPProtocolType { + continue + } + + if len(l.L4Routes) > 1 { + fmt.Printf( + "WARN: Listener %s has %d TCPRoutes, which is not supported. Skipping.", + l.Name, + len(l.L4Routes), + ) + continue + } + + for _, r := range l.L4Routes { + if !r.Valid { + continue + } + + upstreamName := r.Spec.BackendRef.ServicePortReference() + tcpServer := Layer4VirtualServer{ + Hostname: "", // TCP doesn't use hostnames + UpstreamName: upstreamName, + Port: int32(l.Source.Port), + } + + tcpServers = append(tcpServers, tcpServer) + } + } + + return tcpServers +} + +// buildUDPServers builds UDPServers from UDPRoutes attached to listeners. +func buildUDPServers(gateway *graph.Gateway) []Layer4VirtualServer { + var udpServers []Layer4VirtualServer + + for _, l := range gateway.Listeners { + if !l.Valid || l.Source.Protocol != v1.UDPProtocolType { + continue + } + + if len(l.L4Routes) > 1 { + fmt.Printf( + "WARN: Listener %s has %d UDPRoutes, which is not supported. Skipping.", + l.Name, + len(l.L4Routes), + ) + continue + } + + for _, r := range l.L4Routes { + if !r.Valid { + continue + } + + udpServer := Layer4VirtualServer{ + Hostname: "", // UDP doesn't use hostnames + UpstreamName: r.Spec.BackendRef.ServicePortReference(), + Port: int32(l.Source.Port), + } + + udpServers = append(udpServers, udpServer) + } + } + + return udpServers +} + // buildStreamUpstreams builds all stream upstreams. func buildStreamUpstreams( ctx context.Context, @@ -242,6 +319,137 @@ func buildStreamUpstreams( return upstreams } +// buildTCPUpstreams builds all TCP upstreams. +func buildTCPUpstreams( + ctx context.Context, + gateway *graph.Gateway, + serviceResolver resolver.ServiceResolver, + ipFamily IPFamilyType, +) []Upstream { + uniqueUpstreams := make(map[string]Upstream) + + for _, l := range gateway.Listeners { + if !l.Valid || l.Source.Protocol != v1.TCPProtocolType { + continue + } + + for _, route := range l.L4Routes { + if !route.Valid { + continue + } + + br := route.Spec.BackendRef + + if !br.Valid { + continue + } + + gatewayNSName := client.ObjectKeyFromObject(gateway.Source) + if _, ok := br.InvalidForGateways[gatewayNSName]; ok { + continue + } + + upstreamName := br.ServicePortReference() + + if _, exist := uniqueUpstreams[upstreamName]; exist { + continue + } + + var errMsg string + + allowedAddressType := getAllowedAddressType(ipFamily) + + eps, err := serviceResolver.Resolve(ctx, br.SvcNsName, br.ServicePort, allowedAddressType) + if err != nil { + errMsg = err.Error() + } + + uniqueUpstreams[upstreamName] = Upstream{ + Name: upstreamName, + Endpoints: eps, + ErrorMsg: errMsg, + } + } + } + + if len(uniqueUpstreams) == 0 { + return nil + } + + upstreams := make([]Upstream, 0, len(uniqueUpstreams)) + + for _, up := range uniqueUpstreams { + upstreams = append(upstreams, up) + } + + return upstreams +} + +// buildUDPUpstreams builds all UDP upstreams. +func buildUDPUpstreams( + ctx context.Context, + gateway *graph.Gateway, + serviceResolver resolver.ServiceResolver, + ipFamily IPFamilyType, +) []Upstream { + uniqueUpstreams := make(map[string]Upstream) + + for _, l := range gateway.Listeners { + if !l.Valid || l.Source.Protocol != v1.UDPProtocolType { + continue + } + + for _, route := range l.L4Routes { + if !route.Valid { + continue + } + + br := route.Spec.BackendRef + + if !br.Valid { + continue + } + + gatewayNSName := client.ObjectKeyFromObject(gateway.Source) + if _, ok := br.InvalidForGateways[gatewayNSName]; ok { + continue + } + + upstreamName := br.ServicePortReference() + + if _, exist := uniqueUpstreams[upstreamName]; exist { + continue + } + + var errMsg string + + allowedAddressType := getAllowedAddressType(ipFamily) + + eps, err := serviceResolver.Resolve(ctx, br.SvcNsName, br.ServicePort, allowedAddressType) + if err != nil { + errMsg = err.Error() + } + + uniqueUpstreams[upstreamName] = Upstream{ + Name: upstreamName, + Endpoints: eps, + ErrorMsg: errMsg, + } + } + } + + if len(uniqueUpstreams) == 0 { + return nil + } + + upstreams := make([]Upstream, 0, len(uniqueUpstreams)) + + for _, up := range uniqueUpstreams { + upstreams = append(upstreams, up) + } + return upstreams +} + // buildSSLKeyPairs builds the SSLKeyPairs from the Secrets. It will only include Secrets that are referenced by // valid listeners, so that we don't include unused Secrets in the configuration of the data plane. func buildSSLKeyPairs( @@ -425,6 +633,9 @@ func buildServers(gateway *graph.Gateway) (http, ssl []VirtualServer) { if l.Source.Protocol == v1.TLSProtocolType { continue } + if l.Source.Protocol == v1.TCPProtocolType || l.Source.Protocol == v1.UDPProtocolType { + continue + } if l.Valid { rules := rulesForProtocol[l.Source.Protocol][l.Source.Port] if rules == nil { diff --git a/internal/controller/state/dataplane/types.go b/internal/controller/state/dataplane/types.go index ac8e263050..e36b31348e 100644 --- a/internal/controller/state/dataplane/types.go +++ b/internal/controller/state/dataplane/types.go @@ -33,6 +33,10 @@ type Configuration struct { SSLServers []VirtualServer // TLSPassthroughServers hold all TLSPassthroughServers TLSPassthroughServers []Layer4VirtualServer + // TCPServers holds all TCPServers + TCPServers []Layer4VirtualServer + // UDPServers holds all UDPServers + UDPServers []Layer4VirtualServer // Upstreams holds all unique http Upstreams. Upstreams []Upstream // DeploymentContext contains metadata about NGF and the cluster. @@ -42,6 +46,10 @@ type Configuration struct { AuxiliarySecrets map[graph.SecretFileType][]byte // StreamUpstreams holds all unique stream Upstreams StreamUpstreams []Upstream + // TCPUpstreams holds all unique TCP Upstreams + TCPUpstreams []Upstream + // UDPUpstreams holds all unique UDP Upstreams + UDPUpstreams []Upstream // BackendGroups holds all unique BackendGroups. BackendGroups []BackendGroup // MainSnippets holds all the snippets that apply to the main context. diff --git a/internal/controller/state/graph/gateway_listener.go b/internal/controller/state/graph/gateway_listener.go index d8524c93a8..9ac74bbd9f 100644 --- a/internal/controller/state/graph/gateway_listener.go +++ b/internal/controller/state/graph/gateway_listener.go @@ -66,7 +66,7 @@ func buildListeners( } type listenerConfiguratorFactory struct { - http, https, tls, unsupportedProtocol *listenerConfigurator + http, https, tls, tcp, udp, unsupportedProtocol *listenerConfigurator } func (f *listenerConfiguratorFactory) getConfiguratorForListener(l v1.Listener) *listenerConfigurator { @@ -77,6 +77,10 @@ func (f *listenerConfiguratorFactory) getConfiguratorForListener(l v1.Listener) return f.https case v1.TLSProtocolType: return f.tls + case v1.TCPProtocolType: + return f.tcp + case v1.UDPProtocolType: + return f.udp default: return f.unsupportedProtocol } @@ -97,7 +101,7 @@ func newListenerConfiguratorFactory( valErr := field.NotSupported( field.NewPath("protocol"), listener.Protocol, - []string{string(v1.HTTPProtocolType), string(v1.HTTPSProtocolType), string(v1.TLSProtocolType)}, + []string{string(v1.HTTPProtocolType), string(v1.HTTPSProtocolType), string(v1.TLSProtocolType), string(v1.TCPProtocolType), string(v1.UDPProtocolType)}, ) return conditions.NewListenerUnsupportedProtocol(valErr.Error()), false /* not attachable */ }, @@ -140,6 +144,26 @@ func newListenerConfiguratorFactory( }, externalReferenceResolvers: []listenerExternalReferenceResolver{}, }, + tcp: &listenerConfigurator{ + validators: []listenerValidator{ + validateListenerAllowedRouteKind, + validateListenerLabelSelector, + createTCPListenerValidator(protectedPorts), + }, + conflictResolvers: []listenerConflictResolver{ + sharedPortConflictResolver, + }, + }, + udp: &listenerConfigurator{ + validators: []listenerValidator{ + validateListenerAllowedRouteKind, + validateListenerLabelSelector, + createUDPListenerValidator(protectedPorts), + }, + conflictResolvers: []listenerConflictResolver{ + sharedPortConflictResolver, + }, + }, } } @@ -268,6 +292,14 @@ func getAndValidateListenerSupportedKinds(listener v1.Listener) ( validKinds = []v1.RouteGroupKind{ {Kind: v1.Kind(kinds.TLSRoute), Group: helpers.GetPointer[v1.Group](v1.GroupName)}, } + case v1.TCPProtocolType: + validKinds = []v1.RouteGroupKind{ + {Kind: v1.Kind(kinds.TCPRoute), Group: helpers.GetPointer[v1.Group](v1.GroupName)}, + } + case v1.UDPProtocolType: + validKinds = []v1.RouteGroupKind{ + {Kind: v1.Kind(kinds.UDPRoute), Group: helpers.GetPointer[v1.Group](v1.GroupName)}, + } } validProtocolRouteKind := func(kind v1.RouteGroupKind) bool { @@ -591,3 +623,51 @@ func haveOverlap(hostname1, hostname2 *v1.Hostname) bool { } return matchesWildcard(h1, h2) } + +func createTCPListenerValidator(protectedPorts ProtectedPorts) listenerValidator { + return func(listener v1.Listener) (conds []conditions.Condition, attachable bool) { + if err := validateListenerPort(listener.Port, protectedPorts); err != nil { + path := field.NewPath("port") + valErr := field.Invalid(path, listener.Port, err.Error()) + conds = append(conds, conditions.NewListenerUnsupportedValue(valErr.Error())...) + } + + if listener.TLS != nil { + path := field.NewPath("tls") + valErr := field.Forbidden(path, "tls is not supported for TCP listener") + conds = append(conds, conditions.NewListenerUnsupportedValue(valErr.Error())...) + } + + if listener.Hostname != nil { + path := field.NewPath("hostname") + valErr := field.Forbidden(path, "hostname is not supported for TCP listener") + conds = append(conds, conditions.NewListenerUnsupportedValue(valErr.Error())...) + } + + return conds, true + } +} + +func createUDPListenerValidator(protectedPorts ProtectedPorts) listenerValidator { + return func(listener v1.Listener) (conds []conditions.Condition, attachable bool) { + if err := validateListenerPort(listener.Port, protectedPorts); err != nil { + path := field.NewPath("port") + valErr := field.Invalid(path, listener.Port, err.Error()) + conds = append(conds, conditions.NewListenerUnsupportedValue(valErr.Error())...) + } + + if listener.TLS != nil { + path := field.NewPath("tls") + valErr := field.Forbidden(path, "tls is not supported for UDP listener") + conds = append(conds, conditions.NewListenerUnsupportedValue(valErr.Error())...) + } + + if listener.Hostname != nil { + path := field.NewPath("hostname") + valErr := field.Forbidden(path, "hostname is not supported for UDP listener") + conds = append(conds, conditions.NewListenerUnsupportedValue(valErr.Error())...) + } + + return conds, true + } +} diff --git a/internal/controller/state/graph/gateway_listener_test.go b/internal/controller/state/graph/gateway_listener_test.go index 0b75ddb7c0..086a7d8a13 100644 --- a/internal/controller/state/graph/gateway_listener_test.go +++ b/internal/controller/state/graph/gateway_listener_test.go @@ -323,8 +323,24 @@ func TestGetAndValidateListenerSupportedKinds(t *testing.T) { { protocol: v1.TCPProtocolType, expectErr: false, - name: "unsupported protocol is ignored", - expected: nil, + name: "valid TCP protocol", + expected: []v1.RouteGroupKind{ + { + Kind: kinds.TCPRoute, + Group: helpers.GetPointer[v1.Group](v1.GroupName), + }, + }, + }, + { + protocol: v1.UDPProtocolType, + expectErr: false, + name: "valid UDP protocol", + expected: []v1.RouteGroupKind{ + { + Kind: kinds.UDPRoute, + Group: helpers.GetPointer[v1.Group](v1.GroupName), + }, + }, }, { protocol: v1.HTTPProtocolType, diff --git a/internal/controller/state/graph/graph.go b/internal/controller/state/graph/graph.go index 52cb047ae2..8f29c8646b 100644 --- a/internal/controller/state/graph/graph.go +++ b/internal/controller/state/graph/graph.go @@ -28,6 +28,8 @@ type ClusterState struct { Gateways map[types.NamespacedName]*gatewayv1.Gateway HTTPRoutes map[types.NamespacedName]*gatewayv1.HTTPRoute TLSRoutes map[types.NamespacedName]*v1alpha2.TLSRoute + TCPRoutes map[types.NamespacedName]*v1alpha2.TCPRoute + UDPRoutes map[types.NamespacedName]*v1alpha2.UDPRoute Services map[types.NamespacedName]*v1.Service Namespaces map[types.NamespacedName]*v1.Namespace ReferenceGrants map[types.NamespacedName]*v1beta1.ReferenceGrant @@ -252,6 +254,8 @@ func BuildGraph( l4routes := buildL4RoutesForGateways( state.TLSRoutes, + state.TCPRoutes, + state.UDPRoutes, state.Services, gws, refGrantResolver, diff --git a/internal/controller/state/graph/reference_grant.go b/internal/controller/state/graph/reference_grant.go index b827d47024..f56949a4be 100644 --- a/internal/controller/state/graph/reference_grant.go +++ b/internal/controller/state/graph/reference_grant.go @@ -89,6 +89,22 @@ func fromTLSRoute(namespace string) fromResource { } } +func fromTCPRoute(namespace string) fromResource { + return fromResource{ + group: v1.GroupName, + kind: kinds.TCPRoute, + namespace: namespace, + } +} + +func fromUDPRoute(namespace string) fromResource { + return fromResource{ + group: v1.GroupName, + kind: kinds.UDPRoute, + namespace: namespace, + } +} + // newReferenceGrantResolver creates a new referenceGrantResolver. func newReferenceGrantResolver(refGrants map[types.NamespacedName]*v1beta1.ReferenceGrant) *referenceGrantResolver { allowed := make(map[allowedReference]struct{}) diff --git a/internal/controller/state/graph/route_common.go b/internal/controller/state/graph/route_common.go index d097e72cbb..eb2c3fedae 100644 --- a/internal/controller/state/graph/route_common.go +++ b/internal/controller/state/graph/route_common.go @@ -74,6 +74,10 @@ const ( RouteTypeGRPC RouteType = "grpc" // RouteTypeTLS indicates that the RouteType of the L4Route is TLS. RouteTypeTLS RouteType = "tls" + // RouteTypeTCP indicates that the RouteType of the L4Route is TCP. + RouteTypeTCP RouteType = "tcp" + // RouteTypeUDP indicates that the RouteType of the L4Route is UDP. + RouteTypeUDP RouteType = "udp" ) // L4RouteKey is the unique identifier for a L4Route. @@ -209,6 +213,8 @@ func (e routeRuleErrors) append(newErrors routeRuleErrors) routeRuleErrors { func buildL4RoutesForGateways( tlsRoutes map[types.NamespacedName]*v1alpha.TLSRoute, + tcpRoutes map[types.NamespacedName]*v1alpha.TCPRoute, + udpRoutes map[types.NamespacedName]*v1alpha.UDPRoute, services map[types.NamespacedName]*apiv1.Service, gws map[types.NamespacedName]*Gateway, resolver *referenceGrantResolver, @@ -230,6 +236,32 @@ func buildL4RoutesForGateways( } } + // Process TCP routes + for _, route := range tcpRoutes { + r := buildTCPRoute( + route, + gws, + services, + resolver.refAllowedFrom(fromTCPRoute(route.Namespace)), + ) + if r != nil { + routes[CreateRouteKeyL4(route)] = r + } + } + + // Process UDP routes + for _, route := range udpRoutes { + r := buildUDPRoute( + route, + gws, + services, + resolver.refAllowedFrom(fromUDPRoute(route.Namespace)), + ) + if r != nil { + routes[CreateRouteKeyL4(route)] = r + } + } + return routes } @@ -687,6 +719,19 @@ func tryToAttachL4RouteToListeners( return conditions.Condition{}, true } +func getL4RouteKind(route *L4Route) v1.Kind { + switch route.Source.(type) { + case *v1alpha.TLSRoute: + return v1.Kind(kinds.TLSRoute) + case *v1alpha.TCPRoute: + return v1.Kind(kinds.TCPRoute) + case *v1alpha.UDPRoute: + return v1.Kind(kinds.UDPRoute) + default: + return v1.Kind(kinds.TLSRoute) + } +} + func bindToListenerL4( l *Listener, route *L4Route, @@ -699,7 +744,8 @@ func bindToListenerL4( return false, false, false } - if !isRouteTypeAllowedByListener(l, kinds.TLSRoute) { + routeKind := getL4RouteKind(route) + if !isRouteTypeAllowedByListener(l, routeKind) { return false, false, false } diff --git a/internal/controller/state/graph/route_common_test.go b/internal/controller/state/graph/route_common_test.go index 0e5d28f0e4..28e0958358 100644 --- a/internal/controller/state/graph/route_common_test.go +++ b/internal/controller/state/graph/route_common_test.go @@ -2365,8 +2365,10 @@ func TestBuildL4RoutesForGateways_NoGateways(t *testing.T) { g.Expect(buildL4RoutesForGateways( tlsRoutes, + nil, // tcpRoutes + nil, // udpRoutes services, - nil, + nil, // gateways refGrantResolver, )).To(BeNil()) } diff --git a/internal/controller/state/graph/tcproute.go b/internal/controller/state/graph/tcproute.go new file mode 100644 index 0000000000..9cddfe86e6 --- /dev/null +++ b/internal/controller/state/graph/tcproute.go @@ -0,0 +1,125 @@ +package graph + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/validation/field" + "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/nginx/nginx-gateway-fabric/internal/controller/state/conditions" +) + +func buildTCPRoute( + tcpRoute *v1alpha2.TCPRoute, + gws map[types.NamespacedName]*Gateway, + services map[types.NamespacedName]*apiv1.Service, + refGrantResolver func(resource toResource) bool, +) *L4Route { + r := &L4Route{ + Source: tcpRoute, + } + + sectionNameRefs, err := buildSectionNameRefs(tcpRoute.Spec.ParentRefs, tcpRoute.Namespace, gws) + if err != nil { + r.Valid = false + return r + } + + // route doesn't belong to any of the Gateways + if len(sectionNameRefs) == 0 { + return nil + } + r.ParentRefs = sectionNameRefs + + // TCPRoute doesn't have hostnames like TLSRoute, so we skip hostname validation + + if len(tcpRoute.Spec.Rules) != 1 || len(tcpRoute.Spec.Rules[0].BackendRefs) != 1 { + r.Valid = false + cond := conditions.NewRouteBackendRefUnsupportedValue( + "Must have exactly one Rule and BackendRef", + ) + r.Conditions = append(r.Conditions, cond) + return r + } + + br, conds := validateBackendRefTCPRoute(tcpRoute, services, r.ParentRefs, refGrantResolver) + + r.Spec.BackendRef = br + r.Valid = true + r.Attachable = true + + if len(conds) > 0 { + r.Conditions = append(r.Conditions, conds...) + } + + return r +} + +func validateBackendRefTCPRoute( + tcpRoute *v1alpha2.TCPRoute, + services map[types.NamespacedName]*apiv1.Service, + parentRefs []ParentRef, + refGrantResolver func(resource toResource) bool, +) (BackendRef, []conditions.Condition) { + // Length of BackendRefs and Rules is guaranteed to be one due to earlier check in buildTCPRoute + refPath := field.NewPath("spec").Child("rules").Index(0).Child("backendRefs").Index(0) + + ref := tcpRoute.Spec.Rules[0].BackendRefs[0] + + if valid, cond := validateBackendRef( + ref, + tcpRoute.Namespace, + refGrantResolver, + refPath, + ); !valid { + backendRef := BackendRef{ + Valid: false, + InvalidForGateways: make(map[types.NamespacedName]conditions.Condition), + } + + return backendRef, []conditions.Condition{cond} + } + + ns := tcpRoute.Namespace + if ref.Namespace != nil { + ns = string(*ref.Namespace) + } + + svcNsName := types.NamespacedName{ + Namespace: ns, + Name: string(tcpRoute.Spec.Rules[0].BackendRefs[0].Name), + } + + svcIPFamily, svcPort, err := getIPFamilyAndPortFromRef( + ref, + svcNsName, + services, + refPath, + ) + + backendRef := BackendRef{ + SvcNsName: svcNsName, + ServicePort: svcPort, + Valid: true, + InvalidForGateways: make(map[types.NamespacedName]conditions.Condition), + } + + if err != nil { + backendRef.Valid = false + + return backendRef, []conditions.Condition{conditions.NewRouteBackendRefRefBackendNotFound(err.Error())} + } + + // For TCPRoute, we don't need to validate app protocol compatibility + // as TCP is protocol-agnostic at the application layer + + var conds []conditions.Condition + for _, parentRef := range parentRefs { + if err := verifyIPFamily(parentRef.Gateway.EffectiveNginxProxy, svcIPFamily); err != nil { + backendRef.Valid = backendRef.Valid || false + backendRef.InvalidForGateways[parentRef.Gateway.NamespacedName] = conditions.NewRouteInvalidIPFamily(err.Error()) + } + } + + return backendRef, conds +} diff --git a/internal/controller/state/graph/udproute.go b/internal/controller/state/graph/udproute.go new file mode 100644 index 0000000000..c54ad538e4 --- /dev/null +++ b/internal/controller/state/graph/udproute.go @@ -0,0 +1,125 @@ +package graph + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/validation/field" + "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/nginx/nginx-gateway-fabric/internal/controller/state/conditions" +) + +func buildUDPRoute( + udpRoute *v1alpha2.UDPRoute, + gws map[types.NamespacedName]*Gateway, + services map[types.NamespacedName]*apiv1.Service, + refGrantResolver func(resource toResource) bool, +) *L4Route { + r := &L4Route{ + Source: udpRoute, + } + + sectionNameRefs, err := buildSectionNameRefs(udpRoute.Spec.ParentRefs, udpRoute.Namespace, gws) + if err != nil { + r.Valid = false + return r + } + + // route doesn't belong to any of the Gateways + if len(sectionNameRefs) == 0 { + return nil + } + r.ParentRefs = sectionNameRefs + + // UDPRoute doesn't have hostnames like TLSRoute, so we skip hostname validation + + if len(udpRoute.Spec.Rules) != 1 || len(udpRoute.Spec.Rules[0].BackendRefs) != 1 { + r.Valid = false + cond := conditions.NewRouteBackendRefUnsupportedValue( + "Must have exactly one Rule and BackendRef", + ) + r.Conditions = append(r.Conditions, cond) + return r + } + + br, conds := validateBackendRefUDPRoute(udpRoute, services, r.ParentRefs, refGrantResolver) + + r.Spec.BackendRef = br + r.Valid = true + r.Attachable = true + + if len(conds) > 0 { + r.Conditions = append(r.Conditions, conds...) + } + + return r +} + +func validateBackendRefUDPRoute( + udpRoute *v1alpha2.UDPRoute, + services map[types.NamespacedName]*apiv1.Service, + parentRefs []ParentRef, + refGrantResolver func(resource toResource) bool, +) (BackendRef, []conditions.Condition) { + // Length of BackendRefs and Rules is guaranteed to be one due to earlier check in buildUDPRoute + refPath := field.NewPath("spec").Child("rules").Index(0).Child("backendRefs").Index(0) + + ref := udpRoute.Spec.Rules[0].BackendRefs[0] + + if valid, cond := validateBackendRef( + ref, + udpRoute.Namespace, + refGrantResolver, + refPath, + ); !valid { + backendRef := BackendRef{ + Valid: false, + InvalidForGateways: make(map[types.NamespacedName]conditions.Condition), + } + + return backendRef, []conditions.Condition{cond} + } + + ns := udpRoute.Namespace + if ref.Namespace != nil { + ns = string(*ref.Namespace) + } + + svcNsName := types.NamespacedName{ + Namespace: ns, + Name: string(udpRoute.Spec.Rules[0].BackendRefs[0].Name), + } + + svcIPFamily, svcPort, err := getIPFamilyAndPortFromRef( + ref, + svcNsName, + services, + refPath, + ) + + backendRef := BackendRef{ + SvcNsName: svcNsName, + ServicePort: svcPort, + Valid: true, + InvalidForGateways: make(map[types.NamespacedName]conditions.Condition), + } + + if err != nil { + backendRef.Valid = false + + return backendRef, []conditions.Condition{conditions.NewRouteBackendRefRefBackendNotFound(err.Error())} + } + + // For UDPRoute, we don't need to validate app protocol compatibility + // as UDP is protocol-agnostic at the application layer + + var conds []conditions.Condition + for _, parentRef := range parentRefs { + if err := verifyIPFamily(parentRef.Gateway.EffectiveNginxProxy, svcIPFamily); err != nil { + backendRef.Valid = backendRef.Valid || false + backendRef.InvalidForGateways[parentRef.Gateway.NamespacedName] = conditions.NewRouteInvalidIPFamily(err.Error()) + } + } + + return backendRef, conds +} diff --git a/internal/controller/status/prepare_requests.go b/internal/controller/status/prepare_requests.go index d2f39a7a3a..037862063c 100644 --- a/internal/controller/status/prepare_requests.go +++ b/internal/controller/status/prepare_requests.go @@ -37,17 +37,44 @@ func PrepareRouteRequests( r.Source.GetGeneration(), ) - status := v1alpha2.TLSRouteStatus{ - RouteStatus: routeStatus, - } + switch r.Source.(type) { + case *v1alpha2.TLSRoute: + status := v1alpha2.TLSRouteStatus{ + RouteStatus: routeStatus, + } - req := UpdateRequest{ - NsName: routeKey.NamespacedName, - ResourceType: &v1alpha2.TLSRoute{}, - Setter: newTLSRouteStatusSetter(status, gatewayCtlrName), - } + req := UpdateRequest{ + NsName: routeKey.NamespacedName, + ResourceType: &v1alpha2.TLSRoute{}, + Setter: newTLSRouteStatusSetter(status, gatewayCtlrName), + } + reqs = append(reqs, req) - reqs = append(reqs, req) + case *v1alpha2.TCPRoute: + status := v1alpha2.TCPRouteStatus{ + RouteStatus: routeStatus, + } + req := UpdateRequest{ + NsName: routeKey.NamespacedName, + ResourceType: &v1alpha2.TCPRoute{}, + Setter: newTCPRouteStatusSetter(status, gatewayCtlrName), + } + reqs = append(reqs, req) + + case *v1alpha2.UDPRoute: + status := v1alpha2.UDPRouteStatus{ + RouteStatus: routeStatus, + } + req := UpdateRequest{ + NsName: routeKey.NamespacedName, + ResourceType: &v1alpha2.UDPRoute{}, + Setter: newUDPRouteStatusSetter(status, gatewayCtlrName), + } + reqs = append(reqs, req) + + default: + continue + } } for routeKey, r := range routes { diff --git a/internal/controller/status/status_setters.go b/internal/controller/status/status_setters.go index c4fcc7c128..fb96fa97b0 100644 --- a/internal/controller/status/status_setters.go +++ b/internal/controller/status/status_setters.go @@ -142,6 +142,48 @@ func newGRPCRouteStatusSetter(status gatewayv1.GRPCRouteStatus, gatewayCtlrName } } +func newTCPRouteStatusSetter(status v1alpha2.TCPRouteStatus, gatewayCtlrName string) Setter { + return func(object client.Object) (wasSet bool) { + tr := helpers.MustCastObject[*v1alpha2.TCPRoute](object) + + // keep all the parent statuses that belong to other controllers + for _, os := range tr.Status.Parents { + if string(os.ControllerName) != gatewayCtlrName { + status.Parents = append(status.Parents, os) + } + } + + if routeStatusEqual(gatewayCtlrName, tr.Status.Parents, status.Parents) { + return false + } + + tr.Status = status + + return true + } +} + +func newUDPRouteStatusSetter(status v1alpha2.UDPRouteStatus, gatewayCtlrName string) Setter { + return func(object client.Object) (wasSet bool) { + ur := helpers.MustCastObject[*v1alpha2.UDPRoute](object) + + // keep all the parent statuses that belong to other controllers + for _, os := range ur.Status.Parents { + if string(os.ControllerName) != gatewayCtlrName { + status.Parents = append(status.Parents, os) + } + } + + if routeStatusEqual(gatewayCtlrName, ur.Status.Parents, status.Parents) { + return false + } + + ur.Status = status + + return true + } +} + func routeStatusEqual(gatewayCtlrName string, prevParents, curParents []gatewayv1.RouteParentStatus) bool { // Since other controllers may update HTTPRoute status we can't assume anything about the order of the statuses, // and we have to ignore statuses written by other controllers when checking for equality. diff --git a/internal/framework/kinds/kinds.go b/internal/framework/kinds/kinds.go index baeda6f9ee..2100d785d6 100644 --- a/internal/framework/kinds/kinds.go +++ b/internal/framework/kinds/kinds.go @@ -21,6 +21,10 @@ const ( GRPCRoute = "GRPCRoute" // TLSRoute is the TLSRoute kind. TLSRoute = "TLSRoute" + // TCPRoute is the TCPRoute kind. + TCPRoute = "TCPRoute" + // UDPRoute is the UDPRoute kind. + UDPRoute = "UDPRoute" ) // Core API Kinds.