@@ -68,7 +68,12 @@ func (r *Reconciler) createDataPlane(
6868 // so it cannot be overridden by spec.infrastructure).
6969 setGatewayNameLabelInDataPlane (& dataplane .Spec .DataPlaneOptions , gateway .Name )
7070 setDataPlaneOptionsDefaults (& dataplane .Spec .DataPlaneOptions , r .DefaultDataPlaneImage )
71- if err := setDataPlaneIngressServicePorts (& dataplane .Spec .DataPlaneOptions , gateway .Spec .Listeners , gatewayConfig .Spec .ListenersOptions ); err != nil {
71+
72+ if err := setDataPlaneOptionsForListeners (
73+ & dataplane .Spec .DataPlaneOptions ,
74+ gateway .Spec .Listeners ,
75+ gatewayConfig .Spec .ListenersOptions ,
76+ ); err != nil {
7277 return nil , err
7378 }
7479
@@ -583,11 +588,12 @@ func generateDataPlaneNetworkPolicy(
583588 podLabels map [string ]string ,
584589) (* networkingv1.NetworkPolicy , error ) {
585590 var (
586- protocolTCP = corev1 .ProtocolTCP
587- adminAPISSLPort = intstr .FromInt (consts .DataPlaneAdminAPIPort )
588- proxyPort = intstr .FromInt (consts .DataPlaneProxyPort )
589- proxySSLPort = intstr .FromInt (consts .DataPlaneProxySSLPort )
590- metricsPort = intstr .FromInt (consts .DataPlaneMetricsPort )
591+ protocolTCP = corev1 .ProtocolTCP
592+ adminAPISSLPorts = []intstr.IntOrString {intstr .FromInt (consts .DataPlaneAdminAPIPort )}
593+ proxyPorts = []intstr.IntOrString {intstr .FromInt (consts .DataPlaneProxyPort )}
594+ proxySSLPorts = []intstr.IntOrString {intstr .FromInt (consts .DataPlaneProxySSLPort )}
595+ metricsPorts = []intstr.IntOrString {intstr .FromInt (consts .DataPlaneMetricsPort )}
596+ streamListenPorts = []intstr.IntOrString {}
591597 // The label keys to match Kong operator pod.
592598 // To not create new NetworkPolicy on upgrade of , we just keep the keys marking the application
593599 // and remove the keys related to versions such as `version`,`pod-template-hash`,`helm.sh/chart`.
@@ -614,21 +620,33 @@ func generateDataPlaneNetworkPolicy(
614620 if err != nil {
615621 return nil , fmt .Errorf ("failed parsing KONG_PROXY_LISTEN env: %w" , err )
616622 }
617- if kongListenConfig .Endpoint != nil {
618- proxyPort = intstr .FromInt (kongListenConfig .Endpoint .Port )
619- }
620- if kongListenConfig .SSLEndpoint != nil {
621- proxySSLPort = intstr .FromInt (kongListenConfig .SSLEndpoint .Port )
622- }
623+
624+ proxyPorts = lo .Map (kongListenConfig .Endpoints , func (ep * proxyListenEndpoint , _ int ) intstr.IntOrString {
625+ return intstr .FromInt (ep .Port )
626+ })
627+ proxySSLPorts = lo .Map (kongListenConfig .SSLEndpoints , func (ep * proxyListenEndpoint , _ int ) intstr.IntOrString {
628+ return intstr .FromInt (ep .Port )
629+ })
623630 }
624631 if adminListen := k8sutils .EnvValueByName (container .Env , "KONG_ADMIN_LISTEN" ); adminListen != "" {
625632 kongListenConfig , err := parseKongListenEnv (adminListen )
626633 if err != nil {
627634 return nil , fmt .Errorf ("failed parsing KONG_ADMIN_LISTEN env: %w" , err )
628635 }
629- if kongListenConfig .SSLEndpoint != nil {
630- adminAPISSLPort = intstr .FromInt (kongListenConfig .SSLEndpoint .Port )
636+ adminAPISSLPorts = lo .Map (kongListenConfig .SSLEndpoints , func (ep * proxyListenEndpoint , _ int ) intstr.IntOrString {
637+ return intstr .FromInt (ep .Port )
638+ })
639+ }
640+ if streamListen := k8sutils .EnvValueByName (container .Env , "KONG_STREAM_LISTEN" ); streamListen != "" {
641+ kongListenConfig , err := parseKongListenEnv (streamListen )
642+ if err != nil {
643+ return nil , fmt .Errorf ("failed parsing KONG_STREAM_LISTEN env: %w" , err )
631644 }
645+ // Since currently we only support TLS (TCP SSL) listeners, we only extract SSL ports here.
646+ // For TCPRoute and UDPRoute, we also need to extract TCP endpoints and UDP endpoints.
647+ streamListenPorts = lo .Map (kongListenConfig .SSLEndpoints , func (ep * proxyListenEndpoint , _ int ) intstr.IntOrString {
648+ return intstr .FromInt (ep .Port )
649+ })
632650 }
633651
634652 // Construct the policy to allow the KO pod to access DataPlane admin APIs.
@@ -653,25 +671,47 @@ func generateDataPlaneNetworkPolicy(
653671 }
654672 }
655673 limitAdminAPIIngress := networkingv1.NetworkPolicyIngressRule {
656- Ports : [] networkingv1.NetworkPolicyPort {
657- {Protocol : & protocolTCP , Port : & adminAPISSLPort },
658- },
674+ Ports : lo . Map ( adminAPISSLPorts , func ( port intstr. IntOrString , _ int ) networkingv1.NetworkPolicyPort {
675+ return networkingv1. NetworkPolicyPort {Protocol : & protocolTCP , Port : & port }
676+ }) ,
659677 From : []networkingv1.NetworkPolicyPeer {
660678 policyPeerForControllerPod ,
661679 },
662680 }
663681
664682 allowProxyIngress := networkingv1.NetworkPolicyIngressRule {
665- Ports : []networkingv1.NetworkPolicyPort {
666- {Protocol : & protocolTCP , Port : & proxyPort },
667- {Protocol : & protocolTCP , Port : & proxySSLPort },
668- },
683+ Ports : append (
684+ lo .Map (proxyPorts , func (port intstr.IntOrString , _ int ) networkingv1.NetworkPolicyPort {
685+ return networkingv1.NetworkPolicyPort {Protocol : & protocolTCP , Port : & port }
686+ }),
687+ lo .Map (proxySSLPorts , func (port intstr.IntOrString , _ int ) networkingv1.NetworkPolicyPort {
688+ return networkingv1.NetworkPolicyPort {Protocol : & protocolTCP , Port : & port }
689+ })... ,
690+ ),
669691 }
670692
671693 allowMetricsIngress := networkingv1.NetworkPolicyIngressRule {
672- Ports : []networkingv1.NetworkPolicyPort {
673- {Protocol : & protocolTCP , Port : & metricsPort },
674- },
694+ Ports : lo .Map (metricsPorts , func (port intstr.IntOrString , _ int ) networkingv1.NetworkPolicyPort {
695+ return networkingv1.NetworkPolicyPort {Protocol : & protocolTCP , Port : & port }
696+ }),
697+ }
698+
699+ ingressRules := []networkingv1.NetworkPolicyIngressRule {
700+ limitAdminAPIIngress ,
701+ allowProxyIngress ,
702+ allowMetricsIngress ,
703+ }
704+
705+ // Add a rule to allow ingress traffics to listened ports for stream proxy on dataplane pods.
706+ // Only add the rule when there are at least one stream proxy port because a rule with an empty port list allows ingress traffics to ALL ports,
707+ // then the whole NetworkPolicy allows ALL ingress traffics to the pods.
708+ if len (streamListenPorts ) > 0 {
709+ allowStreamIngress := networkingv1.NetworkPolicyIngressRule {
710+ Ports : lo .Map (streamListenPorts , func (port intstr.IntOrString , _ int ) networkingv1.NetworkPolicyPort {
711+ return networkingv1.NetworkPolicyPort {Protocol : & protocolTCP , Port : & port }
712+ }),
713+ }
714+ ingressRules = append (ingressRules , allowStreamIngress )
675715 }
676716
677717 return & networkingv1.NetworkPolicy {
@@ -688,11 +728,7 @@ func generateDataPlaneNetworkPolicy(
688728 PolicyTypes : []networkingv1.PolicyType {
689729 networkingv1 .PolicyTypeIngress ,
690730 },
691- Ingress : []networkingv1.NetworkPolicyIngressRule {
692- limitAdminAPIIngress ,
693- allowProxyIngress ,
694- allowMetricsIngress ,
695- },
731+ Ingress : ingressRules ,
696732 },
697733 }, nil
698734}
@@ -837,9 +873,8 @@ func supportedRoutesByProtocol() map[gatewayv1.ProtocolType]map[gatewayv1.Kind]s
837873 return map [gatewayv1.ProtocolType ]map [gatewayv1.Kind ]struct {}{
838874 gatewayv1 .HTTPProtocolType : {"HTTPRoute" : {}},
839875 gatewayv1 .HTTPSProtocolType : {"HTTPRoute" : {}},
840-
841- // L4 routes not supported yet
842- // gatewayv1.TLSProtocolType: {"TLSRoute": {}},
876+ gatewayv1 .TLSProtocolType : {"TLSRoute" : {}},
877+ // TCPRoutes ad UDPRoutes are not supported yet
843878 // gatewayv1.TCPProtocolType: {"TCPRoute": {}},
844879 // gatewayv1.UDPProtocolType: {"UDPRoute": {}},
845880 }
@@ -1024,8 +1059,11 @@ func countAttachedRoutesForGatewayListener(ctx context.Context, g *gwtypes.Gatew
10241059 )
10251060 }
10261061 count += countAttachedHTTPRoutes (listener , httpRoutes )
1062+ case "TLSRoute" :
1063+ // TODO: implement ListTLSRoute
1064+ return 0 , nil
10271065 default :
1028- return 0 , fmt .Errorf ("unsupported route kind: %T " , k )
1066+ return 0 , fmt .Errorf ("unsupported route kind: %s " , k )
10291067 }
10301068 }
10311069 }
@@ -1160,10 +1198,137 @@ func (g *gatewayConditionsAndListenersAwareT) setListenersStatus(status metav1.C
11601198 }
11611199}
11621200
1201+ // setDataPlaneOptionsForListeners configures DataPlaneOptions in generated DataPlane from listeners of the gateway.
1202+ // It includes configuring deployment options and ingress service options.
1203+ func setDataPlaneOptionsForListeners (
1204+ opts * operatorv1beta1.DataPlaneOptions ,
1205+ listeners []gatewayv1.Listener ,
1206+ listenersOpts []operatorv2beta1.GatewayConfigurationListenerOptions ,
1207+ ) error {
1208+ listenerPortToKongListenPort , err := setDataPlaneDeploymentListenPorts (opts , listeners )
1209+ if err != nil {
1210+ return err
1211+ }
1212+
1213+ return setDataPlaneIngressServicePorts (opts , listeners , listenersOpts , listenerPortToKongListenPort )
1214+ }
1215+
1216+ // isKnownPort returns true if the required listener port number is a well-known port (below 1024)
1217+ // that we usually cannot listen on Kong DP.
1218+ func isKnownPort (portNumber int ) bool {
1219+ return portNumber < 1024
1220+ }
1221+
1222+ // setDataPlaneDeploymentListenPorts configures deploymentOptions to set listen ports of the generated DataPlane.
1223+ // It returns the map from the listener's port to the port.
1224+ func setDataPlaneDeploymentListenPorts (
1225+ opts * operatorv1beta1.DataPlaneOptions ,
1226+ listeners []gatewayv1.Listener ,
1227+ ) (map [int ]int , error ) {
1228+
1229+ if opts .Deployment .PodTemplateSpec == nil {
1230+ return nil , errors .New ("podTemplateSpec in DeploymentOptions not initialized" )
1231+ }
1232+ container := k8sutils .GetPodContainerByName (& opts .Deployment .PodTemplateSpec .Spec , consts .DataPlaneProxyContainerName )
1233+ if container == nil {
1234+ return nil , errors .New ("dataPlane proxy container not initialized" )
1235+ }
1236+
1237+ listenerPortToKongListenPort := map [int ]int {}
1238+ kongPortOccupied := map [int ]struct {}{
1239+ consts .DataPlaneProxyPort : {},
1240+ consts .DataPlaneProxySSLPort : {},
1241+ consts .DataPlaneMetricsPort : {},
1242+ // Currently DataPlaneMetricsPort and DataPlaneStatusPort are the same port.
1243+ // We should uncomment this if they are changed to use different ports.
1244+ // consts.DataPlaneStatusPort: {},
1245+ consts .DataPlaneAdminAPIPort : {},
1246+ }
1247+
1248+ // Extract listeners with TLS ports.
1249+ tlsPorts := []int {}
1250+ var errs error
1251+ // assignedPortNumber and assignedPortMax defines the interval of ports to assign to listen on Kong stream proxy
1252+ // if the specified port in the listener is already occupied on Kong DataPlane.
1253+ assignedPortNumber := consts .DataPlaneAssignedPortStart
1254+ assignedPortMax := consts .DataPlaneAssignedPortStart + 1024
1255+
1256+ for i , l := range listeners {
1257+ switch l .Protocol {
1258+ case gatewayv1 .HTTPProtocolType :
1259+ listenerPortToKongListenPort [int (l .Port )] = consts .DataPlaneProxyPort
1260+ case gatewayv1 .HTTPSProtocolType :
1261+ listenerPortToKongListenPort [int (l .Port )] = consts .DataPlaneProxySSLPort
1262+ case gatewayv1 .TLSProtocolType :
1263+ portNumber := int (l .Port )
1264+ // TODO: support multiple listeners using the same port:
1265+ // https://github.com/Kong/kong-operator/issues/3511
1266+ tlsPorts = append (tlsPorts , portNumber )
1267+ // Assign another port if the listener's port is already allocated on Kong DP.
1268+ // Also re-assign a port if known ports (<1024) are used because we usually cannot listen on those port on Kong DP.
1269+ _ , occupied := kongPortOccupied [portNumber ]
1270+ if isKnownPort (portNumber ) || occupied {
1271+ for ; assignedPortNumber < assignedPortMax ; assignedPortNumber ++ {
1272+ if _ , occupied := kongPortOccupied [assignedPortNumber ]; ! occupied {
1273+ listenerPortToKongListenPort [portNumber ] = assignedPortNumber
1274+ kongPortOccupied [assignedPortNumber ] = struct {}{}
1275+ break
1276+ }
1277+ }
1278+ // Although it should not happen where no ports can be assigned for the listener,
1279+ // we attach an error if the case really happens.
1280+ if assignedPortNumber >= assignedPortMax {
1281+ errs = errors .Join (errs , fmt .Errorf ("listener %d's port %d already occupied and no available ports can be assigned" , i , portNumber ))
1282+ }
1283+
1284+ } else {
1285+ listenerPortToKongListenPort [portNumber ] = portNumber
1286+ kongPortOccupied [portNumber ] = struct {}{}
1287+ }
1288+ default :
1289+ errs = errors .Join (errs , fmt .Errorf ("listener %d uses unsupported protocol %s" , i , l .Protocol ))
1290+ }
1291+ }
1292+
1293+ if errs != nil {
1294+ return nil , errs
1295+ }
1296+
1297+ // Configure env `KONG_STREAM_LISTEN` if there are TLS listeners.
1298+ // To make the value of the env stable when listener not changed, we sort the ports here.
1299+ if len (tlsPorts ) > 0 {
1300+ sort .Ints (tlsPorts )
1301+ streamListenEnvs := make ([]string , 0 , len (tlsPorts ))
1302+ for _ , portNumber := range tlsPorts {
1303+ streamListenEnvs = append (streamListenEnvs , fmt .Sprintf ("0.0.0.0:%d ssl reuseport" , listenerPortToKongListenPort [portNumber ]))
1304+ }
1305+ k8sutils .SetContainerEnv (container , corev1.EnvVar {
1306+ Name : "KONG_STREAM_LISTEN" ,
1307+ Value : strings .Join (streamListenEnvs , "," ),
1308+ })
1309+ }
1310+
1311+ // Configure env KONG_PORT_MAPS.
1312+ // Also we sort the ports here to make the env stable.
1313+ listenerPorts := lo .Keys (listenerPortToKongListenPort )
1314+ sort .Ints (listenerPorts )
1315+ portMapEnvs := make ([]string , 0 , len (listenerPorts ))
1316+ for _ , portNumber := range listenerPorts {
1317+ portMapEnvs = append (portMapEnvs , fmt .Sprintf ("%d:%d" , portNumber , listenerPortToKongListenPort [portNumber ]))
1318+ }
1319+ k8sutils .SetContainerEnv (container , corev1.EnvVar {
1320+ Name : "KONG_PORT_MAPS" ,
1321+ Value : strings .Join (portMapEnvs , "," ),
1322+ })
1323+
1324+ return listenerPortToKongListenPort , nil
1325+ }
1326+
11631327func setDataPlaneIngressServicePorts (
11641328 opts * operatorv1beta1.DataPlaneOptions ,
11651329 listeners []gatewayv1.Listener ,
11661330 listenersOpts []operatorv2beta1.GatewayConfigurationListenerOptions ,
1331+ servicePortMap map [int ]int ,
11671332) error {
11681333 // Check if all the names in GatewayConfiguration's spec.listenersOptions matches a listener in Gateway.
11691334 for i , listenerOpts := range listenersOpts {
@@ -1187,6 +1352,10 @@ func setDataPlaneIngressServicePorts(
11871352 }
11881353 }
11891354
1355+ if servicePortMap == nil {
1356+ servicePortMap = map [int ]int {}
1357+ }
1358+
11901359 var errs error
11911360 for i , l := range listeners {
11921361 var name string
@@ -1206,6 +1375,13 @@ func setDataPlaneIngressServicePorts(
12061375 port .TargetPort = intstr .FromInt (consts .DataPlaneProxySSLPort )
12071376 case gatewayv1 .HTTPProtocolType :
12081377 port .TargetPort = intstr .FromInt (consts .DataPlaneProxyPort )
1378+ case gatewayv1 .TLSProtocolType :
1379+ targetPort , ok := servicePortMap [int (l .Port )]
1380+ if ! ok {
1381+ errs = errors .Join (errs , fmt .Errorf ("no target port assigned listener %s on port %d" , l .Name , l .Port ))
1382+ continue
1383+ }
1384+ port .TargetPort = intstr .FromInt (targetPort )
12091385 default :
12101386 errs = errors .Join (errs , fmt .Errorf ("listener %d uses unsupported protocol %s" , i , l .Protocol ))
12111387 continue
@@ -1349,8 +1525,8 @@ type proxyListenEndpoint struct {
13491525}
13501526
13511527type kongListenConfig struct {
1352- Endpoint * proxyListenEndpoint
1353- SSLEndpoint * proxyListenEndpoint
1528+ Endpoints [] * proxyListenEndpoint
1529+ SSLEndpoints [] * proxyListenEndpoint
13541530}
13551531
13561532// parseKongListenEnv parses the provided kong listen string and returns
@@ -1383,19 +1559,19 @@ func parseKongListenEnv(str string) (kongListenConfig, error) {
13831559 if err != nil {
13841560 return kongListenConfig , fmt .Errorf ("failed parsing port %s: %w" , port , err )
13851561 }
1386- kongListenConfig .SSLEndpoint = & proxyListenEndpoint {
1562+ kongListenConfig .SSLEndpoints = append ( kongListenConfig . SSLEndpoints , & proxyListenEndpoint {
13871563 Address : host ,
13881564 Port : p ,
1389- }
1565+ })
13901566 } else {
13911567 p , err := strconv .Atoi (port )
13921568 if err != nil {
13931569 return kongListenConfig , fmt .Errorf ("failed parsing port %s: %w" , port , err )
13941570 }
1395- kongListenConfig .Endpoint = & proxyListenEndpoint {
1571+ kongListenConfig .Endpoints = append ( kongListenConfig . Endpoints , & proxyListenEndpoint {
13961572 Address : host ,
13971573 Port : p ,
1398- }
1574+ })
13991575 }
14001576 }
14011577
0 commit comments